← Back to Blog

Mastering ETL with Databricks: Window Functions in PySpark and SQL

Discover how to leverage window functions in Databricks for sophisticated ETL transformations. Learn practical examples using both PySpark and T-SQL, including ranking, running totals, moving averages, and lag/lead operations.

Mastering ETL with Databricks: Window Functions in PySpark and SQL

Databricks has emerged as a leading unified analytics platform, combining the power of Apache Spark with collaborative notebooks and enterprise-grade reliability. One of the most powerful features for ETL (Extract, Transform, Load) operations is window functions, which enable sophisticated data transformations and analytics. In this comprehensive guide, we'll explore how to leverage window functions in both PySpark and T-SQL within Databricks to build efficient ETL pipelines.

Why Databricks for ETL?

Databricks offers several advantages for ETL workloads:

Unified Platform - Single platform for data engineering, data science, and analytics - Support for both batch and streaming data - Native integration with data lakes (Delta Lake) - Collaborative environment with notebooks

Performance - Optimized Spark runtime (3-5x faster than open-source Spark) - Photon engine for accelerated queries - Auto-scaling clusters - Intelligent caching and optimization

Delta Lake Integration - ACID transactions on data lakes - Time travel and versioning - Schema evolution - Efficient upserts and deletes

Understanding Window Functions

Window functions perform calculations across a set of rows that are related to the current row. Unlike GROUP BY aggregations that collapse rows, window functions retain individual row details while adding analytical insights.

Common Use Cases

  1. Running Totals and Moving Averages
  2. Ranking and Top-N Analysis
  3. Lead/Lag Comparisons
  4. Percentile Calculations
  5. Data Quality Checks

Window Functions in PySpark

Setting Up Your Databricks Environment

from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, row_number, rank, dense_rank, lag, lead,
    sum as _sum, avg, max as _max, min as _min,
    count, first, last, ntile, percent_rank, cume_dist
)

# Create Spark session (automatically available in Databricks)
spark = SparkSession.builder.appName("WindowFunctionsETL").getOrCreate()

Example Dataset: Sales Transactions

# Create sample sales data
sales_data = [
    (1, 'Electronics', 'Laptop', '2024-01-15', 1200.00, 'Store A'),
    (2, 'Electronics', 'Mouse', '2024-01-15', 25.00, 'Store A'),
    (3, 'Clothing', 'Shirt', '2024-01-16', 45.00, 'Store B'),
    (4, 'Electronics', 'Keyboard', '2024-01-16', 75.00, 'Store A'),
    (5, 'Clothing', 'Pants', '2024-01-17', 60.00, 'Store B'),
    (6, 'Electronics', 'Monitor', '2024-01-17', 350.00, 'Store A'),
    (7, 'Clothing', 'Jacket', '2024-01-18', 120.00, 'Store B'),
    (8, 'Electronics', 'Tablet', '2024-01-18', 450.00, 'Store A'),
]

columns = ['transaction_id', 'category', 'product', 'date', 'amount', 'store']
df = spark.createDataFrame(sales_data, columns)

# Convert date string to date type
from pyspark.sql.functions import to_date
df = df.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))

1. Ranking Functions

ROW_NUMBER() - Assign Unique Sequential Numbers

# Rank products by sales amount within each category
window_spec = Window.partitionBy('category').orderBy(col('amount').desc())

df_ranked = df.withColumn('row_num', row_number().over(window_spec))

# Get top 2 products per category
top_products = df_ranked.filter(col('row_num') <= 2)
top_products.display()

RANK() vs DENSE_RANK()

# RANK - Leaves gaps in ranking for ties
window_spec = Window.partitionBy('store').orderBy(col('amount').desc())

df_with_ranks = df \
    .withColumn('rank', rank().over(window_spec)) \
    .withColumn('dense_rank', dense_rank().over(window_spec))

df_with_ranks.display()

# Example output:
# Store A: $1200 (rank=1, dense_rank=1), $450 (rank=2, dense_rank=2), 
#          $350 (rank=3, dense_rank=3), $75 (rank=4, dense_rank=4)

2. Aggregate Window Functions

Running Totals (Cumulative Sum)

# Calculate running total of sales by category over time
window_spec = Window \
    .partitionBy('category') \
    .orderBy('date') \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_running_total = df.withColumn(
    'running_total', 
    _sum('amount').over(window_spec)
)

df_running_total.orderBy('category', 'date').display()

Moving Averages

# 3-day moving average
window_spec = Window \
    .partitionBy('category') \
    .orderBy('date') \
    .rowsBetween(-2, 0)  # Current row and 2 rows before

df_moving_avg = df.withColumn(
    'moving_avg_3day',
    avg('amount').over(window_spec)
)

df_moving_avg.display()

Cumulative Statistics

# Multiple aggregations in one window
window_spec = Window \
    .partitionBy('store') \
    .orderBy('date') \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

df_cumulative = df \
    .withColumn('cumulative_sales', _sum('amount').over(window_spec)) \
    .withColumn('cumulative_count', count('*').over(window_spec)) \
    .withColumn('cumulative_avg', avg('amount').over(window_spec)) \
    .withColumn('max_sale_so_far', _max('amount').over(window_spec))

df_cumulative.display()

3. Lag and Lead Functions

LAG() - Access Previous Row Values

# Compare current sale with previous sale in same store
window_spec = Window.partitionBy('store').orderBy('date')

df_lag = df \
    .withColumn('prev_amount', lag('amount', 1).over(window_spec)) \
    .withColumn('prev_product', lag('product', 1).over(window_spec)) \
    .withColumn('amount_change', col('amount') - col('prev_amount'))

df_lag.display()

LEAD() - Access Next Row Values

# Look ahead to next transaction
window_spec = Window.partitionBy('category').orderBy('date')

df_lead = df \
    .withColumn('next_amount', lead('amount', 1).over(window_spec)) \
    .withColumn('next_date', lead('date', 1).over(window_spec))

df_lead.display()

Complex Lag/Lead Analysis

# Calculate day-over-day change percentage
window_spec = Window.partitionBy('store').orderBy('date')

df_change = df \
    .withColumn('prev_day_amount', lag('amount', 1).over(window_spec)) \
    .withColumn(
        'pct_change',
        ((col('amount') - col('prev_day_amount')) / col('prev_day_amount') * 100)
    ) \
    .withColumn('trend', 
        when(col('pct_change') > 0, 'UP')
        .when(col('pct_change') < 0, 'DOWN')
        .otherwise('FLAT')
    )

df_change.display()

4. Distribution Functions

NTILE() - Divide Data into Buckets

# Divide transactions into quartiles based on amount
window_spec = Window.partitionBy('category').orderBy('amount')

df_quartiles = df.withColumn('quartile', ntile(4).over(window_spec))

# Analyze high-value transactions (top quartile)
high_value = df_quartiles.filter(col('quartile') == 4)
high_value.display()

PERCENT_RANK() - Relative Rank as Percentage

window_spec = Window.partitionBy('store').orderBy('amount')

df_percent = df \
    .withColumn('percent_rank', percent_rank().over(window_spec)) \
    .withColumn('cume_dist', cume_dist().over(window_spec))

df_percent.display()

5. First and Last Values

# Get first and last transaction per category
window_spec = Window.partitionBy('category').orderBy('date')

df_first_last = df \
    .withColumn('first_product', first('product').over(window_spec)) \
    .withColumn('first_amount', first('amount').over(window_spec)) \
    .withColumn('last_product', last('product').over(window_spec)) \
    .withColumn('last_amount', last('amount').over(window_spec))

df_first_last.display()

Window Functions in Databricks SQL

Creating Tables

-- Create a Delta table
CREATE OR REPLACE TABLE sales_transactions (
    transaction_id INT,
    category STRING,
    product STRING,
    transaction_date DATE,
    amount DECIMAL(10,2),
    store STRING,
    customer_id INT
) USING DELTA;

-- Insert sample data
INSERT INTO sales_transactions VALUES
(1, 'Electronics', 'Laptop', '2024-01-15', 1200.00, 'Store A', 101),
(2, 'Electronics', 'Mouse', '2024-01-15', 25.00, 'Store A', 102),
(3, 'Clothing', 'Shirt', '2024-01-16', 45.00, 'Store B', 103),
(4, 'Electronics', 'Keyboard', '2024-01-16', 75.00, 'Store A', 104),
(5, 'Clothing', 'Pants', '2024-01-17', 60.00, 'Store B', 105),
(6, 'Electronics', 'Monitor', '2024-01-17', 350.00, 'Store A', 106),
(7, 'Clothing', 'Jacket', '2024-01-18', 120.00, 'Store B', 107),
(8, 'Electronics', 'Tablet', '2024-01-18', 450.00, 'Store A', 108);

SQL Window Function Syntax

-- Basic window function structure
SELECT 
    column1,
    column2,
    WINDOW_FUNCTION() OVER (
        PARTITION BY partition_column
        ORDER BY order_column
        ROWS/RANGE BETWEEN start AND end
    ) AS result_column
FROM table_name;

1. Ranking in SQL

-- Rank products by sales within category
SELECT 
    category,
    product,
    amount,
    ROW_NUMBER() OVER (PARTITION BY category ORDER BY amount DESC) as row_num,
    RANK() OVER (PARTITION BY category ORDER BY amount DESC) as rank,
    DENSE_RANK() OVER (PARTITION BY category ORDER BY amount DESC) as dense_rank
FROM sales_transactions
ORDER BY category, amount DESC;

Top N Per Group

-- Get top 3 products per category
WITH ranked_products AS (
    SELECT 
        category,
        product,
        amount,
        ROW_NUMBER() OVER (PARTITION BY category ORDER BY amount DESC) as rn
    FROM sales_transactions
)
SELECT 
    category,
    product,
    amount
FROM ranked_products
WHERE rn <= 3;

2. Running Totals in SQL

-- Calculate running total of sales by category
SELECT 
    transaction_date,
    category,
    product,
    amount,
    SUM(amount) OVER (
        PARTITION BY category 
        ORDER BY transaction_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as running_total,
    AVG(amount) OVER (
        PARTITION BY category 
        ORDER BY transaction_date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as running_avg
FROM sales_transactions
ORDER BY category, transaction_date;

3. Moving Averages in SQL

-- 3-transaction moving average
SELECT 
    transaction_date,
    store,
    product,
    amount,
    AVG(amount) OVER (
        PARTITION BY store
        ORDER BY transaction_date
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) as moving_avg_3,
    COUNT(*) OVER (
        PARTITION BY store
        ORDER BY transaction_date
        ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
    ) as window_size
FROM sales_transactions
ORDER BY store, transaction_date;

4. LAG and LEAD in SQL

-- Compare with previous and next transactions
SELECT 
    transaction_date,
    category,
    product,
    amount,
    LAG(amount, 1) OVER (PARTITION BY category ORDER BY transaction_date) as prev_amount,
    LAG(product, 1) OVER (PARTITION BY category ORDER BY transaction_date) as prev_product,
    LEAD(amount, 1) OVER (PARTITION BY category ORDER BY transaction_date) as next_amount,
    amount - LAG(amount, 1) OVER (PARTITION BY category ORDER BY transaction_date) as amount_change
FROM sales_transactions
ORDER BY category, transaction_date;

5. Advanced SQL Window Patterns

Identifying Gaps in Sequences

-- Find missing transaction IDs
WITH all_ids AS (
    SELECT 
        transaction_id,
        LAG(transaction_id) OVER (ORDER BY transaction_id) as prev_id
    FROM sales_transactions
)
SELECT 
    prev_id + 1 as gap_start,
    transaction_id - 1 as gap_end
FROM all_ids
WHERE transaction_id - prev_id > 1;

Customer Purchase Patterns

-- Analyze time between customer purchases
SELECT 
    customer_id,
    transaction_date,
    product,
    amount,
    LAG(transaction_date) OVER (
        PARTITION BY customer_id 
        ORDER BY transaction_date
    ) as prev_purchase_date,
    DATEDIFF(
        transaction_date, 
        LAG(transaction_date) OVER (PARTITION BY customer_id ORDER BY transaction_date)
    ) as days_since_last_purchase
FROM sales_transactions
WHERE customer_id IS NOT NULL
ORDER BY customer_id, transaction_date;

Real-World ETL Pipeline Example

Scenario: Building a Sales Analytics Pipeline

from pyspark.sql.functions import *
from pyspark.sql.window import Window

# Step 1: Extract - Read raw data from data lake
raw_sales = spark.read \
    .format("delta") \
    .load("/mnt/raw/sales_data")

# Step 2: Transform - Apply window functions for analytics

# 2a. Calculate sales metrics by product
product_window = Window.partitionBy('product_id').orderBy('sale_date')

enriched_sales = raw_sales \
    .withColumn('sale_rank', row_number().over(product_window)) \
    .withColumn('cumulative_revenue', 
                _sum('revenue').over(product_window.rowsBetween(Window.unboundedPreceding, 0))) \
    .withColumn('prev_sale_date', lag('sale_date').over(product_window)) \
    .withColumn('days_between_sales', 
                datediff(col('sale_date'), col('prev_sale_date')))

# 2b. Calculate moving averages
ma_window = Window.partitionBy('product_category') \
    .orderBy('sale_date') \
    .rowsBetween(-6, 0)  # 7-day moving average

sales_with_ma = enriched_sales \
    .withColumn('ma_7day_revenue', avg('revenue').over(ma_window)) \
    .withColumn('ma_7day_quantity', avg('quantity').over(ma_window))

# 2c. Identify top performers
category_window = Window.partitionBy('product_category', 'sale_date') \
    .orderBy(col('revenue').desc())

top_products = sales_with_ma \
    .withColumn('daily_rank', rank().over(category_window)) \
    .filter(col('daily_rank') <= 10)

# 2d. Add percentile information
percentile_window = Window.partitionBy('product_category')

final_sales = top_products \
    .withColumn('revenue_percentile', 
                percent_rank().over(percentile_window.orderBy('revenue'))) \
    .withColumn('quartile', ntile(4).over(percentile_window.orderBy('revenue')))

# Step 3: Load - Write to Gold layer
final_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("sale_date") \
    .save("/mnt/gold/sales_analytics")

# Create table for SQL access
final_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .saveAsTable("gold.sales_analytics")

Corresponding SQL ETL Pipeline

-- Create Gold layer table with window functions
CREATE OR REPLACE TABLE gold.sales_analytics AS
WITH enriched_sales AS (
    SELECT 
        *,
        ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY sale_date) as sale_rank,
        SUM(revenue) OVER (
            PARTITION BY product_id 
            ORDER BY sale_date 
            ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
        ) as cumulative_revenue,
        LAG(sale_date) OVER (PARTITION BY product_id ORDER BY sale_date) as prev_sale_date,
        DATEDIFF(
            sale_date, 
            LAG(sale_date) OVER (PARTITION BY product_id ORDER BY sale_date)
        ) as days_between_sales
    FROM raw.sales_data
),
sales_with_ma AS (
    SELECT 
        *,
        AVG(revenue) OVER (
            PARTITION BY product_category 
            ORDER BY sale_date 
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) as ma_7day_revenue,
        AVG(quantity) OVER (
            PARTITION BY product_category 
            ORDER BY sale_date 
            ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
        ) as ma_7day_quantity
    FROM enriched_sales
),
top_products AS (
    SELECT 
        *,
        RANK() OVER (
            PARTITION BY product_category, sale_date 
            ORDER BY revenue DESC
        ) as daily_rank
    FROM sales_with_ma
    WHERE daily_rank <= 10
)
SELECT 
    *,
    PERCENT_RANK() OVER (
        PARTITION BY product_category 
        ORDER BY revenue
    ) as revenue_percentile,
    NTILE(4) OVER (
        PARTITION BY product_category 
        ORDER BY revenue
    ) as quartile
FROM top_products;

Performance Optimization Tips

1. Minimize Shuffles

# Bad - Multiple shuffles
df1 = df.repartition('category')
window1 = Window.partitionBy('category').orderBy('date')
df1 = df1.withColumn('rank', rank().over(window1))

window2 = Window.partitionBy('category').orderBy('amount')
df1 = df1.withColumn('amount_rank', rank().over(window2))

# Good - Single partition, multiple windows
window1 = Window.partitionBy('category').orderBy('date')
window2 = Window.partitionBy('category').orderBy('amount')

df_optimized = df \
    .withColumn('rank', rank().over(window1)) \
    .withColumn('amount_rank', rank().over(window2))

2. Use Appropriate Window Frames

# Efficient - Bounded window
window = Window.partitionBy('store') \
    .orderBy('date') \
    .rowsBetween(-30, 0)  # Only 31 rows

# Less efficient - Unbounded window
window = Window.partitionBy('store') \
    .orderBy('date') \
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)

3. Leverage Delta Lake Optimizations

# Enable auto-optimize
spark.conf.set("spark.databricks.delta.autoOptimize.optimizeWrite", "true")
spark.conf.set("spark.databricks.delta.autoOptimize.autoCompact", "true")

# Write optimized data
final_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .option("dataChange", "false") \
    .partitionBy("year", "month") \
    .save("/mnt/gold/sales_optimized")

4. Use Caching Strategically

# Cache intermediate results used multiple times
df_cached = df.cache()

window1 = Window.partitionBy('category').orderBy('date')
window2 = Window.partitionBy('store').orderBy('amount')

result1 = df_cached.withColumn('cat_rank', rank().over(window1))
result2 = df_cached.withColumn('store_rank', rank().over(window2))

# Don't forget to unpersist when done
df_cached.unpersist()

Common Pitfalls and Solutions

Pitfall 1: Incorrect Window Frame

# Problem - Missing ORDER BY for aggregate windows
window = Window.partitionBy('category')  # No ORDER BY
df.withColumn('running_total', _sum('amount').over(window))  # Wrong!

# Solution
window = Window.partitionBy('category').orderBy('date')
df.withColumn('running_total', _sum('amount').over(window))  # Correct

Pitfall 2: Performance Issues with Large Windows

-- Problem - Entire partition scanned
SELECT 
    *,
    AVG(amount) OVER (
        PARTITION BY customer_id 
        ORDER BY date
        ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    ) as cumulative_avg
FROM large_table;

-- Solution - Use bounded window when possible
SELECT 
    *,
    AVG(amount) OVER (
        PARTITION BY customer_id 
        ORDER BY date
        ROWS BETWEEN 90 PRECEDING AND CURRENT ROW
    ) as rolling_90day_avg
FROM large_table;

Best Practices

  1. Always specify ORDER BY for aggregate window functions
  2. Use ROWS vs RANGE appropriately - ROWS is generally faster
  3. Partition on low-cardinality columns when possible
  4. Limit window frame size for better performance
  5. Cache DataFrames used in multiple window operations
  6. Use Delta Lake for efficient updates and time travel
  7. Test on sample data before running on full datasets
  8. Monitor query execution using Spark UI

Conclusion

Window functions in Databricks provide powerful capabilities for building sophisticated ETL pipelines. Whether you're working with PySpark or SQL, mastering window functions enables you to:

  • Perform complex analytical calculations efficiently
  • Maintain row-level detail while adding aggregated insights
  • Build incremental processing pipelines
  • Implement advanced data quality checks
  • Create rich analytical datasets for downstream consumption

By combining Databricks' optimized Spark runtime with window functions, you can build scalable, performant ETL pipelines that handle enterprise-scale data processing requirements.


Need help building Databricks ETL pipelines? Contact LanaCloud at projectteam@lanacloud.com for expert guidance on implementing scalable data solutions.

LanaCloud Assistant
Online · usually replies instantly