[Day 198] Transactions Data Streaming Pipeline Project [v1 completed]

 Hello :)
Today is Day 198!


A quick summary of today:
  • data streaming pipeline project [v1 done]


Well ... I did not know I could do it in a day (~14 hours) after yesterday's issues, but here we are.

It turns out that in order to insert the full record (~70 variables with nested/list structure), I need a proper PySpark schema. Yesterday I did not have one, which is why I was getting NULL in the columns when reading the data from the Kafka producer: my schema was wrong. Today I not only fixed the schema for the 4 variables I had yesterday, but also included *all* ~70 variables that come from the Stripe API (for completeness).

When I run docker-compose, the data streams into the Postgres db (and it is still running). Unfortunately, the free Stripe API for creating realistic transactions has a limit of 25, so every 3 seconds 25 new transactions are sent to the db. It has been running for half the day (since I got that set up), and as I am writing this the db has (checking db) 28675 rows of transactions.
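As a quick sanity check on those numbers, here is a small sketch of the nominal throughput, assuming exactly one batch of 25 rows lands every 3 seconds with no extra latency. The helper names are made up for this example.

```python
import math

BATCH_SIZE = 25        # transactions per batch (the limit mentioned above)
INTERVAL_SECONDS = 3   # pause between batches


def rows_after(seconds: int) -> int:
    """Rows inserted after `seconds`, counting only fully completed intervals."""
    return (seconds // INTERVAL_SECONDS) * BATCH_SIZE


def seconds_needed(rows: int) -> int:
    """Seconds of back-to-back batches needed to reach at least `rows` rows."""
    batches = math.ceil(rows / BATCH_SIZE)
    return batches * INTERVAL_SECONDS


print(rows_after(3600))        # nominal rows per hour: 30000
print(seconds_needed(28675))   # 3441 seconds, roughly 57 minutes
```

By this estimate, 28675 rows correspond to about 57 minutes of back-to-back batches. If the pipeline has been up for longer than that, the real cadence is presumably slower than the nominal one, for example because each API call takes time on top of the 3-second pause. That last part is a guess.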

After I got this set up, I decided to move on to dbt for a bit of data documentation and modelling.

This is the data model I settled on for now. Using dbt, I created the dimension tables and the fact table. I decided to use dbt Core this time rather than dbt Cloud, just to see what the difference is. It is not that different, but the cloud version has its conveniences. I also wanted to document as much as possible (there is probably more to do), so I generated the dbt docs and hosted them on Netlify. Here is the link.

As I was writing the dbt models, I thought about adding some more data to make this whole thing a bit more interesting, and settled on currency exchange data. Since the transactions are in GBP, if a customer is paying from overseas we might need to convert the charge into some other currency. I looked around and found forex-python, but it did not work for some reason. Then I found an API URL from which I can get daily exchange rates. I quickly set up a Python script and put it in Docker to be run every 24 hours (to get the current day's exchange rates for some chosen currencies).
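A rough sketch of what such a script could look like, using only the standard library. Everything specific here is an assumption: `RATES_URL` is a placeholder rather than the API actually used, the response shape `{"rates": {"USD": ...}}` is guessed, and the function names are made up for illustration.

```python
import json
from decimal import Decimal, ROUND_HALF_UP
from urllib.request import urlopen

# Placeholder endpoint, NOT the real API from the project.
RATES_URL = "https://example.com/latest?base=GBP"

TWO_PLACES = Decimal("0.01")


def fetch_rates(url: str = RATES_URL) -> dict:
    """Fetch GBP-based rates; assumes JSON shaped like {"rates": {"USD": 1.25}}."""
    with urlopen(url, timeout=10) as response:
        payload = json.load(response)
    # Go through str() so float noise does not leak into the Decimal values.
    return {code: Decimal(str(rate)) for code, rate in payload["rates"].items()}


def convert_from_gbp(amount_gbp: Decimal, currency: str, rates: dict) -> Decimal:
    """Convert a GBP amount into `currency`, rounded to two decimal places."""
    rate = Decimal("1") if currency == "GBP" else rates[currency]
    return (amount_gbp * rate).quantize(TWO_PLACES, rounding=ROUND_HALF_UP)


# Usage with a made-up rate, so no network call is needed:
example_rates = {"USD": Decimal("1.25")}
print(convert_from_gbp(Decimal("10.00"), "USD", example_rates))  # 12.50
```

Using `Decimal` instead of floats keeps money amounts exact. The daily schedule itself is left out; it could be a simple sleep loop in the container or a cron entry.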

The final dbt data lineage looks like:


Finally, I tried making some kind of dashboard in Grafana. I have little data, and it covers less than a day, so I just tried to make something.

However, I will keep docker compose running to get new transaction data into the db, and after a few days I can create a more interesting dashboard showing differences in exchange rates between days and transaction amounts.

There are of course things to improve:

  • add PySpark tests
  • use an orchestrator
  • use more data for a better Grafana visualisation
    • Spark might be overkill given the limited amount of data, but I wanted to learn how to set up Spark Streaming in case there is much more data
    • maybe find an alternative transactions data source because the Stripe API has a limit of 25
    • also many of the generated values in a transaction from the Stripe API are null
Maybe others too. I posted on r/dataengineering on Reddit and asked in DataTalksClub for feedback, so I look forward to improving this.


That is all for today!

See you tomorrow :)
