Mastering ETL with Databricks: Window Functions in PySpark and SQL
Databricks has emerged as a leading unified analytics platform, combining the power of Apache Spark with collaborative notebooks and enterprise-grade reliability. For ETL (Extract, Transform, Load) work, one of its most useful features is window functions, which enable sophisticated data transformations and analytics. In this guide, we'll explore how to use window functions in both PySpark and Databricks SQL to build efficient ETL pipelines.
Why Databricks for ETL?
Databricks offers several advantages for ETL workloads:
Unified Platform
- Single platform for data engineering, data science, and analytics
- Support for both batch and streaming data
- Native integration with data lakes (Delta Lake)
- Collaborative environment with notebooks
Performance
- Optimized Databricks Runtime, which Databricks reports is substantially faster than open-source Spark
- Photon engine for accelerated queries
- Auto-scaling clusters
- Intelligent caching and optimization
Delta Lake Integration
- ACID transactions on data lakes
- Time travel and versioning
- Schema evolution
- Efficient upserts and deletes
Understanding Window Functions
Window functions perform calculations across a set of rows that are related to the current row. Unlike GROUP BY aggregations that collapse rows, window functions retain individual row details while adding analytical insights.
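To make the contrast concrete, here is a minimal plain-Python sketch (not Spark code) using four rows from the sample dataset introduced below. It shows what `GROUP BY category` returns versus `SUM(amount) OVER (PARTITION BY category)`; the variable names are illustrative only.

```python
from collections import defaultdict

# Four (category, product, amount) rows from the sample sales data
rows = [
    ("Electronics", "Laptop", 1200.00),
    ("Electronics", "Mouse", 25.00),
    ("Clothing", "Shirt", 45.00),
    ("Clothing", "Pants", 60.00),
]

# Totals per category (the aggregate both approaches share)
category_totals = defaultdict(float)
for category, _product, amount in rows:
    category_totals[category] += amount

# GROUP BY category: rows collapse to one output row per category
grouped = sorted(category_totals.items())

# SUM(amount) OVER (PARTITION BY category): every input row survives,
# and each row carries its partition's total alongside its own columns
windowed = [(category, product, amount, category_totals[category])
            for category, product, amount in rows]

print(grouped)        # [('Clothing', 105.0), ('Electronics', 1225.0)]
print(len(windowed))  # 4
```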
Common Use Cases
- Running Totals and Moving Averages
- Ranking and Top-N Analysis
- Lead/Lag Comparisons
- Percentile Calculations
- Data Quality Checks
Window Functions in PySpark
Setting Up Your Databricks Environment
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import (
col, row_number, rank, dense_rank, lag, lead,
sum as _sum, avg, max as _max, min as _min,
count, first, last, ntile, percent_rank, cume_dist
)
# Create Spark session (automatically available in Databricks)
spark = SparkSession.builder.appName("WindowFunctionsETL").getOrCreate()
Example Dataset: Sales Transactions
# Create sample sales data
sales_data = [
(1, 'Electronics', 'Laptop', '2024-01-15', 1200.00, 'Store A'),
(2, 'Electronics', 'Mouse', '2024-01-15', 25.00, 'Store A'),
(3, 'Clothing', 'Shirt', '2024-01-16', 45.00, 'Store B'),
(4, 'Electronics', 'Keyboard', '2024-01-16', 75.00, 'Store A'),
(5, 'Clothing', 'Pants', '2024-01-17', 60.00, 'Store B'),
(6, 'Electronics', 'Monitor', '2024-01-17', 350.00, 'Store A'),
(7, 'Clothing', 'Jacket', '2024-01-18', 120.00, 'Store B'),
(8, 'Electronics', 'Tablet', '2024-01-18', 450.00, 'Store A'),
]
columns = ['transaction_id', 'category', 'product', 'date', 'amount', 'store']
df = spark.createDataFrame(sales_data, columns)
# Convert date string to date type
from pyspark.sql.functions import to_date
df = df.withColumn('date', to_date(col('date'), 'yyyy-MM-dd'))
1. Ranking Functions
ROW_NUMBER() - Assign Unique Sequential Numbers
# Rank products by sales amount within each category
window_spec = Window.partitionBy('category').orderBy(col('amount').desc())
df_ranked = df.withColumn('row_num', row_number().over(window_spec))
# Get top 2 products per category
top_products = df_ranked.filter(col('row_num') <= 2)
top_products.display()
RANK() vs DENSE_RANK()
# RANK leaves gaps after ties; DENSE_RANK does not
window_spec = Window.partitionBy('store').orderBy(col('amount').desc())
df_with_ranks = df \
.withColumn('rank', rank().over(window_spec)) \
.withColumn('dense_rank', dense_rank().over(window_spec))
df_with_ranks.display()
# Example output for Store A (the sample amounts contain no ties, so both ranks agree):
# $1200 (rank=1, dense_rank=1), $450 (rank=2, dense_rank=2), $350 (rank=3, dense_rank=3),
# $75 (rank=4, dense_rank=4), $25 (rank=5, dense_rank=5)
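Because the sample data has no tied amounts, the difference between the three ranking functions never shows up. The sketch below is a plain-Python illustration (the helper `rank_variants` is hypothetical, not a Spark API) of the numbering ROW_NUMBER, RANK and DENSE_RANK produce over `ORDER BY amount DESC`, using Store A's amounts plus a hypothetical second $450 sale to create a tie.

```python
def rank_variants(values):
    """Illustrative helper (not a Spark API): return (value, row_number, rank,
    dense_rank) for each value, ordered descending, mirroring ROW_NUMBER(),
    RANK() and DENSE_RANK() OVER (ORDER BY value DESC).

    Among tied values Spark's ROW_NUMBER order is arbitrary; here it follows input order.
    """
    ordered = sorted(values, reverse=True)
    results = []
    rank = dense_rank = 0
    previous = None
    for position, value in enumerate(ordered, start=1):
        if value != previous:
            rank = position   # RANK jumps to the row's position, leaving gaps after ties
            dense_rank += 1   # DENSE_RANK advances by one per distinct value
            previous = value
        results.append((value, position, rank, dense_rank))
    return results


# Store A amounts with a hypothetical second $450 sale added to create a tie
for row in rank_variants([1200, 450, 450, 75, 25]):
    print(row)
```

The tied $450 rows share rank 2, after which RANK skips to 4 while DENSE_RANK continues at 3; ROW_NUMBER still gives every row a distinct number, which is why it suits "exactly N rows per group" filters.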
2. Aggregate Window Functions
Running Totals (Cumulative Sum)
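# Calculate running total of sales by category over time
# (transaction_id breaks ties between sales on the same date, keeping the ROWS frame deterministic)
window_spec = Window \
.partitionBy('category') \
.orderBy('date', 'transaction_id') \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)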
df_running_total = df.withColumn(
'running_total',
_sum('amount').over(window_spec)
)
df_running_total.orderBy('category', 'date').display()
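The explicit `rowsBetween` matters when dates tie (Laptop and Mouse both sold on 2024-01-15). When a Spark window has an ORDER BY but no explicit frame, the default frame is RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, which treats rows with equal ORDER BY values as one group. The plain-Python sketch below (helper names are illustrative, not Spark APIs) contrasts the two frames on the first three Electronics rows, assuming the tied rows arrive in the order listed.

```python
from datetime import date

# First three Electronics rows from the sample data; Laptop and Mouse share a date
rows = [
    (date(2024, 1, 15), "Laptop", 1200.00),
    (date(2024, 1, 15), "Mouse", 25.00),
    (date(2024, 1, 16), "Keyboard", 75.00),
]


def running_total_rows(rows):
    """ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: add one physical row at a time."""
    totals, running = [], 0.0
    for _day, _product, amount in rows:
        running += amount
        totals.append(running)
    return totals


def running_total_range(rows):
    """RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: include every row whose
    ORDER BY value is <= the current row's, so tied rows get the same total."""
    totals = []
    for day, _product, _amount in rows:
        totals.append(sum(amount for d, _p, amount in rows if d <= day))
    return totals


print(running_total_rows(rows))   # [1200.0, 1225.0, 1300.0]
print(running_total_range(rows))  # [1225.0, 1225.0, 1300.0]
```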
Moving Averages
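# 3-row moving average: the current transaction and the 2 before it
# (a row count, not a calendar window of 3 days)
window_spec = Window \
.partitionBy('category') \
.orderBy('date', 'transaction_id') \
.rowsBetween(-2, 0) # Current row and 2 rows before
df_moving_avg = df.withColumn(
'moving_avg_3row',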
avg('amount').over(window_spec)
)
df_moving_avg.display()
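A row-based frame and a calendar-based window give different answers whenever there is not exactly one row per day. The sketch below is plain Python (helper names are hypothetical) run on the Electronics rows of the sample data. In Spark, a true calendar window typically means `rangeBetween` over a numeric ordering expression such as a day number; treat that as a pointer rather than a drop-in replacement.

```python
from datetime import date, timedelta

# Electronics rows from the sample data, in date order: (date, amount)
rows = [
    (date(2024, 1, 15), 1200.00),
    (date(2024, 1, 15), 25.00),
    (date(2024, 1, 16), 75.00),
    (date(2024, 1, 17), 350.00),
    (date(2024, 1, 18), 450.00),
]


def moving_avg_rows(rows, n):
    """ROWS BETWEEN n-1 PRECEDING AND CURRENT ROW: average of the last n physical rows."""
    result = []
    for i in range(len(rows)):
        frame = [amount for _day, amount in rows[max(0, i - n + 1): i + 1]]
        result.append(sum(frame) / len(frame))
    return result


def moving_avg_days(rows, days):
    """Calendar frame: average of all rows dated within `days` days ending on the
    current row's date (rows sharing that date are included, as in a RANGE frame)."""
    result = []
    for day, _amount in rows:
        start = day - timedelta(days=days - 1)
        frame = [amount for d, amount in rows if start <= d <= day]
        result.append(sum(frame) / len(frame))
    return result


print([round(x, 2) for x in moving_avg_rows(rows, 3)])  # [1200.0, 612.5, 433.33, 150.0, 291.67]
print([round(x, 2) for x in moving_avg_days(rows, 3)])  # [612.5, 612.5, 433.33, 412.5, 291.67]
```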
Cumulative Statistics
# Multiple aggregations in one window
window_spec = Window \
.partitionBy('store') \
.orderBy('date') \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df_cumulative = df \
.withColumn('cumulative_sales', _sum('amount').over(window_spec)) \
.withColumn('cumulative_count', count('*').over(window_spec)) \
.withColumn('cumulative_avg', avg('amount').over(window_spec)) \
.withColumn('max_sale_so_far', _max('amount').over(window_spec))
df_cumulative.display()
3. Lag and Lead Functions
LAG() - Access Previous Row Values
# Compare current sale with previous sale in same store
window_spec = Window.partitionBy('store').orderBy('date')
df_lag = df \
.withColumn('prev_amount', lag('amount', 1).over(window_spec)) \
.withColumn('prev_product', lag('product', 1).over(window_spec)) \
.withColumn('amount_change', col('amount') - col('prev_amount'))
df_lag.display()
LEAD() - Access Next Row Values
# Look ahead to next transaction
window_spec = Window.partitionBy('category').orderBy('date')
df_lead = df \
.withColumn('next_amount', lead('amount', 1).over(window_spec)) \
.withColumn('next_date', lead('date', 1).over(window_spec))
df_lead.display()
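The boundary behavior of `lag` and `lead` is easy to check by hand: rows without a predecessor (or successor) in the partition get null unless a default is given. PySpark's `lag` and `lead` accept that default as a third argument (`lag(col, offset=1, default=None)`). The plain-Python sketch below mirrors those semantics for a single partition; the helpers are illustrative, not Spark functions.

```python
def lag(values, offset=1, default=None):
    """Illustrative: value `offset` rows before the current row in the same partition, else `default`."""
    return [values[i - offset] if i - offset >= 0 else default
            for i in range(len(values))]


def lead(values, offset=1, default=None):
    """Illustrative: value `offset` rows after the current row in the same partition, else `default`."""
    return [values[i + offset] if i + offset < len(values) else default
            for i in range(len(values))]


# Clothing amounts in date order (a single partition from the sample data)
clothing = [45.00, 60.00, 120.00]
print(lag(clothing))           # [None, 45.0, 60.0]
print(lead(clothing))          # [60.0, 120.0, None]
print(lag(clothing, 2, 0.0))   # [0.0, 0.0, 45.0]
```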
Complex Lag/Lead Analysis
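# Calculate transaction-over-transaction change percentage within each store
from pyspark.sql.functions import when
window_spec = Window.partitionBy('store').orderBy('date', 'transaction_id')
df_change = df \
.withColumn('prev_amount', lag('amount', 1).over(window_spec)) \
.withColumn(
'pct_change',
((col('amount') - col('prev_amount')) / col('prev_amount') * 100)
) \
.withColumn('trend',
# No otherwise(): the first sale in each store has a null pct_change,
# so its trend stays null instead of being labeled FLAT
when(col('pct_change') > 0, 'UP')
.when(col('pct_change') < 0, 'DOWN')
.when(col('pct_change') == 0, 'FLAT')
)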
df_change.display()
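The first row of each partition deserves special attention in any change calculation, because `lag` returns null there. The plain-Python sketch below (the helper `pct_change_with_trend` is hypothetical) spells out the intended rule for one partition: with no usable previous amount there is no percent change and no trend label. It also treats a previous amount of 0 as "no baseline"; that choice is an assumption of this sketch.

```python
def pct_change_with_trend(amounts):
    """Illustrative: return (amount, pct_change, trend) per row of one ordered partition.

    The first row (and any row whose previous amount is 0) has no usable
    baseline, so pct_change and trend are both None.
    """
    results = []
    previous = None
    for amount in amounts:
        if previous is None or previous == 0:
            pct, trend = None, None
        else:
            pct = (amount - previous) / previous * 100
            if pct > 0:
                trend = "UP"
            elif pct < 0:
                trend = "DOWN"
            else:
                trend = "FLAT"
        results.append((amount, pct, trend))
        previous = amount
    return results


# Store B amounts in date order from the sample data
for row in pct_change_with_trend([45.00, 60.00, 120.00]):
    print(row)
```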
4. Distribution Functions
NTILE() - Divide Data into Buckets
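# Divide each category's transactions into 4 buckets (quartiles) by amount
# A partition with fewer than 4 rows leaves the top buckets empty: Clothing has
# only 3 rows here, so none of its rows lands in quartile 4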
window_spec = Window.partitionBy('category').orderBy('amount')
df_quartiles = df.withColumn('quartile', ntile(4).over(window_spec))
# Analyze high-value transactions (top quartile)
high_value = df_quartiles.filter(col('quartile') == 4)
high_value.display()
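How NTILE sizes its buckets explains the small-partition behavior. Under the standard rule, `n` ordered rows split into `k` buckets of `n // k` rows each, and the first `n % k` buckets receive one extra row. The plain-Python sketch below (the helper is illustrative, not a Spark API) applies that rule to the two category partition sizes in the sample data.

```python
def ntile(n_rows, buckets):
    """Illustrative: bucket number (1-based) for each of n_rows ordered rows.
    The first (n_rows % buckets) buckets get one extra row."""
    base, extra = divmod(n_rows, buckets)
    assignment = []
    for bucket in range(1, buckets + 1):
        size = base + (1 if bucket <= extra else 0)
        assignment.extend([bucket] * size)
    return assignment


print(ntile(5, 4))  # Electronics (5 rows): [1, 1, 2, 3, 4]
print(ntile(3, 4))  # Clothing (3 rows): [1, 2, 3], so bucket 4 is empty
```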
PERCENT_RANK() and CUME_DIST() - Relative Position Within a Partition
window_spec = Window.partitionBy('store').orderBy('amount')
df_percent = df \
.withColumn('percent_rank', percent_rank().over(window_spec)) \
.withColumn('cume_dist', cume_dist().over(window_spec))
df_percent.display()
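Both functions return fractions between 0 and 1, not percentages. PERCENT_RANK is `(rank - 1) / (n - 1)`, and CUME_DIST is the share of rows whose value is less than or equal to the current one. The quadratic plain-Python sketch below (the helper name is hypothetical) is meant only for checking expected values on small inputs such as Store B's three amounts.

```python
def percent_rank_and_cume_dist(values):
    """Illustrative: return (value, percent_rank, cume_dist) for each value in ascending order.

    percent_rank = (rank - 1) / (n - 1), or 0.0 when the partition has one row
    cume_dist    = (number of rows with value <= current value) / n
    """
    ordered = sorted(values)
    n = len(ordered)
    results = []
    for value in ordered:
        rank = 1 + sum(1 for v in ordered if v < value)  # RANK(): ties share a rank
        pct = (rank - 1) / (n - 1) if n > 1 else 0.0
        cume = sum(1 for v in ordered if v <= value) / n
        results.append((value, pct, cume))
    return results


# Store B amounts from the sample data
for value, pct, cume in percent_rank_and_cume_dist([45.00, 60.00, 120.00]):
    print(value, pct, round(cume, 4))
```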
5. First and Last Values
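# Get first and last transaction per category
# The frame must span the whole partition: with only orderBy(), the default frame
# ends at the current row (and its same-date ties), so last() would not return the category's last sale
window_spec = Window.partitionBy('category') \
.orderBy('date', 'transaction_id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)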
df_first_last = df \
.withColumn('first_product', first('product').over(window_spec)) \
.withColumn('first_amount', first('amount').over(window_spec)) \
.withColumn('last_product', last('product').over(window_spec)) \
.withColumn('last_amount', last('amount').over(window_spec))
df_first_last.display()
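The frame decides what `first` and `last` can see. The plain-Python sketch below (the helper `first_last` is hypothetical, and it assumes no ties in the ORDER BY column) compares a growing frame with a full-partition frame on the Clothing products, which have no tied dates.

```python
def first_last(values, frame):
    """Illustrative: return (first, last) for each row of one ordered partition.

    frame="growing": ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
    frame="full":    ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING
    Assumes no ties in the ORDER BY column.
    """
    if frame not in ("growing", "full"):
        raise ValueError(f"unknown frame: {frame!r}")
    results = []
    for i in range(len(values)):
        frame_rows = values[: i + 1] if frame == "growing" else values
        results.append((frame_rows[0], frame_rows[-1]))
    return results


# Clothing products in date order from the sample data (no tied dates)
clothing = ["Shirt", "Pants", "Jacket"]
print(first_last(clothing, "growing"))  # last() just echoes the current row
print(first_last(clothing, "full"))     # last() is the partition's final row
```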
Window Functions in Databricks SQL
Creating Tables
-- Create a Delta table
CREATE OR REPLACE TABLE sales_transactions (
transaction_id INT,
category STRING,
product STRING,
transaction_date DATE,
amount DECIMAL(10,2),
store STRING,
customer_id INT
) USING DELTA;
-- Insert sample data
INSERT INTO sales_transactions VALUES
(1, 'Electronics', 'Laptop', DATE '2024-01-15', 1200.00, 'Store A', 101),
(2, 'Electronics', 'Mouse', DATE '2024-01-15', 25.00, 'Store A', 102),
(3, 'Clothing', 'Shirt', DATE '2024-01-16', 45.00, 'Store B', 103),
(4, 'Electronics', 'Keyboard', DATE '2024-01-16', 75.00, 'Store A', 104),
(5, 'Clothing', 'Pants', DATE '2024-01-17', 60.00, 'Store B', 105),
(6, 'Electronics', 'Monitor', DATE '2024-01-17', 350.00, 'Store A', 106),
(7, 'Clothing', 'Jacket', DATE '2024-01-18', 120.00, 'Store B', 107),
(8, 'Electronics', 'Tablet', DATE '2024-01-18', 450.00, 'Store A', 108);
SQL Window Function Syntax
-- Basic window function structure
SELECT
column1,
column2,
WINDOW_FUNCTION() OVER (
PARTITION BY partition_column
ORDER BY order_column
ROWS|RANGE BETWEEN frame_start AND frame_end -- optional frame clause
) AS result_column
FROM table_name;
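A ROWS frame clause is resolved separately for every row. The plain-Python sketch below (the helper `rows_frame` is hypothetical, not a Spark API) turns `ROWS BETWEEN start AND end` into inclusive index bounds. It encodes PRECEDING as a negative offset, CURRENT ROW as 0, FOLLOWING as a positive offset, and UNBOUNDED as `None`, which is roughly how PySpark's `rowsBetween` arguments read.

```python
def rows_frame(n_rows, start, end):
    """Illustrative: resolve ROWS BETWEEN <start> AND <end> to inclusive index bounds per row.

    Offsets are relative to the current row (negative = PRECEDING, 0 = CURRENT ROW,
    positive = FOLLOWING); None means UNBOUNDED on that side. Bounds are clipped to
    the partition, and an empty frame is returned as None.
    """
    frames = []
    for i in range(n_rows):
        lo = 0 if start is None else max(0, i + start)
        hi = n_rows - 1 if end is None else min(n_rows - 1, i + end)
        frames.append((lo, hi) if lo <= hi else None)
    return frames


# ROWS BETWEEN 2 PRECEDING AND CURRENT ROW over a 4-row partition
print(rows_frame(4, -2, 0))
```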
1. Ranking in SQL
-- Rank products by sales within category
SELECT
category,
product,
amount,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY amount DESC) as row_num,
RANK() OVER (PARTITION BY category ORDER BY amount DESC) as rank,
DENSE_RANK() OVER (PARTITION BY category ORDER BY amount DESC) as dense_rank
FROM sales_transactions
ORDER BY category, amount DESC;
Top N Per Group
-- Get top 3 products per category
WITH ranked_products AS (
SELECT
category,
product,
amount,
ROW_NUMBER() OVER (PARTITION BY category ORDER BY amount DESC) as rn
FROM sales_transactions
)
SELECT
category,
product,
amount
FROM ranked_products
WHERE rn <= 3;
2. Running Totals in SQL
-- Calculate running total of sales by category
SELECT
transaction_date,
category,
product,
amount,
SUM(amount) OVER (
PARTITION BY category
ORDER BY transaction_date, transaction_id -- tie-breaker for same-day sales
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_total,
AVG(amount) OVER (
PARTITION BY category
ORDER BY transaction_date, transaction_id
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as running_avg
FROM sales_transactions
ORDER BY category, transaction_date;
3. Moving Averages in SQL
-- 3-transaction moving average
SELECT
transaction_date,
store,
product,
amount,
AVG(amount) OVER (
PARTITION BY store
ORDER BY transaction_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as moving_avg_3,
COUNT(*) OVER (
PARTITION BY store
ORDER BY transaction_date
ROWS BETWEEN 2 PRECEDING AND CURRENT ROW
) as window_size
FROM sales_transactions
ORDER BY store, transaction_date;
4. LAG and LEAD in SQL
-- Compare with previous and next transactions
SELECT
transaction_date,
category,
product,
amount,
LAG(amount, 1) OVER (PARTITION BY category ORDER BY transaction_date) as prev_amount,
LAG(product, 1) OVER (PARTITION BY category ORDER BY transaction_date) as prev_product,
LEAD(amount, 1) OVER (PARTITION BY category ORDER BY transaction_date) as next_amount,
amount - LAG(amount, 1) OVER (PARTITION BY category ORDER BY transaction_date) as amount_change
FROM sales_transactions
ORDER BY category, transaction_date;
5. Advanced SQL Window Patterns
Identifying Gaps in Sequences
-- Find missing transaction IDs
WITH all_ids AS (
SELECT
transaction_id,
LAG(transaction_id) OVER (ORDER BY transaction_id) as prev_id
FROM sales_transactions
)
SELECT
prev_id + 1 as gap_start,
transaction_id - 1 as gap_end
FROM all_ids
WHERE transaction_id - prev_id > 1;
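The query compares each ID with its predecessor and reports any jump larger than 1. The sample table's IDs run 1 through 8 without gaps, so it returns no rows. The plain-Python sketch below mirrors the same LAG logic so you can check expectations on a hypothetical list of IDs; `find_gaps` is an illustrative helper.

```python
def find_gaps(ids):
    """Illustrative: mirror the LAG-based query by comparing each id with its
    predecessor in sorted order and reporting (gap_start, gap_end) when they differ by more than 1."""
    ordered = sorted(ids)
    gaps = []
    for prev_id, current in zip(ordered, ordered[1:]):
        if current - prev_id > 1:
            gaps.append((prev_id + 1, current - 1))
    return gaps


print(find_gaps(range(1, 9)))        # the sample table's IDs 1..8
print(find_gaps([1, 2, 5, 6, 9]))    # hypothetical IDs with two gaps
```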
Customer Purchase Patterns
-- Analyze time between customer purchases
SELECT
customer_id,
transaction_date,
product,
amount,
LAG(transaction_date) OVER (
PARTITION BY customer_id
ORDER BY transaction_date
) as prev_purchase_date,
DATEDIFF(
transaction_date,
LAG(transaction_date) OVER (PARTITION BY customer_id ORDER BY transaction_date)
) as days_since_last_purchase
FROM sales_transactions
WHERE customer_id IS NOT NULL
ORDER BY customer_id, transaction_date;
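Every customer in the sample table has exactly one purchase, so this query returns NULL for every `days_since_last_purchase`. The plain-Python sketch below uses hypothetical repeat purchases to show the expected shape of the result: partition by customer, order by date, and subtract the previous date. The helper name and data are illustrative only.

```python
from datetime import date
from itertools import groupby


def days_since_last_purchase(purchases):
    """Illustrative: purchases are (customer_id, purchase_date) tuples.
    Returns (customer_id, purchase_date, days_since_previous) per row,
    with None for a customer's first purchase."""
    ordered = sorted(purchases)  # PARTITION BY customer_id ORDER BY purchase_date
    results = []
    for customer_id, group in groupby(ordered, key=lambda p: p[0]):
        previous = None
        for _cid, purchase_date in group:
            gap = (purchase_date - previous).days if previous is not None else None
            results.append((customer_id, purchase_date, gap))
            previous = purchase_date
    return results


# Hypothetical data: customer 101 buys twice, customer 102 once
for row in days_since_last_purchase([
    (101, date(2024, 1, 18)),
    (102, date(2024, 1, 16)),
    (101, date(2024, 1, 15)),
]):
    print(row)
```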
Real-World ETL Pipeline Example
Scenario: Building a Sales Analytics Pipeline
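# Explicit imports: a wildcard import would shadow Python's built-in sum/max/min
# and would not define the _sum alias used below
from pyspark.sql.functions import (
col, avg, datediff, lag, ntile, percent_rank, rank, row_number,
sum as _sum
)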
from pyspark.sql.window import Window
# Step 1: Extract - Read raw data from data lake
raw_sales = spark.read \
.format("delta") \
.load("/mnt/raw/sales_data")
# Step 2: Transform - Apply window functions for analytics
# 2a. Calculate sales metrics by product
product_window = Window.partitionBy('product_id').orderBy('sale_date')
enriched_sales = raw_sales \
.withColumn('sale_rank', row_number().over(product_window)) \
.withColumn('cumulative_revenue',
_sum('revenue').over(product_window.rowsBetween(Window.unboundedPreceding, 0))) \
.withColumn('prev_sale_date', lag('sale_date').over(product_window)) \
.withColumn('days_between_sales',
datediff(col('sale_date'), col('prev_sale_date')))
# 2b. Calculate moving averages
ma_window = Window.partitionBy('product_category') \
.orderBy('sale_date') \
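.rowsBetween(-6, 0) # 7-row moving average: the current row and the 6 before it
# (equal to a 7-day average only when each category has exactly one row per day)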
sales_with_ma = enriched_sales \
.withColumn('ma_7day_revenue', avg('revenue').over(ma_window)) \
.withColumn('ma_7day_quantity', avg('quantity').over(ma_window))
# 2c. Identify top performers
category_window = Window.partitionBy('product_category', 'sale_date') \
.orderBy(col('revenue').desc())
top_products = sales_with_ma \
.withColumn('daily_rank', rank().over(category_window)) \
.filter(col('daily_rank') <= 10)
# 2d. Add percentile information
percentile_window = Window.partitionBy('product_category')
final_sales = top_products \
.withColumn('revenue_percentile',
percent_rank().over(percentile_window.orderBy('revenue'))) \
.withColumn('quartile', ntile(4).over(percentile_window.orderBy('revenue')))
# Step 3: Load - Write to Gold layer
final_sales.write \
.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy("sale_date") \
.save("/mnt/gold/sales_analytics")
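# Register the Delta files written above as a table for SQL access,
# rather than writing the same data a second time
spark.sql("""
CREATE TABLE IF NOT EXISTS gold.sales_analytics
USING DELTA
LOCATION '/mnt/gold/sales_analytics'
""")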
Corresponding SQL ETL Pipeline
-- Create Gold layer table with window functions
CREATE OR REPLACE TABLE gold.sales_analytics AS
WITH enriched_sales AS (
SELECT
*,
ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY sale_date) as sale_rank,
SUM(revenue) OVER (
PARTITION BY product_id
ORDER BY sale_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_revenue,
LAG(sale_date) OVER (PARTITION BY product_id ORDER BY sale_date) as prev_sale_date,
DATEDIFF(
sale_date,
LAG(sale_date) OVER (PARTITION BY product_id ORDER BY sale_date)
) as days_between_sales
FROM raw.sales_data
),
sales_with_ma AS (
SELECT
*,
AVG(revenue) OVER (
PARTITION BY product_category
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as ma_7day_revenue,
AVG(quantity) OVER (
PARTITION BY product_category
ORDER BY sale_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) as ma_7day_quantity
FROM enriched_sales
),
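ranked_products AS (
SELECT
*,
RANK() OVER (
PARTITION BY product_category, sale_date
ORDER BY revenue DESC
) as daily_rank
FROM sales_with_ma
),
-- A window result cannot be filtered in the WHERE clause of the SELECT
-- that computes it, so the filter runs in a separate step
top_products AS (
SELECT *
FROM ranked_products
WHERE daily_rank <= 10
)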
SELECT
*,
PERCENT_RANK() OVER (
PARTITION BY product_category
ORDER BY revenue
) as revenue_percentile,
NTILE(4) OVER (
PARTITION BY product_category
ORDER BY revenue
) as quartile
FROM top_products;
Performance Optimization Tips
1. Minimize Shuffles
# Less efficient - windows partitioned by different keys each need their own shuffle
window1 = Window.partitionBy('category').orderBy('date')
window2 = Window.partitionBy('store').orderBy('amount')
df1 = df \
.withColumn('rank', rank().over(window1)) \
.withColumn('amount_rank', rank().over(window2))
# Better - windows that share the same partitionBy key can reuse one shuffle
# (a different orderBy still adds a sort, but not another shuffle)
window1 = Window.partitionBy('category').orderBy('date')
window2 = Window.partitionBy('category').orderBy('amount')
df_optimized = df \
.withColumn('rank', rank().over(window1)) \
.withColumn('amount_rank', rank().over(window2))
# Inspect the physical plan: each "Exchange" node is a shuffle
df_optimized.explain()
2. Use Appropriate Window Frames
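# Rolling metric - a bounded frame: the current row and the 30 before it
window = Window.partitionBy('store') \
.orderBy('date') \
.rowsBetween(-30, 0) # Only 31 rows
# Cumulative metric - an unbounded frame from the start of the partition.
# Pick the frame the metric needs: Spark updates a running (UNBOUNDED PRECEDING)
# aggregate incrementally, while a sliding frame is re-aggregated as it moves,
# so the unbounded frame is not automatically slower. Very large or skewed
# partitions are usually the bigger cost, since each key's rows go to one task.
window = Window.partitionBy('store') \
.orderBy('date') \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)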
3. Leverage Delta Lake Optimizations
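# Enable optimized writes and auto compaction for this session
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")
# Write optimized data, partitioned by year and month derived from sale_date
# (dataChange=false is only meant for writes that rearrange existing data, such as compaction)
from pyspark.sql.functions import year, month
final_sales \
.withColumn('year', year('sale_date')) \
.withColumn('month', month('sale_date')) \
.write \
.format("delta") \
.mode("overwrite") \
.partitionBy("year", "month") \
.save("/mnt/gold/sales_optimized")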
4. Use Caching Strategically
# Cache an intermediate DataFrame that several actions will reuse
df_cached = df.cache()
window1 = Window.partitionBy('category').orderBy('date')
window2 = Window.partitionBy('store').orderBy('amount')
result1 = df_cached.withColumn('cat_rank', rank().over(window1))
result2 = df_cached.withColumn('store_rank', rank().over(window2))
# cache() is lazy: the first action fills the cache and later actions reuse it
result1.display()
result2.display()
# Unpersist only after the actions that need the cached data have run
df_cached.unpersist()
Common Pitfalls and Solutions
Pitfall 1: Incorrect Window Frame
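# Problem - without orderBy, the frame is the whole partition,
# so every row gets the category total instead of a running total
window = Window.partitionBy('category') # No orderBy
df.withColumn('running_total', _sum('amount').over(window)) # Category total, not running
# Solution - order the window and make the frame explicit
# (transaction_id breaks ties between sales on the same date)
window = Window.partitionBy('category') \
.orderBy('date', 'transaction_id') \
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
df.withColumn('running_total', _sum('amount').over(window)) # Correct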
Pitfall 2: Performance Issues with Large Windows
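-- Cumulative average over each customer's full history. Spark updates this frame
-- incrementally, so the frame itself is cheap; the cost grows with partition size,
-- because all rows for one customer_id are processed by a single task
SELECT
*,
AVG(amount) OVER (
PARTITION BY customer_id
ORDER BY date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) as cumulative_avg
FROM large_table;
-- Bounded alternative when the question is about recent behavior: the last 90
-- transactions per customer (a ROWS frame counts rows, not days)
SELECT
*,
AVG(amount) OVER (
PARTITION BY customer_id
ORDER BY date
ROWS BETWEEN 89 PRECEDING AND CURRENT ROW
) as rolling_90txn_avg
FROM large_table;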
Best Practices
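- Specify ORDER BY and an explicit ROWS frame for running or cumulative aggregates; omit ORDER BY when you want the whole-partition total on every row
- Add a unique tie-breaker column (such as transaction_id) to ORDER BY so ROW_NUMBER, LAG/LEAD, and ROWS frames give deterministic results
- Use ROWS for a fixed number of physical rows and RANGE when rows with equal ORDER BY values should be treated as one group
- For Delta table partitioning (partitionBy on write), prefer low-cardinality columns; for window PARTITION BY, watch for skewed keys, because all rows for one key are processed by a single task
- Choose the window frame that matches the metric: a bounded frame gives a rolling metric, an unbounded one a cumulative metric
- Cache DataFrames that are reused across multiple actions, and unpersist them afterwards
- Use Delta Lake for efficient updates and time travel
- Test on sample data before running on full datasets
- Monitor query execution using the Spark UI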
Conclusion
Window functions in Databricks provide powerful capabilities for building sophisticated ETL pipelines. Whether you're working with PySpark or SQL, mastering window functions enables you to:
- Perform complex analytical calculations efficiently
- Maintain row-level detail while adding aggregated insights
- Build incremental processing pipelines
- Implement advanced data quality checks
- Create rich analytical datasets for downstream consumption
By combining Databricks' optimized Spark runtime with window functions, you can build scalable, performant ETL pipelines that handle enterprise-scale data processing requirements.
Need help building Databricks ETL pipelines? Contact LanaCloud at projectteam@lanacloud.com for expert guidance on implementing scalable data solutions.