Enrich Streaming Data with Batched Data

Streaming data can be enriched in the following scenarios:

  1. Static References
  2. Dynamic Data Sets
  3. Another Streaming Data Source

This post covers the first two scenarios: static references and dynamic data sets.

First, set up the streaming reader:

outputPath = f'{working_dir}/output' # working_dir is a predefined path
outputPathBronze = f'{outputPath}/bronze'

deviceStream = (spark
   .readStream
   .format('delta')
   .load(outputPathBronze))

Next, load the static reference data:

def loadStaticData(path):
  return spark.read.format('delta').load(path)

# lookupSourcePath is path to the static reference data
deviceReferenceDF = loadStaticData(lookupSourcePath)

Define a function that joins the stream with the reference data and writes the result to a silver Delta table:

from pyspark.sql.functions import col

def bronzeToSilver(deviceStreamReader, silverPath, streamName, lookupDF):
  devicesStream = (deviceStreamReader
   .withColumn('device_id', col('params.device_id')) # Extract the join key from the nested params struct
   .join(lookupDF, ['device_id'], 'left') # Join with the static reference data
   .select(col('device_id'),
           col('eventName'),
           col('params.client_event_time').alias('client_event_time'),
           col('eventDate'),
           col('deviceType')) # deviceType comes from the reference data
  )

  return (devicesStream
    .writeStream
    .format('delta')
    .outputMode('append')
    .queryName(streamName)
    .option('checkpointLocation', f'{silverPath}_checkpoint')
    .start(silverPath)) # Return the StreamingQuery so callers can monitor or stop it
