[Day 198] Transactions Data Streaming Pipeline Project [v1 completed]
Hello :)
Today is Day 198!
A quick summary of today: data streaming pipeline project [v1 done]
Turns out, in order to insert the full payload (~70 variables with nested/list structure), I need a proper PySpark schema. Yesterday I did not have that, which is why I was getting NULLs in the columns when reading the data from Kafka - my schema was wrong. Today I not only fixed the schema for the 4 variables I had yesterday, but included *all* ~70 variables that come from the Stripe API (for completeness).
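To give an idea, here is a rough sketch of what the schema and the Kafka read look like - just a handful of the ~70 fields, and the broker/topic names are made up for illustration:

```python
# A cut-down sketch of the nested Stripe charge schema (not the full
# ~70-field version). Field names follow the Stripe charge object.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import (
    ArrayType, BooleanType, LongType, StringType, StructField, StructType
)

charge_schema = StructType([
    StructField("id", StringType()),
    StructField("amount", LongType()),
    StructField("currency", StringType()),
    StructField("paid", BooleanType()),
    # nested objects become nested StructTypes
    StructField("billing_details", StructType([
        StructField("email", StringType()),
        StructField("address", StructType([
            StructField("city", StringType()),
            StructField("country", StringType()),
        ])),
    ])),
    # list-valued fields become ArrayTypes
    StructField("refunds", StructType([
        StructField("data", ArrayType(StructType([
            StructField("id", StringType()),
            StructField("amount", LongType()),
        ]))),
    ])),
    # ... the remaining fields follow the same pattern
])

spark = SparkSession.builder.appName("transactions-stream").getOrCreate()

# Kafka messages arrive as bytes: cast to string, then parse with the
# schema (a wrong schema here is exactly what produces NULL columns).
# Requires the spark-sql-kafka connector package on the classpath;
# broker and topic names below are assumptions.
charges = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "transactions")
    .load()
    .select(from_json(col("value").cast("string"), charge_schema).alias("charge"))
    .select("charge.*")
)
```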
When I run docker-compose, the data streams into the Postgres db (and is still running as I write). Unfortunately, the free Stripe API for creating realistic transactions has a limit of 25, so every 3 seconds 25 new transactions are sent to the db. It has been running for half the day (since I got it set up), and as I am writing the db has (checking db) 28675 rows of transactions.
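The producer loop is roughly like the sketch below - I am assuming kafka-python here and guessing at the exact Stripe call (Charge.list stands in for whatever endpoint is actually used), so treat it as an illustration rather than the real code:

```python
# A rough sketch of the producer loop. Broker address, topic name and
# the Stripe endpoint are assumptions; stripe and kafka-python packages.
import json
import time

import stripe
from kafka import KafkaProducer

stripe.api_key = "sk_test_..."  # test-mode key (redacted)

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

while True:
    # the free tier caps us at 25 transactions per request
    charges = stripe.Charge.list(limit=25)
    for charge in charges.data:
        producer.send("transactions", value=charge)
    producer.flush()
    time.sleep(3)  # ~25 new transactions every 3 seconds
```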
After I got this set up, I decided to move on to dbt for a bit of data documentation and modelling.
This is the data model I settled on for now. Using dbt I created the dimension tables and the fact table. I decided to use dbt Core this time, rather than dbt Cloud - just to see what the difference is - and it is not that different, but the cloud version has its conveniences. I also wanted to document as much as possible (there is probably more to do), and I generated and hosted the dbt docs on netlify. Here is the link.

As I was writing the dbt models, I thought about adding some more data - to make this whole thing a bit more interesting - and settled on currency exchange data. Since the transactions are in GBP, if a customer is paying from overseas we might need to convert the charge into another currency. I looked around and tried forex-python - it did not work for some reason - but then I found an API url from which I can get daily exchange rates. I quickly set up a python script and put it in docker to be run every 24hr (to get the current day's exchange rates for some chosen currencies).
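The daily script is along these lines - the API url, response shape, credentials and table names below are all placeholders, not the real ones:

```python
# A sketch of the daily exchange-rate job (run every 24h by the
# container). URL, JSON shape, credentials and table names are
# placeholders for illustration.
import datetime

import psycopg2
import requests

CURRENCIES = ["USD", "EUR", "JPY"]  # example target currencies

resp = requests.get("https://example.com/latest?base=GBP")  # hypothetical URL
rates = resp.json()["rates"]  # assumed response shape: {"rates": {"USD": ...}}

conn = psycopg2.connect(host="postgres", dbname="transactions",
                        user="postgres", password="postgres")  # assumed creds
with conn, conn.cursor() as cur:  # commits on success
    for ccy in CURRENCIES:
        cur.execute(
            "INSERT INTO exchange_rates (date, currency, rate_from_gbp) "
            "VALUES (%s, %s, %s)",
            (datetime.date.today(), ccy, rates[ccy]),
        )
conn.close()
```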
The final dbt data lineage looks like this:
There are of course things to improve:
- add PySpark tests (a rough sketch of one is below, after this list)
- use an orchestrator
- use more data for a better Grafana visualisation
- Spark might be overkill given the small data volume, but I wanted to learn how to set up Spark Streaming in case the data is much bigger
- maybe find an alternative transactions data source, because of the Stripe API's 25-transaction limit
- also, many of the generated values in a transaction from the Stripe API are null
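On the PySpark tests point, here is a rough idea of what one such test could look like - the inline schema is a cut-down stand-in for the real one:

```python
# A minimal pytest sketch: feed a known JSON payload through from_json
# and assert the fields survive (i.e. the schema does not produce NULLs).
import json

import pytest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import LongType, StringType, StructField, StructType

# in the real project this would be imported from the pipeline code
charge_schema = StructType([
    StructField("id", StringType()),
    StructField("amount", LongType()),
    StructField("currency", StringType()),
])


@pytest.fixture(scope="session")
def spark():
    return (SparkSession.builder
            .master("local[1]")
            .appName("schema-tests")
            .getOrCreate())


def test_charge_parses_without_nulls(spark):
    payload = json.dumps({"id": "ch_123", "amount": 999, "currency": "gbp"})
    df = spark.createDataFrame([(payload,)], ["value"])
    parsed = df.select(from_json(col("value"), charge_schema).alias("c")).select("c.*")
    row = parsed.first()
    assert row["id"] == "ch_123"
    assert row["amount"] == 999
    assert row["currency"] == "gbp"
```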
That is all for today!
See you tomorrow :)