Learn how to build a scalable real-time data pipeline in Databricks by joining two Kafka streams using Apache Spark Structured Streaming and watermarks. This guide includes a hands-on use case, full PySpark code, and key architectural insights for handling late-arriving data.

🧠 Why This Matters

In real-time analytics, joining two live data streams can unlock deep insights:

  • Match user clicks with purchases
  • Detect fraudulent activity
  • Combine sensor readings from multiple sources

But without proper handling, stream-stream joins in Apache Spark can lead to unbounded memory usage. That’s where watermarks save the day.

🔹 Real-World Use Case: Clicks + Purchases

An e-commerce company wants to:

  • Ingest user click events on product pages
  • Ingest purchase events
  • Join them in real time to measure conversions that happen within 10 minutes of a click

Kafka Topics:

  • clicks: contains userId, productId, clickTime
  • purchases: contains userId, productId, purchaseTime

Goal: Join clicks and purchases per userId + productId where the purchase happens within 10 minutes of the click.
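
For concreteness, assume each Kafka message value is a plain CSV string; this format is an assumption, and the parsing code below splits on commas accordingly:

u123,p456,2025-01-15T10:02:00    (a click on the clicks topic)
u123,p456,2025-01-15T10:07:30    (a purchase on the purchases topic)

Here the purchase lands 5.5 minutes after the click, so this pair falls inside the 10-minute conversion window and would be emitted by the join.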

📊 Problem Statement

Stream-stream joins require Spark to buffer data from both streams in memory. Without a way to discard old unmatched records, memory usage can grow indefinitely. This can crash your job.

🔧 Solution: Use Watermarks

Watermarks are event-time markers that allow Spark to:

  • Set boundaries on how late a record can arrive
  • Safely discard old state entries

Example:

withWatermark("clickTime", "10 minutes")

This tells Spark that a click may arrive up to 10 minutes behind the latest clickTime already seen; anything older than that can be safely dropped from state.
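
As a minimal sketch of the mechanics (events is a hypothetical streaming DataFrame with an event-time column clickTime):

# Spark tracks the maximum clickTime observed so far; the watermark is
# max(clickTime) - 10 minutes, and buffered state older than the
# watermark becomes eligible for cleanup.
events_with_watermark = events.withWatermark("clickTime", "10 minutes")

Note that the watermark is a state-cleanup threshold, not a strict filter: records within the 10-minute delay are guaranteed to be processed, while records later than that may or may not be.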

📒 Full PySpark Code (Databricks Ready)

from pyspark.sql.functions import expr
from pyspark.sql.types import StructType, StringType, TimestampType

# Schemas documenting the expected message shape
clicks_schema = StructType() \
    .add("userId", StringType()) \
    .add("productId", StringType()) \
    .add("clickTime", TimestampType())

purchases_schema = StructType() \
    .add("userId", StringType()) \
    .add("productId", StringType()) \
    .add("purchaseTime", TimestampType())

# Read the clicks stream; values are CSV strings (userId,productId,clickTime)
clicks = spark.readStream.format("kafka") \
    .option("subscribe", "clicks") \
    .option("kafka.bootstrap.servers", "<broker>") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS value") \
    .selectExpr("split(value, ',')[0] AS userId",
                "split(value, ',')[1] AS productId",
                "to_timestamp(split(value, ',')[2]) AS clickTime") \
    .withWatermark("clickTime", "10 minutes") \
    .alias("clicks")

# Read the purchases stream
purchases = spark.readStream.format("kafka") \
    .option("subscribe", "purchases") \
    .option("kafka.bootstrap.servers", "<broker>") \
    .load() \
    .selectExpr("CAST(value AS STRING) AS value") \
    .selectExpr("split(value, ',')[0] AS userId",
                "split(value, ',')[1] AS productId",
                "to_timestamp(split(value, ',')[2]) AS purchaseTime") \
    .withWatermark("purchaseTime", "10 minutes") \
    .alias("purchases")

# Stream-stream join: same user and product, purchase within 10 minutes
# of the click. The .alias() calls above are what let the join condition
# refer to clicks.userId and purchases.userId unambiguously.
joined = clicks.join(
    purchases,
    expr("""
        clicks.userId = purchases.userId AND
        clicks.productId = purchases.productId AND
        purchaseTime BETWEEN clickTime AND clickTime + interval 10 minutes
    """)
)

# Write output to the console (development sink)
query = joined.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

query.awaitTermination()
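
The console sink above is only for development. On Databricks, a more production-oriented sketch writes the joined stream to a Delta table with a checkpoint so the query can recover its offsets and join state after a restart (the table name and checkpoint path here are placeholders):

# Append the joined results to a Delta table, checkpointing state so the
# query resumes cleanly after restarts or cluster failures.
query = joined.writeStream \
    .format("delta") \
    .option("checkpointLocation", "/tmp/checkpoints/click_purchase_join") \
    .outputMode("append") \
    .toTable("click_purchase_conversions")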

🤫 How It Works

  • Watermarks let Spark safely discard buffered state once it is too old to produce a match
  • The join emits a row only when userId and productId match and the purchase falls within 10 minutes of the click
  • Because the time-range condition bounds how long each side must be buffered, state (and memory) cannot grow without limit
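
Structured Streaming also supports outer stream-stream joins, which is handy here if you also want the clicks that never converted. A hedged sketch (leftOuter keeps every click; the watermarks and time-range condition defined earlier are mandatory for outer joins, since Spark needs them to know when it may emit a NULL result):

# Left outer join: every click is emitted; purchase columns come out NULL
# when no matching purchase arrived within the 10-minute window.
non_conversions_included = clicks.join(
    purchases,
    expr("""
        clicks.userId = purchases.userId AND
        clicks.productId = purchases.productId AND
        purchaseTime BETWEEN clickTime AND clickTime + interval 10 minutes
    """),
    "leftOuter"
)

Expect extra latency on the unmatched side: NULL rows can only be emitted once the watermark guarantees that no matching purchase can still arrive.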

🌐 Best Practices

  • Use event-time columns, not ingestion time
  • Define watermarks on both streams
  • Tune watermark duration based on delay tolerance
  • Monitor state metrics in long-running pipelines
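
On the last point: a running StreamingQuery exposes progress metrics, including the size of the join state. A minimal sketch for spot-checking state growth from a notebook (assumes query is the handle returned by writeStream.start() above):

# Inspect the most recent micro-batch's progress report.
progress = query.lastProgress
if progress is not None:
    for op in progress.get("stateOperators", []):
        # numRowsTotal is the row count currently held in state; if it
        # keeps growing, revisit the watermark and join time constraint.
        print(op.get("operatorName"), op.get("numRowsTotal"))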

📅 When Should You Use This?

  • Clickstream + conversion tracking
  • Fraud detection from login + transaction streams
  • IoT: Match multiple sensor inputs
  • Real-time event enrichment from external APIs

🔹 Summary

Joining two live Kafka streams in Databricks is straightforward with Apache Spark Structured Streaming. But doing it safely requires watermarks on both streams and a time-range join condition to keep state bounded.

This guide gives you the tools to:

  • Join real-time streams
  • Handle late data
  • Keep your system stable and scalable

🔗 Reference: Databricks Blog on Stream-Stream Joins

Databricks Course | Databricks Training | Databricks Online Training — AccentFuture

Master Databricks with expert-led training on big data, AI, and ML, covering Apache Spark, real-time analytics, and cloud integration (AWS, Azure, Google Cloud). Gain hands-on experience and advance your career with our industry-focused Databricks Training!

🚀 Enroll Now: https://www.accentfuture.com/enquiry-form/

📞 Call Us: +91-9640001789

📧 Email Us: contact@accentfuture.com

🌐 Visit Us: AccentFuture