Senior Data Engineer, CommerceIQ.AI
Software Engineer - Data, Unravel Data
Data Engineer, Tabsquare.AI
Software Developer Intern, Yellow.AI
BigQuery
Apache Airflow
Docker
Terraform
Github Actions
Python
dbt
MySQL
Google Cloud Platform
Git
Unit testing
REST API
Apache Spark
Flask
SQLAlchemy
Slack
Retool
AWS
Hi, I'm Naveen Srinivas. I'm working as a data engineer at a startup called CommerceIQ, and I've been in the data engineering space ever since I graduated. I started my career as a data engineer intern, worked in that role for six months, and then moved into a full-time data engineer position. In my first company, my primary responsibility was building retail pipelines using Apache Spark, Python, and Google Cloud, with BigQuery as our data warehouse. After a couple of years there, I moved to a second company into a combined software engineer and data engineer role, where I again built data pipelines on the Google Cloud stack using Python. SQL is also part of my typical day-to-day toolkit for working with data across different platforms. After a couple of years in my second company, I moved to CommerceIQ, where I'm currently working as a senior data engineer. I mentor a couple of junior data engineers, and my primary responsibilities are building data engineering workflows, setting up the data infrastructure and the data warehouse, building client-facing retail pipelines, and building Looker dashboards. So that's my overview.
Okay. To achieve data consistency across different stores: in Postgres we use a relational table format with rows and columns, in Hive we use a similar format, and in Snowflake it's again the same, but achieving consistency across them depends on how we model the data. Postgres is a relational database, so we use relational modeling: we define entities and relationships, with primary keys, foreign keys, and the relationships between them. Hive and Snowflake are a different ballgame; they are tools used for data warehousing. Hive is an open-source data warehousing product, and Snowflake is a managed, cloud-based version of the same idea. So to achieve data consistency across these stores, we first need to properly define the schema, the columns and fields we're using across the three tables, and ensure the same schema and data types are used everywhere. We need to enforce data type consistency and also the relationships between the different tables. That's how we can achieve consistency.
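To make the "same schema everywhere" point concrete, here's a minimal sketch of a single shared schema definition applied before loading into any of the three stores. This is an illustration, not the exact setup I used; the table, column names, and enforce_schema helper are hypothetical, and it assumes pandas is available.

```python
# Minimal sketch: one schema definition shared across the Postgres, Hive, and Snowflake loads.
# Table and column names here are hypothetical examples.
import pandas as pd

ORDERS_SCHEMA = {
    "order_id": "int64",
    "customer_id": "int64",
    "order_date": "datetime64[ns]",
    "amount": "float64",
}

def enforce_schema(df: pd.DataFrame, schema: dict) -> pd.DataFrame:
    """Check that every expected column exists, then cast to the agreed types."""
    missing = set(schema) - set(df.columns)
    if missing:
        raise ValueError(f"Missing columns: {missing}")
    return df[list(schema)].astype(schema)

# The same enforce_schema(df, ORDERS_SCHEMA) call runs before writing to any of the
# three stores, so all of them receive identical column names and data types.
```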
Sure. For version control, in our present company we use GitHub, and there are multiple data engineers working on the same data pipeline, so that's where GitHub and a tool like Git come in handy. We manage versioning through GitHub: for example, we use different branches for different environments, like master, dev, staging, and production. For rollback, in case something fails in production, we can always revert to the previous working branch or commit. That's how we achieve rollback capabilities as well as version control; Git and GitHub would be the answer.
The first thing about identifying data pipeline failures is that a failure should not be something you discover by accident; it should be something you are alerted to, and that should be built in when you set up the pipeline. When designing a pipeline, the design should cover how often it runs and, if it fails, what the notification mechanism should be. If the pipeline fails, it should notify the data engineers and the relevant stakeholders on the team via email, Slack, or a similar messaging tool. Once we have the notification, we should have the debugging steps handy: how to get to the error message quickly and understand what exactly happened and why the pipeline failed. With those steps in place, we can follow a runbook, replicate the documented steps on the data pipeline, and see if that fixes the issue; otherwise we have to dive deeper, understand the error, resolve it, and make the pipeline robust so the same error doesn't recur. Regarding downtime, we should have the necessary SLAs set whenever we build data pipelines: each pipeline should run at a certain interval and cadence, complete within a specific time window, and not exceed it. We should also set expectations with the relevant stakeholders that pipelines will never be 100% reliable; when a pipeline does fail, the failure should be resolved within an agreed duration.
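As a minimal sketch of the notification mechanism I'm describing, here's an Airflow DAG whose tasks post to Slack on failure. It assumes a recent Airflow 2.x and the requests library; the webhook URL, DAG id, and task are placeholders, not from a real pipeline.

```python
# Minimal sketch, assuming Airflow 2.x (2.4+) and the `requests` library.
# The webhook URL, DAG id, and task are hypothetical placeholders.
from datetime import datetime, timedelta

import requests
from airflow import DAG
from airflow.operators.python import PythonOperator

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # kept as a secret in practice

def notify_failure(context):
    """Called by Airflow when a task fails; posts the failing task and run date to Slack."""
    message = (
        f"Pipeline failure: {context['task_instance'].task_id} "
        f"in DAG {context['dag'].dag_id} on {context['ds']}"
    )
    requests.post(SLACK_WEBHOOK_URL, json={"text": message}, timeout=10)

with DAG(
    dag_id="retail_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    default_args={
        "on_failure_callback": notify_failure,  # alerting is part of the pipeline design
        "retries": 1,
        "retry_delay": timedelta(minutes=5),
    },
    catchup=False,
) as dag:
    PythonOperator(task_id="load_orders", python_callable=lambda: None)
```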
Yes, we can write Python scripts to validate the different data points in our data pipeline. Say we have an ETL pipeline that writes data to Snowflake; we can write Python scripts that first check what kind of data we are ingesting. We can add data validation checks, for example verifying that the data frame or dataset we're about to ingest into Snowflake actually contains data, checking the number of rows, and checking that it has the columns we expect to be ingested. Writing these as unit tests lets us design test cases for how the dataset should look before it is inserted into Snowflake. That way, before writing anything directly into Snowflake, the validation checks run first. Python is very well suited to automating these data validation scripts. Once all the checks pass, we can go ahead and write to the data warehouse; if the dataset doesn't meet the standards we expect, the validation checks should fail and an automation script should notify the data engineers or data scientists involved via Slack or email.
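Here's a minimal sketch of the kind of pre-load checks I mean, assuming pandas. The expected column list is an example, and the notify() and load_to_snowflake() helpers referenced in the usage comment are hypothetical.

```python
# Minimal sketch of pre-load validation checks, assuming pandas; the column list is an example.
import pandas as pd

EXPECTED_COLUMNS = {"order_id", "customer_id", "order_date", "amount"}
MIN_ROWS = 1

def validate_before_load(df: pd.DataFrame) -> list[str]:
    """Return a list of validation failures; an empty list means the data is good to load."""
    errors = []
    if len(df) < MIN_ROWS:
        errors.append("dataset is empty")
    missing = EXPECTED_COLUMNS - set(df.columns)
    if missing:
        errors.append(f"missing expected columns: {sorted(missing)}")
    if "amount" in df.columns and df["amount"].isna().any():
        errors.append("null values found in 'amount'")
    return errors

# Example usage (df is the dataset staged for Snowflake; notify() and load_to_snowflake()
# are hypothetical helpers):
#
#   errors = validate_before_load(df)
#   if errors:
#       notify(f"Validation failed, load aborted: {errors}")   # e.g. Slack or email alert
#   else:
#       load_to_snowflake(df, table="orders")                  # proceed only when checks pass
```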
Python is an object-oriented programming language, and it is very flexible and easy to use. In the data engineering domain, I think everyone is moving towards Python since it's easy to adopt and has many packages that are very handy for data engineering. We can leverage Python to develop different retail workflows. There is a Python API for Spark, so we can use the PySpark framework to write the retail pipelines. For the different sources we have, Python has plenty of libraries to handle them, so it is easy to work with different data sources and data formats, process the data according to our business logic and needs, and finally build a solid ETL pipeline with good data validation checks, monitoring, and alerting.
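To show what I mean by using the PySpark framework for these pipelines, here's a minimal extract-transform-load sketch. The input path, business logic, and output location are hypothetical placeholders, not from a real workflow.

```python
# Minimal PySpark sketch of the kind of retail ETL described above; paths,
# columns, and the aggregation are hypothetical examples.
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.appName("retail_etl").getOrCreate()

# Extract: read raw order events (source format and path are placeholders).
orders = spark.read.json("gs://raw-bucket/orders/2024-01-01/")

# Transform: apply business logic, e.g. daily revenue per store.
daily_revenue = (
    orders.filter(F.col("status") == "COMPLETED")
          .groupBy("store_id", F.to_date("ordered_at").alias("order_date"))
          .agg(F.sum("amount").alias("revenue"))
)

# Load: write to the curated layer (here, partitioned Parquet as an example).
daily_revenue.write.mode("overwrite").partitionBy("order_date").parquet(
    "gs://curated-bucket/daily_revenue/"
)
```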
Consider the function here, stream_data, which takes the DB connection as a parameter. We create a cursor from the connection, stored in a variable, which is what we use when querying the database. We write the query, SELECT * FROM large_table, and execute it. Then I can see we're running a while loop with the condition always set to True, so the loop runs constantly, and we only break out of it when the row returned is None. So what the code does is use the cursor to execute the query and then fetch one row at a time; if the row comes back as None, the streaming loop breaks, the function exits, and the cursor is closed. We might want to handle these null rows differently: rather than exiting, we could set the null rows aside, handle them separately, and continue with the streaming instead of breaking the loop and exiting the streaming pipeline.
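For reference, here is a minimal reconstruction of the snippet as it is described in this answer; the function and table names come from the description, and the rest is inferred, so treat it as a sketch rather than the exact code under review.

```python
# Minimal reconstruction of the snippet described above (names inferred from the answer).
def stream_data(db_connection):
    cursor = db_connection.cursor()
    cursor.execute("SELECT * FROM large_table")
    while True:
        row = cursor.fetchone()
        if row is None:   # loop ends when fetchone() returns no more rows
            break
        yield row         # stream rows one at a time to the caller
    cursor.close()
```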
I can see we have a try and except block. In the try block, a variable data is assigned from a function extract_data; then another variable holds the result of calling transform_data with data; and finally there is the load_data call, which takes the transformed data as a parameter. In the except block there is a logging call, so it logs the error, and within the same except block there is a raise that raises a separate custom exception with the message "data loading failed". So what might be the potential issue with the way exceptions are handled? At first glance this looks mostly alright, since the error is logged so the developer can see the message, and a custom exception is raised. But instead of catching a generic exception and then raising an unrelated custom one, the except block should catch the specific exception type, and the raise should propagate that exception with a clear message rather than wrapping it in a different one. In other words, the except block should name the specific exception class instead of the generic Exception, and the raise should re-raise that exception directly instead of raising a separate exception.
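Here is a minimal reconstruction of the pattern being discussed, followed by the narrower handling this answer suggests. The function names and CustomException come from the description; the stub bodies and the specific exception types in the second block are hypothetical examples.

```python
# Minimal reconstruction of the pattern discussed above; names inferred from the answer,
# stub bodies and specific exception types are hypothetical.
import logging

logger = logging.getLogger(__name__)

class CustomException(Exception): ...
def extract_data(): return [{"order_id": 1}]
def transform_data(data): return data
def load_data(rows): pass

# Pattern as described in the question: a blanket except plus a new, unrelated exception.
try:
    data = extract_data()
    transformed = transform_data(data)
    load_data(transformed)
except Exception as e:
    logger.error("Pipeline failed: %s", e)
    raise CustomException("Data loading failed")

# Suggested adjustment: catch the specific errors each step can raise, so the log and the
# re-raised exception reflect the real failure instead of a generic "data loading failed".
try:
    data = extract_data()
    transformed = transform_data(data)
    load_data(transformed)
except (ConnectionError, ValueError) as e:   # example specific exception types
    logger.error("Pipeline step failed: %s", e)
    raise
```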
In my first company, I was responsible for setting up the company's data warehouse from scratch along with a senior data engineer. In that phase we decided how we were going to implement the data warehouse, and we did a lot of homework instead of diving in directly. We first collected all the requirements: how the data should look within the warehouse, what kind of users would be using it, and what products and use cases they would build on top of the data. Once we had the requirements and understood the stakeholders and what they were trying to get out of the data, we put together a spreadsheet, essentially a design log, listing the rows and columns we would need, all the tables we should create, and how the relationships among those tables should look. We set up the warehouse using dimensional modeling, and we decided to go with flat tables for serving our data visualization needs, since we had Looker and Google Data Studio dashboards running on top of the warehouse. We set up the tables carefully, defined the proper schema for each, and decided which partition keys and clustering columns to use, and that's how we started the implementation. Because we planned the pipelines and the warehouse setup thoughtfully up front, things ran smoothly in production, and whenever there was an increase in load the warehouse was able to scale up without performance issues. As for performance and scalability at large scale: the data size kept growing as we onboarded newer clients and gathered more data, so there was a huge, ever-increasing influx of data into the warehouse. But since we had defined proper schemas and rarely had to change them, because we had curated the resources and gathered all the requirements beforehand, maintenance was easy and the system scaled well with the requirements. As we onboarded more and more clients, the warehouse kept performing well because of the well-defined partition columns, table structure, the dependencies between tables, and the type of modeling we chose, which was dimensional modeling with a star schema. For example, the fact table would hold the orders placed by customers at a restaurant, along with the sales details, and the dimension tables would hold the customer details, location, and other metadata. That's how we implemented that complex data model.
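To illustrate the partition keys and clustering columns on the fact table, here's a minimal sketch using the google-cloud-bigquery client. The project, dataset, table, and field names are hypothetical, not the actual warehouse schema.

```python
# Minimal sketch of creating a partitioned, clustered fact table for a star schema,
# assuming the google-cloud-bigquery client library; all names are hypothetical.
from google.cloud import bigquery

client = bigquery.Client()

fact_orders = bigquery.Table(
    "my-project.warehouse.fact_orders",
    schema=[
        bigquery.SchemaField("order_id", "STRING"),
        bigquery.SchemaField("customer_id", "STRING"),   # joins to dim_customer
        bigquery.SchemaField("location_id", "STRING"),   # joins to dim_location
        bigquery.SchemaField("order_date", "DATE"),
        bigquery.SchemaField("sales_amount", "NUMERIC"),
    ],
)
# Partition by date and cluster on common filter/join keys so queries prune data as load grows.
fact_orders.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY, field="order_date"
)
fact_orders.clustering_fields = ["customer_id", "location_id"]

client.create_table(fact_orders)
```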
So, it's mentioned here that the expertise required is in Java, but I would like to clarify that I've mostly worked in Python ever since I started my career. Python supports a similar object-oriented style of programming and has a sophisticated set of libraries and tools for writing robust ETL pipelines, and it also has concurrency features for improved performance. So instead of writing code in Java, I would be more comfortable in Python, but Java is something I'm currently learning, and I would use the features it provides to build robust ETL pipelines, write proper unit tests, monitor the data pipelines, and set up proper alerting for the ETL pipelines I build.
I've worked with Airflow for more than a few years now, and I've set up the entire Airflow infrastructure from scratch. One step to ensure reliability would be restricting access to the Airflow DAGs and workflows: only the data engineers who need access should have it, not the entire team, and only the data engineers should be able to trigger the pipelines, while other users can be given viewer access. To further ensure reliability, we tie the Airflow workflows to notifications, so if any workflow fails we are alerted in Slack, email, or another messaging channel and instantly know something has gone wrong. That ensures the Airflow workflows keep working as expected after we set them up. For scalability, when we initially experimented with Airflow we ran everything on a single instance, with the Airflow webserver, scheduler, and UI on one VM, but that failed because of virtual machine issues and reboots. So we moved to a managed version of Airflow, Google Cloud Composer. With a managed service we don't have to worry about the infrastructure and the failures that come with maintaining it, and we can rely on Google Cloud's infrastructure for scalability. As more and more DAGs and workflows are added, Google Cloud can handle it and scale well. We can also use the proper operators and executors in the back end, for example Celery or Kubernetes, which helps Airflow scale as well. In this way, we can ensure both reliability and scalability with Airflow.
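As a minimal sketch of the access restrictions I'm describing, here's a DAG with DAG-level access_control, assuming Airflow 2.x with the Flask-AppBuilder role model; the role names and DAG are hypothetical, and the executor note at the end is a deployment-level setting rather than code you'd put in a DAG file.

```python
# Minimal sketch of DAG-level access control, assuming Airflow 2.x with the FAB
# role-based UI; role names and the DAG are hypothetical.
from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="client_retail_workflow",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    # Only the data-engineer role can edit/trigger; other users get read-only access.
    access_control={
        "data_engineer": {"can_read", "can_edit"},
        "analyst_viewer": {"can_read"},
    },
) as dag:
    EmptyOperator(task_id="start")

# For scalability, the executor is chosen at the deployment level (e.g. CeleryExecutor or
# KubernetesExecutor via AIRFLOW__CORE__EXECUTOR); on Cloud Composer this is managed for you.
```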