Pyspark Business Report 2

A practice example of a pyspark business report using the JOIN method

Author

Dennis Feyerabend

Published

December 3, 2025

Abstract
The project demonstrates a full customer-segmentation workflow in PySpark, combining advanced statistical aggregation with relational joins to assemble a consolidated master dataset for downstream business reporting.

1 Generate customer tables

1.1 Table 1: Customer revenue

Code
# ------------------------------------------------------------
# Table 1: per-customer revenue metrics
# ------------------------------------------------------------

banner = "=" * 60
print("\n" + banner)
print("TABELLE 1: Customer Value")
print(banner)

# One row per customer: order count, total revenue and average order value.
revenue_metrics = [
    count("*").alias("n_orders"),
    spark_round(sum("total"), 2).alias("total_revenue"),
    spark_round(avg("total"), 2).alias("average_order"),
]
kunden_umsatz = df.groupBy("customer_id").agg(*revenue_metrics)

print(f"\nCustomer numbers: {kunden_umsatz.count():,}")
print("\nTop 10 revenue-high customers:")
kunden_umsatz.orderBy(desc("total_revenue")).show(10)

1.2 Table 2: Customer activity

Code
# ------------------------------------------------------------
# Table 2: customer activity (first and last order date)
# ------------------------------------------------------------

sep = "=" * 60
print("\n" + sep)
print("Table 2: Customer Activity")
print(sep)

# Earliest and latest order date per customer.
kunden_aktivitaet = (
    df.groupBy("customer_id")
      .agg(
          spark_max("date").alias("last_order"),
          spark_min("date").alias("first_order"),
      )
)

print("\nExample activity:")
kunden_aktivitaet.show(10)

1.3 Table 3: Customer demography

Code
# ============================================================
# TABLE 3: customer demographics (age, countries ordered from)
# ============================================================

print("\n" + "=" * 60)
print("Table 3: Customer Demography")  # fixed typo: "Costumer" -> "Customer"
print("=" * 60)

# Per customer: average age (rounded to whole years) and the number of
# orders placed in each DACH country. count(when(cond, 1)) counts only
# the rows where the condition holds, because count() ignores NULLs.
# (Original comment claimed the mode was simulated with first(); the
# code actually averages the age — comment corrected.)
kunden_demografie = df.groupBy("customer_id").agg(
    spark_round(avg("customer_age"), 0).alias("alter"),  # "alter" = age; column name kept, referenced by later chunks
    count(when(col("country") == "Germany", 1)).alias("bestellungen_de"),
    count(when(col("country") == "Austria", 1)).alias("bestellungen_at"),
    count(when(col("country") == "Switzerland", 1)).alias("bestellungen_ch")
)

# Flag customers with at least one order in Germany, Austria or
# Switzerland ("Ja"/"Nein" values are grouped on by Report 4).
kunden_demografie = kunden_demografie.withColumn(
    "is_dach_customer",
    when(
        (col("bestellungen_de") > 0) |
        (col("bestellungen_at") > 0) |
        (col("bestellungen_ch") > 0),
        "Ja"
    ).otherwise("Nein")
)

print("\nBeispiel Demografie:")
kunden_demografie.show(10)

2 Join functions

Code
# ------------------------------------------------------------
# Master table: join revenue, activity and demographics
# ------------------------------------------------------------

divider = "=" * 60
print("\n" + divider)
print("MASTER JOIN: Alle Tabellen verbinden")
print(divider)

anz_kunden = df.select("customer_id").distinct().count()
print(f'Bevor JOIN: {anz_kunden:,} Kunden')

# Inner-join revenue with activity, then add demographics; the customer
# counts printed after each join confirm no rows were dropped.
master_step1 = kunden_umsatz.join(kunden_aktivitaet, on="customer_id", how="inner")
print(f"Nach JOIN 1: {master_step1.count():,} Kunden")

kunden_master = master_step1.join(kunden_demografie, "customer_id", "inner")
print(f"Nach JOIN 2: {kunden_master.count():,} Kunden")

# The newest order date in the data serves as "today" for inactivity.
max_date = df.agg(spark_max("date")).collect()[0][0]
kunden_master = kunden_master.withColumn(
    "days_inactive",
    datediff(lit(max_date), col("last_order")),
)

print("\nmaster table example:")
kunden_master.show(5)

print("\nColumns in master-table:")
for column_name in kunden_master.columns:
    print(f"  - {column_name}")

2.1 Creating customer segments

Code
# ============================================================
# CUSTOMER SEGMENTATION
# ============================================================

print("\n" + "=" * 60)
print("KUNDEN-SEGMENTIERUNG")
print("=" * 60)


# Add one segment column per dimension (revenue, activity, age).
# FIX: "inactive" previously recomputed datediff(lit(max_date),
# col("last_order")), duplicating the "days_inactive" column that the
# master-table chunk already produced; reuse that column instead
# (identical values, one less expression to keep in sync).
kunden_segmentiert = kunden_master \
    .withColumn("inactive", col("days_inactive")) \
    .withColumn("revenue_segment",
        when(col("total_revenue") >= 5000, "VIP")
        .when(col("total_revenue") >= 1000, "Premium")
        .when(col("total_revenue") >= 200, "Standard")
        .otherwise("Gering")
    ) \
    .withColumn("activity_segment",
        when(col("days_inactive") <= 30, "active")
        .when(col("days_inactive") <= 90, "inactive")
        .otherwise("Verloren")  # "Verloren" (lost) is filtered on by Report 3
    ) \
    .withColumn("age_segment",
        when(col("alter") <= 25, "young")
        .when(col("alter") <= 40, "middle")
        .when(col("alter") <= 55, "experienced")
        .otherwise("senior")
    )

print("\nSegmentierte Kunden:")
kunden_segmentiert.select(
    "customer_id",
    "total_revenue",
    "revenue_segment",
    "days_inactive",
    "activity_segment",
    "age_segment",
    "is_dach_customer"
).show(15)

3 Business Reports

3.1 REPORT 1: Revenue segments

Code
# Report 1: revenue segments — customer counts and revenue share.
report_umsatz = (
    kunden_segmentiert
    .groupBy("revenue_segment")
    .agg(
        count("*").alias("n_customers"),
        spark_round(sum("total_revenue") / 1000000, 2).alias("segment_revenue_mill"),
        spark_round(avg("total_revenue"), 2).alias("avg_revenue"),
    )
    .orderBy(desc("segment_revenue_mill"))
)

# Window spanning the entire result set, for percentage-of-total columns.
window_all = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

# Share of customers and share of revenue per segment.
report_umsatz = (
    report_umsatz
    .withColumn(
        "n_customers_percent",
        spark_round(col("n_customers") / sum("n_customers").over(window_all) * 100, 2),
    )
    .withColumn(
        "segment_revenue_percent",
        spark_round(col("segment_revenue_mill") / sum("segment_revenue_mill").over(window_all) * 100, 2),
    )
)

# Put the columns into presentation order before converting to pandas.
report_umsatz_reordered = report_umsatz.select(
    "revenue_segment",
    "n_customers",
    "n_customers_percent",
    "segment_revenue_mill",
    "avg_revenue",
    "segment_revenue_percent",
)

# Render as a styled pandas table.
two_decimals = "{:,.2f}"
cell_padding = [("padding-right", "8px"), ("padding-left", "8px")]
table_df = (
    report_umsatz_reordered.toPandas()
        .style.hide(axis="index")
        .format({
            "segment_revenue_mill": two_decimals,
            "avg_revenue": two_decimals,
            "segment_revenue_percent": two_decimals,
            "n_customers_percent": two_decimals,
        })
        .set_properties(
            subset=["n_customers", "n_customers_percent", "segment_revenue_mill", "avg_revenue", "segment_revenue_percent"],
            **{"text-align": "right"}
        )
        .set_table_styles([
            {"selector": "td", "props": cell_padding},
            {"selector": "th", "props": cell_padding},
        ])
)

table_df
revenue_segment n_customers n_customers_percent segment_revenue_mill avg_revenue segment_revenue_percent
VIP 272201 55.55 2,141.35 7,866.80 74.89
Premium 211943 43.26 713.82 3,367.99 24.97
Standard 5633 1.15 4.06 720.32 0.14
Gering 198 0.04 0.02 121.64 0.00

3.2 Report 1: Insights

  • VIP customers account for roughly 55% of the total customer base and generate 74.89% of total revenue, making them the strongest revenue driver.
  • The remaining customers consist largely of Premium customers (43.26% of the base), contributing an additional 24.97% of revenue.
  • Standard and Low-Value customers together contribute only 0.14% of revenue and therefore play a minor economic role.

3.3 REPORT 2: Customer activity segments

Code
# Report 2: revenue by customer-activity segment.
report_aktivitaet = (
    kunden_segmentiert
    .groupBy("activity_segment")
    .agg(
        count("*").alias("n_customers"),
        spark_round(sum("total_revenue") / 1000000, 2).alias("segment_revenue_mill"),
    )
    .orderBy(desc("segment_revenue_mill"))
)

# Revenue share of each segment relative to the grand total.
report_aktivitaet = report_aktivitaet.withColumn(
    "segment_activity_percent",
    spark_round(
        col("segment_revenue_mill") / sum("segment_revenue_mill").over(window_all) * 100,
        2,
    ),
)

# Render as a styled pandas table.
padding = [("padding-right", "8px"), ("padding-left", "8px")]
table_df = (
    report_aktivitaet.toPandas()
        .style.hide(axis="index")
        .format({
            "segment_revenue_mill": "{:,.2f}",
            "segment_activity_percent": "{:,.2f}",
        })
        .set_properties(
            subset=["n_customers", "segment_revenue_mill", "segment_activity_percent"],
            **{"text-align": "right"}
        )
        .set_table_styles([
            {"selector": "td", "props": padding},
            {"selector": "th", "props": padding},
        ])
)

table_df
activity_segment n_customers segment_revenue_mill segment_activity_percent
active 172492 1,084.97 37.95
inactive 179916 1,070.26 37.43
Verloren 137567 704.02 24.62
Code
# Pull the small aggregated result into pandas for plotting.
pdf = report_aktivitaet.select(
    "activity_segment", "segment_activity_percent"
).toPandas()


fig, ax = plt.subplots(figsize=(10, 6))

# Bar chart of revenue share per activity segment.
ax.bar(pdf["activity_segment"], pdf["segment_activity_percent"])

# Titles, axis labels and tick formatting.
ax.set_title("Revenue share by activity", fontsize=16)
ax.set_xlabel("Activity Status", fontsize=14)
ax.set_ylabel("Revenue share (%)", fontsize=14)
ax.tick_params(axis="both", labelsize=14)

# Leave ~15% headroom above the tallest bar for the value labels.
ax.set_ylim(0, pdf["segment_activity_percent"].max() * 1.15)

plt.xticks(rotation=0, fontsize=14)
ax.grid(False)

# Annotate each bar with its percentage value.
for pos, share in enumerate(pdf["segment_activity_percent"]):
    ax.text(pos, share + 1, f"{share:.1f}%", ha="center", fontsize=14)

plt.tight_layout()
plt.show()

3.4 Report 2: Insights

  • The revenue contribution of active and inactive customers is nearly identical, despite differences in engagement behavior.
  • This indicates that inactive customers remain a high-value segment, making them a strategic priority for retention efforts.
  • Lost customers are a critical issue: they contributed 24% of total revenue, making them a valuable target for win-back campaigns.

3.5 REPORT 3: Lost VIPs (Critical!)

Code
# ------------------------------------------------------------
# Report 3: lost VIPs — high-value customers classified as lost
# ------------------------------------------------------------

verlorene_vips = kunden_segmentiert.filter(
    (col("revenue_segment") == "VIP") &
    (col("activity_segment") == "Verloren")
)

# FIX: cache action results in locals. Every .count() / .collect() is a
# full Spark job; the original recomputed verlorene_vips.count(), the
# VIP total and the revenue total inline inside the f-strings.
n_lost_vips = verlorene_vips.count()
n_vips_total = kunden_segmentiert.filter(col('revenue_segment') == 'VIP').count()
print(f"\nNumber lost VIPs: {n_lost_vips:,} ({n_lost_vips / n_vips_total * 100:,.2f}% of total VIP customers)")

# Aggregate revenue of the lost VIPs (single action, one row returned).
stats = verlorene_vips.agg(
    spark_round(sum("total_revenue"), 2).alias("verlorener_umsatz"),
    spark_round(avg("total_revenue"), 2).alias("avg_revenue")
).collect()[0]

# Overall revenue across ALL customers, for the percentage figure.
gesamtumsatz = kunden_umsatz.agg(sum('total_revenue')).collect()[0][0]
print(f"Revenue of lost VIPs: {stats['verlorener_umsatz']:,.2f} EUR ({stats['verlorener_umsatz'] / gesamtumsatz * 100:.2f}% of total revenue)")
print(f"Average per customer: {stats['avg_revenue']:,.2f} EUR")

# Ten highest-revenue lost VIPs for the detail table.
top10 = (verlorene_vips.select(
    "customer_id",
    "total_revenue",
    "n_orders",
    "last_order",
    "days_inactive"
).orderBy(desc("total_revenue")).limit(10))

# Render as a styled pandas table.
table_df = (
    top10.toPandas()
        .style.hide(axis="index")
        .format({
            "total_revenue": "{:,.2f}",
        })
        .set_properties(
            subset=["customer_id", "total_revenue", "n_orders", "last_order", "days_inactive"],
            **{"text-align": "right"}
        )
        .set_table_styles([
            {"selector": "td",
             "props": [("padding-right", "8px"), ("padding-left", "8px")]},
            {"selector": "th",
             "props": [("padding-right", "8px"), ("padding-left", "8px")]}
        ])
)

print("\nTop 10 lost VIPs:")
table_df

Number lost VIPs: 62,312 (22.89% of total VIP customers)
Revenue of lost VIPs: 470,218,054.34 EUR (16.45% of total revenue)
Average per customer: 7,546.19 EUR

Top 10 lost VIPs:
customer_id total_revenue n_orders last_order days_inactive
85733 25,860.02 17 2024-09-28 93
455882 25,711.89 17 2024-08-11 141
53226 25,558.90 20 2024-09-04 117
26705 24,174.79 18 2024-09-22 99
75067 24,134.88 15 2024-09-30 91
362128 23,652.76 21 2024-07-22 161
315539 23,246.34 17 2024-09-10 111
231746 23,061.97 19 2024-09-21 100
134045 22,840.01 16 2024-09-01 120
339963 22,683.69 15 2024-08-30 122

3.6 Report 3: Insights

  • Lost VIP customers account for a significant revenue share (16.45% of total turnover)
  • The most valuable VIP customers all conducted their last orders around September 2024, suggesting a potential triggering event or change in experience during that period.

3.7 REPORT 4: DACH vs International

Code
# Report 4: DACH customers vs. international customers.
report_dach = (
    kunden_segmentiert
    .groupBy("is_dach_customer")
    .agg(
        count("*").alias("n_customers"),
        spark_round(sum("total_revenue") / 1000000, 2).alias("total_revenue_mill"),
        spark_round(avg("total_revenue"), 2).alias("avg_revenue"),
    )
    .orderBy(desc("total_revenue_mill"))
)

# Share of total revenue per group.
report_dach = report_dach.withColumn(
    "total_revenue_percent",
    spark_round(
        col("total_revenue_mill") / sum("total_revenue_mill").over(window_all) * 100,
        2,
    ),
)

# Render as a styled pandas table.
pad = [("padding-right", "8px"), ("padding-left", "8px")]
table_df = (
    report_dach.toPandas()
        .style.hide(axis="index")
        .format({
            "total_revenue_mill": "{:,.2f}", "avg_revenue": "{:,.2f}", "total_revenue_percent": "{:,.2f}"
        })
        .set_properties(
            subset=["is_dach_customer", "n_customers", "total_revenue_mill", "avg_revenue", "total_revenue_percent"],
            **{"text-align": "right"}
        )
        .set_table_styles([
            {"selector": "td", "props": pad},
            {"selector": "th", "props": pad},
        ])
)

table_df
is_dach_customer n_customers total_revenue_mill avg_revenue total_revenue_percent
Ja 489697 2,858.74 5,837.78 99.98
Nein 278 0.51 1,841.44 0.02

3.8 Report 4: Insights

  • The DACH region (Germany, Switzerland, Austria) accounts for 99.98% of total revenue, making it by far the dominant market.
  • As a result, non-DACH regions contribute only marginally and are not a strategic priority for retention or growth.
  • However, this heavy concentration creates a risk: the business is highly dependent on a single geographic region. Any economic downturn, regulatory change, or market shift in DACH would have a disproportionate impact on overall revenue performance.