A Streaming Analytics Solution to Help a Bookstore Survive in the Digital Economy

Neeraj Somani
6 min read · Jan 10, 2024

High-level architecture of the Data Pipeline

It’s a multi-hop (Medallion) architecture with three layers: Bronze, Silver, and Gold.

Technologies used:

Databricks, AWS, Azure, Kafka, PySpark, SQL, Python

Three separate categories of data are used to analyze the bookstore's operations:

Customers, Orders, and Books

A high-level overview of the flow of data

1. Streaming data arrives through Kafka and is stored in a multiplex bronze table.

2. In the silver layer, we refine the data into the format downstream consumers need, segregating the multiplex bronze data into separate silver tables:

  • In the orders silver table, we implement quality enforcement and streaming deduplication.
  • For the books silver table, we build an SCD Type 2 table, plus a new current_books table created with batch overwrite logic.
  • In the customers silver table, we implement CDC (change data capture) using the CDF (change data feed) feature of Delta tables, producing a final customers table that propagates incremental changes to downstream tables.

3. Perform multiple streaming joins to populate the gold tables: “countries_stats_vw” gets its data via a stream-stream join and “authors_stats” via a stream-static join. These final gold tables act as materialized views.

Why did we choose the multiplex ingestion pattern? Singleplex vs. multiplex

  • In singleplex, one source table/dataset/topic is connected to one target table. This is mostly used in batch processing. Because a workspace limits the number of concurrent streaming jobs, singleplex does not scale well for streaming, which is why multiplex is used instead.
  • In multiplex, one or more source tables/datasets/topics are connected to a single target table. This is mostly used in stream processing, and a Kafka-like pub/sub system is typically used to implement it; a small sketch of the downstream pattern follows.
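As a minimal sketch (assuming the bronze table shown later in this post), each downstream silver job simply filters the single multiplex bronze table for the topic it cares about:

# Hypothetical sketch: with multiplex ingestion every topic lands in one
# bronze table, and each consumer filters out the rows it needs.
orders_raw = (spark.readStream
              .table("bronze")                # single multiplex bronze table
              .filter("topic = 'orders'"))    # one logical source

books_raw = (spark.readStream
             .table("bronze")
             .filter("topic = 'books'"))      # another logical source, same table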

Process Part 1: Ingestion Step (Bronze)

To streamline the process and avoid the complexity of running Kafka, I used JSON files stored in Azure Data Lake Storage Gen2 object storage.

As shown above, each row in the JSON files is tagged with the topic it belongs to.
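For illustration only, a single raw record might look roughly like this (the values are made up; key and value are Base64-style encoded payloads, since the bronze schema below reads them as BINARY):

{"key": "MDAwMDE=", "value": "eyJvcmRlcl9pZCI6ICIwMDAwMSIsIC4uLn0=", "topic": "orders", "partition": 0, "offset": 1024, "timestamp": 1657122876362}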

The sample code below reads the data from the Azure data lake, specifying the format and schema. It partitions the output and enables mergeSchema for schema evolution. The trigger and checkpointLocation options allow automatic incremental loads, so the process starts and stops based on data availability.

from pyspark.sql import functions as F

def process_bronze():
    schema = "key BINARY, value BINARY, topic STRING, partition LONG, offset LONG, timestamp LONG"

    query = (spark.readStream
                  .format("cloudFiles")
                  .option("cloudFiles.format", "json")
                  .schema(schema)
                  .load(f"{dataset_bookstore}/kafka-raw")
                  .withColumn("timestamp", (F.col("timestamp") / 1000).cast("timestamp"))
                  .withColumn("year_month", F.date_format("timestamp", "yyyy-MM"))
                  .writeStream
                  .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/bronze")
                  .option("mergeSchema", True)
                  .partitionBy("topic", "year_month")
                  .trigger(availableNow=True)
                  .table("bronze"))

    query.awaitTermination()
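A quick sanity check of my own (not part of the original pipeline) after one ingestion pass could look like this:

process_bronze()   # one availableNow pass over the files currently in kafka-raw

# verify that all three topics landed in the bronze table
spark.table("bronze").groupBy("topic").count().show()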

To load new data from the Kafka stream into the bookstore dataset, the method below is used.

def load_new_data(self, num_files=1):
    streaming_dir = f"{self.location}/kafka-streaming"
    raw_dir = f"{self.location}/kafka-raw"
    for i in range(num_files):
        self.__load_data(10, streaming_dir, raw_dir)

Process Part 2: Processing Step (Silver)

Here we clean the dataset: fix column data types, check for null values, convert timestamps, parse the JSON payload, and explode the nested columns into a flat DataFrame.

The sample code below shows how to read the static bronze table as a stream for continuous processing, so that we can run streaming Spark SQL queries against it (for example via a streaming temp view, as sketched next).
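A minimal sketch of the streaming temp view pattern (the bronze_tmp name is just an example):

# register the bronze table as a streaming temporary view
spark.readStream.table("bronze").createOrReplaceTempView("bronze_tmp")

# any SQL against bronze_tmp now produces a streaming DataFrame
orders_stream = spark.sql("SELECT * FROM bronze_tmp WHERE topic = 'orders'")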

Orders Silver Table

from pyspark.sql import functions as F

json_schema = "order_id STRING, order_timestamp Timestamp, customer_id STRING, quantity BIGINT, total BIGINT, books ARRAY<STRUCT<book_id STRING, quantity BIGINT, subtotal BIGINT>>"

query = (spark.readStream.table("bronze")
              .filter("topic = 'orders'")
              .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
              .select("v.*")
              .writeStream
              .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/orders_silver")
              .trigger(availableNow=True)
              .table("orders_silver"))

query.awaitTermination()

This creates the orders_silver table from the bronze stream. With the code above, data is processed in multiple streaming micro-batches until no more data is available.

Apply some quality enforcement checks to avoid data issues

Below are a few examples. Here, we add two constraints:

  1. order_timestamp should be on or after a specific date
  2. quantity should be greater than 0

ALTER TABLE orders_silver ADD CONSTRAINT timestamp_within_range CHECK (order_timestamp >= '2020-01-01');

ALTER TABLE orders_silver ADD CONSTRAINT valid_quantity CHECK (quantity > 0);

We need to build logic into the pipeline to either discard such records or decide how to process them while the pipeline keeps running; one option is sketched below.
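A minimal sketch of one option, my own rather than the course's exact solution: split the parsed orders stream into valid and invalid rows before writing, so violating records never reach the constrained table (the quarantine target is hypothetical):

parsed_orders = (spark.readStream.table("bronze")
                 .filter("topic = 'orders'")
                 .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
                 .select("v.*"))

# rows that satisfy both constraints can safely go to orders_silver
valid_orders = parsed_orders.filter("quantity > 0 AND order_timestamp >= '2020-01-01'")

# everything else could be routed to a hypothetical quarantine table for inspection
invalid_orders = parsed_orders.filter(
    "quantity <= 0 OR order_timestamp < '2020-01-01' OR quantity IS NULL OR order_timestamp IS NULL")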

How to handle duplicate records in the streaming data

The dropDuplicates() function is used on the key columns.

Structured Streaming can track state information for the unique keys it has seen in the data, which is what allows it to detect and drop duplicate records; this state is maintained in the stream's checkpoint.

The watermarking functionality lets us track state only for the window of time in which we expect duplicates to arrive. Combined with dropDuplicates(), this removes duplicates that fall within the watermark window. Without a watermark, the state history keeps growing and can hurt pipeline performance.

deduped_df = (spark.readStream
              .table("bronze")
              .filter("topic = 'orders'")
              .select(F.from_json(F.col("value").cast("string"), json_schema).alias("v"))
              .select("v.*")
              .withWatermark("order_timestamp", "30 seconds")
              .dropDuplicates(["order_id", "order_timestamp"]))

Also, if duplicate data arrives in separate micro-batches, we can handle it with an insert-only MERGE operation applied against the target table for each batch:

MERGE INTO orders_silver a
USING orders_microbatch b
ON a.order_id=b.order_id AND a.order_timestamp=b.order_timestamp
WHEN NOT MATCHED THEN INSERT *
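Wiring this MERGE into the stream is typically done with foreachBatch; a minimal sketch (the function name upsert_data and the checkpoint path are my own choices) could look like this:

def upsert_data(microBatchDF, batchId):
    # expose the micro-batch to SQL so the MERGE above can reference it
    microBatchDF.createOrReplaceTempView("orders_microbatch")

    sql_query = """
        MERGE INTO orders_silver a
        USING orders_microbatch b
        ON a.order_id = b.order_id AND a.order_timestamp = b.order_timestamp
        WHEN NOT MATCHED THEN INSERT *
    """
    microBatchDF.sparkSession.sql(sql_query)

query = (deduped_df.writeStream
         .foreachBatch(upsert_data)
         .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/orders_silver")
         .trigger(availableNow=True)
         .start())

query.awaitTermination()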

Books Silver Table

First, we create the books silver table as SCD Type 2, because we want to keep track of all price modifications of a book, so we can verify the ordered total amount at any specific point in time.

Second, we create another current_books silver table that holds only the latest information for each book.

def process_books_silver(self):
    schema = "book_id STRING, title STRING, author STRING, price DOUBLE, updated TIMESTAMP"

    query = (spark.readStream
                  .table("bronze")
                  .filter("topic = 'books'")
                  .select(F.from_json(F.col("value").cast("string"), schema).alias("v"))
                  .select("v.*")
                  .writeStream
                  .foreachBatch(self.__type2_upsert)
                  .option("checkpointLocation", f"{self.checkpoint}/books_silver")
                  .trigger(availableNow=True)
                  .start()
             )

    query.awaitTermination()

def process_current_books(self):
    spark.sql("""
        CREATE OR REPLACE TABLE current_books
        AS SELECT book_id, title, author, price
        FROM books_silver
        WHERE current IS TRUE
    """)

Customers Silver Table

As you can see in the architecture diagram, for the customers silver table we need to implement the CDC/CDF feature.

In this case, the raw data carries a flag on each record indicating whether it is an insert, update, or delete. Based on that flag we implement our processing.

Since a merge operation cannot be performed when there are multiple rows with the same key, we need to identify only the most recent change per key and use that in our CDC feed.

Note: since the rank window function is not allowed directly on a streaming DataFrame, we apply it inside a foreachBatch merge operation, where each micro-batch is processed as a static DataFrame.

from pyspark.sql.window import Window

def batch_upsert(microBatchDF, batchId):
    window = Window.partitionBy("customer_id").orderBy(F.col("row_time").desc())

    (microBatchDF.filter(F.col("row_status").isin(["insert", "update"]))
                 .withColumn("rank", F.rank().over(window))
                 .filter("rank == 1")
                 .drop("rank")
                 .createOrReplaceTempView("ranked_updates"))

    query = """
        MERGE INTO customers_silver c
        USING ranked_updates r
        ON c.customer_id = r.customer_id
        WHEN MATCHED AND c.row_time < r.row_time
          THEN UPDATE SET *
        WHEN NOT MATCHED
          THEN INSERT *
    """

    microBatchDF.sparkSession.sql(query)

As soon as we enable the change data feed on our customers_silver Delta table, an additional _change_data directory appears next to the _delta_log directory; it stores the change data (CDC information) for the table alongside its version history.
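For reference, a minimal sketch of enabling and reading the change feed (matching the readChangeData option used in the code below):

# enable the change data feed on an existing table
spark.sql("""
    ALTER TABLE customers_silver
    SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")

# batch-read the changes recorded since a given table version
changes_df = (spark.read
              .option("readChangeData", True)
              .option("startingVersion", 2)
              .table("customers_silver"))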

Stream-stream join

The customers change feed and the orders stream are joined to build the customers_orders table used for further processing.

def process_customers_orders():
    orders_df = spark.readStream.table("orders_silver")

    cdf_customers_df = (spark.readStream
                             .option("readChangeData", True)
                             .option("startingVersion", 2)
                             .table("customers_silver")
                        )

    query = (orders_df
                 .join(cdf_customers_df, ["customer_id"], "inner")
                 .writeStream
                 .foreachBatch(batch_upsert)
                 .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/customers_orders")
                 .trigger(availableNow=True)
                 .start()
             )

    query.awaitTermination()

process_customers_orders()

Stream-static join

The orders stream is joined with the static current_books table to build the books_sales table for further processing.

from pyspark.sql import functions as F

def process_books_sales():
    orders_df = (spark.readStream.table("orders_silver")
                      .withColumn("book", F.explode("books"))
                 )

    books_df = spark.read.table("current_books")

    query = (orders_df
                 .join(books_df, orders_df.book.book_id == books_df.book_id, "inner")
                 .writeStream
                 .outputMode("append")
                 .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/books_sales")
                 .trigger(availableNow=True)
                 .table("books_sales")
             )

    query.awaitTermination()

process_books_sales()

Gold tables (countries_stats_vw, authors_stats)

countries_stats_vw: holds, for each country and day, the number of orders placed and the total quantity of books sold (daily sales per country).

CREATE VIEW IF NOT EXISTS countries_stats_vw AS (
SELECT country, date_trunc("DD", order_timestamp) order_date, count(order_id) orders_count, sum(quantity) books_count
FROM customers_orders
GROUP BY country, date_trunc("DD", order_timestamp)
)

authors_stats: this table stores summary sales statistics per author. It calculates the order count and the average quantity per author for each non-overlapping five-minute window of order_timestamp.

from pyspark.sql import functions as F

query = (spark.readStream
              .table("books_sales")
              .withWatermark("order_timestamp", "10 minutes")
              .groupBy(
                  F.window("order_timestamp", "5 minutes").alias("time"),
                  "author")
              .agg(
                  F.count("order_id").alias("orders_count"),
                  F.avg("quantity").alias("avg_quantity"))
              .writeStream
              .option("checkpointLocation", "dbfs:/mnt/demo_pro/checkpoints/authors_stats")
              .trigger(availableNow=True)
              .table("authors_stats")
         )

query.awaitTermination()

Notice how the watermark is used here to handle late, out-of-order data, support deduplication, and limit the amount of state that is kept.

*** Credit goes to Derar Alhussein, the instructor of the Databricks course on Udemy that this project is based on.

