Enrich Streaming Data with Batched Data
Streaming data can be enriched using the following scenarios:
- Static References
- Dynamic Data Sets
- Another Streaming Data Source
This post covers the first two of these scenarios: static references and dynamic data sets.
First, set up the streaming reader:
# Output locations for the pipeline; working_dir is a predefined path.
outputPath = f'{working_dir}/output'
outputPathBronze = f'{outputPath}/bronze'

# Open a streaming reader over the bronze Delta table.
deviceStream = spark.readStream.format('delta').load(outputPathBronze)
Load static/reference data
def loadStaticData(path):
    """Read the Delta table at *path* into a (batch) DataFrame."""
    return spark.read.format('delta').load(path)
# lookupSourcePath is path to the static reference data
# Materialize the lookup table once up front; it is joined against the
# stream on 'device_id' in bronzeToSilver below.
deviceReferenceDF = loadStaticData(lookupSourcePath)
Define a function to enrich the streaming data
from pyspark.sql.functions import col
def bronzeToSilver(deviceStreamReader, silverPath, streamName, lookupDF):
    """Enrich the bronze device stream with static lookup data and write it
    to a silver Delta table.

    Args:
        deviceStreamReader: streaming DataFrame read from the bronze table.
        silverPath: destination path of the silver Delta table; the stream
            checkpoint is kept alongside it at f"{silverPath}_checkpoint".
        streamName: query name shown in the Spark UI for this stream.
        lookupDF: static reference DataFrame joined on 'device_id'.

    Returns:
        The StreamingQuery handle from ``start()`` so the caller can
        ``awaitTermination()``/``stop()`` the stream. (Previously the handle
        was discarded and the function implicitly returned None.)
    """
    devicesStream = (deviceStreamReader
        # Left join keeps every streaming event even when no lookup row matches.
        .join(lookupDF, ['device_id'], 'left')
        # NOTE(review): the join key is top-level 'device_id' while the select
        # reads nested 'params.device_id' — this assumes both columns exist on
        # the bronze stream; confirm against the bronze schema.
        .select(col('params.device_id').alias('device_id'),
                col('eventName'),
                col('params.client_event_time').alias('client_event_time'),
                col('eventDate'),
                col('deviceType'))
    )
    # Return the query handle instead of dropping it.
    return (devicesStream
        .writeStream
        .format('delta')
        .outputMode('append')
        .queryName(streamName)
        .option("checkpointLocation", f"{silverPath}_checkpoint")
        .start(silverPath))
None