
Image by Author | Ideogram
You know that feeling when you have data scattered across different formats and sources, and you have to make sense of all of it? That’s exactly what we’re solving today. Let’s build an ETL pipeline that takes messy data and turns it into something actually useful.
In this article, I’ll walk you through creating a pipeline that processes e-commerce transactions. Nothing fancy, just practical code that gets the job done.
We’ll grab data from a CSV file (like you’d download from an e-commerce platform), clean it up, and store it in a proper database for analysis.
🔗 Link to the code on GitHub
What Is an Extract, Transform, Load (ETL) Pipeline?
Every ETL pipeline follows the same pattern. You grab data from somewhere (Extract), clean it up and make it better (Transform), then put it somewhere useful (Load).

ETL Pipeline | Image by Author | diagrams.net (draw.io)
The process begins with the extract phase, where data is retrieved from various source systems such as databases, APIs, files, or streaming platforms. During this phase, the pipeline identifies and pulls relevant data while maintaining connections to disparate systems that may operate on different schedules and formats.
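If your source were a REST API instead of a file, only this step would change. Here’s a minimal sketch, assuming a hypothetical endpoint that returns a JSON array of records:

import pandas as pd
import requests

def extract_data_from_api(url):
    # Fetch JSON records from a (hypothetical) reporting endpoint
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    # Assumes the endpoint returns a JSON array of flat records
    return pd.DataFrame(response.json())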
Next, the transform phase represents the core processing stage, where extracted data undergoes cleaning, validation, and restructuring. This step addresses data quality issues, applies business rules, performs calculations, and converts data into the required format and structure. Common transformations include data type conversions, field mapping, aggregations, and the removal of duplicates or invalid records.
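To make those transformations concrete, here’s a quick illustration on a toy DataFrame (the column names are made up for the example):

import pandas as pd

# Toy frame with a duplicate row and string-typed amounts
df = pd.DataFrame({"id": ["1", "2", "2"], "amt": ["9.99", "5.00", "5.00"]})
df["amt"] = df["amt"].astype(float)        # data type conversion
df = df.rename(columns={"amt": "amount"})  # field mapping
df = df.drop_duplicates(subset=["id"])     # remove duplicate records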
Finally, the load phase transfers the transformed data into the target system. This step can occur through full loads, where entire datasets are replaced, or incremental loads, where only new or changed data is added. The loading strategy depends on factors such as data volume, system performance requirements, and business needs.
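When pandas writes to a database, the difference between the two strategies can come down to a single parameter. A minimal sketch, assuming a stand-in DataFrame and a transactions table:

import sqlite3
import pandas as pd

conn = sqlite3.connect("warehouse.db")  # hypothetical target database
df = pd.DataFrame({"order_id": [1], "total": [9.99]})  # stand-in data

# Full load: replace the entire table with the new dataset
df.to_sql("transactions", conn, if_exists="replace", index=False)

# Incremental load: append only the new or changed records
df.to_sql("transactions", conn, if_exists="append", index=False)
conn.close()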
Step 1: Extract
The “extract” step is where we get our hands on data. In the real world, you might be downloading this CSV from your e-commerce platform’s reporting dashboard, pulling it from an FTP server, or getting it via an API. Here, we’re reading from a local CSV file.
import pandas as pd

def extract_data_from_csv(csv_file_path):
    try:
        print(f"Extracting data from {csv_file_path}...")
        df = pd.read_csv(csv_file_path)
        print(f"Successfully extracted {len(df)} records")
        return df
    except FileNotFoundError:
        print(f"Error: {csv_file_path} not found. Creating sample data...")
        csv_file = create_sample_csv_data()
        return pd.read_csv(csv_file)
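The fallback calls create_sample_csv_data(), which isn’t shown above. A minimal version of that helper might write a few rows with the columns the rest of the pipeline expects, using the pandas import from the extract step (the sample values here are purely illustrative):

def create_sample_csv_data(csv_file="raw_transactions.csv"):
    # Write a small CSV with the columns the pipeline expects
    sample = pd.DataFrame({
        "transaction_id": [1, 2, 3],
        "customer_email": ["a@example.com", None, "c@example.com"],
        "price": [19.99, 5.49, 120.00],
        "quantity": [2, 1, 3],
        "transaction_date": ["2024-01-05", "2024-01-06", "2024-01-07"],
    })
    sample.to_csv(csv_file, index=False)
    return csv_file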
Now that we have the raw data from its source (raw_transactions.csv), we need to transform it into something usable.
Step 2: Transform
This is where we make the data actually useful.
def transform_data(df):
    print("Transforming data...")
    df_clean = df.copy()

    # Remove records with missing emails
    initial_count = len(df_clean)
    df_clean = df_clean.dropna(subset=['customer_email'])
    removed_count = initial_count - len(df_clean)
    print(f"Removed {removed_count} records with missing emails")

    # Calculate derived fields
    df_clean['total_amount'] = df_clean['price'] * df_clean['quantity']

    # Extract date components
    df_clean['transaction_date'] = pd.to_datetime(df_clean['transaction_date'])
    df_clean['year'] = df_clean['transaction_date'].dt.year
    df_clean['month'] = df_clean['transaction_date'].dt.month
    df_clean['day_of_week'] = df_clean['transaction_date'].dt.day_name()

    # Create customer segments
    df_clean['customer_segment'] = pd.cut(df_clean['total_amount'],
                                          bins=[0, 50, 200, float('inf')],
                                          labels=['Low', 'Medium', 'High'])

    return df_clean
First, we’re dropping rows with missing emails because incomplete customer data isn’t useful for most analyses.
Then we calculate total_amount by multiplying price and quantity. This seems obvious, but you’d be surprised how often derived fields like this are missing from raw data.
The date extraction is really useful. Instead of just having a timestamp, we now have separate year, month, and day-of-week columns. This makes it easy to analyze patterns like “do we sell more on weekends?”
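For instance, the weekend question becomes a one-liner once day_of_week exists. Here’s a self-contained toy example of the same idea:

import pandas as pd

# Toy example: total sales by day of week
demo = pd.DataFrame({
    "transaction_date": pd.to_datetime(["2024-01-06", "2024-01-07", "2024-01-08"]),
    "total_amount": [40.0, 25.0, 10.0],
})
demo["day_of_week"] = demo["transaction_date"].dt.day_name()
print(demo.groupby("day_of_week")["total_amount"].sum())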
The customer segmentation using pd.cut() can be particularly useful. It automatically buckets customers into spending categories. Now instead of just having transaction amounts, we have meaningful business segments.
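If you haven’t used pd.cut() before, here’s how the bucketing behaves on a toy series (same bins as above):

import pandas as pd

amounts = pd.Series([25.0, 120.0, 500.0])
segments = pd.cut(amounts, bins=[0, 50, 200, float('inf')],
                  labels=['Low', 'Medium', 'High'])
print(segments.tolist())  # ['Low', 'Medium', 'High']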
Step 3: Load
In a real project, you might be loading into a database, sending to an API, or pushing to cloud storage.
Here, we’re loading our clean data into a proper SQLite database.
import sqlite3

def load_data_to_sqlite(df, db_name="ecommerce_data.db", table_name="transactions"):
    print(f"Loading data to SQLite database '{db_name}'...")
    conn = sqlite3.connect(db_name)
    try:
        df.to_sql(table_name, conn, if_exists="replace", index=False)

        # Verify the load by counting the rows actually written
        cursor = conn.cursor()
        cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
        record_count = cursor.fetchone()[0]
        print(f"Successfully loaded {record_count} records to '{table_name}' table")
        return f"Data successfully loaded to {db_name}"
    finally:
        conn.close()
Now analysts can run SQL queries, connect BI tools, and actually use this data for decision-making.
SQLite works well for this because it’s lightweight, requires no setup, and creates a single file you can easily share or back up. The if_exists="replace" parameter means you can run this pipeline multiple times without worrying about duplicate data.
We’ve added verification steps so you know the load was successful. There’s nothing worse than thinking your data is safely stored, only to find an empty table later.
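Once the data is loaded, a typical analyst query might look like this, run against the table the pipeline just created:

import sqlite3

conn = sqlite3.connect("ecommerce_data.db")
cursor = conn.cursor()
# Example: revenue by customer segment
cursor.execute("""
    SELECT customer_segment, COUNT(*) AS orders, SUM(total_amount) AS revenue
    FROM transactions
    GROUP BY customer_segment
    ORDER BY revenue DESC
""")
for row in cursor.fetchall():
    print(row)
conn.close()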
Running the ETL Pipeline
This orchestrates the entire extract, transform, load workflow.
def run_etl_pipeline():
    print("Starting ETL Pipeline...")

    # Extract
    raw_data = extract_data_from_csv('raw_transactions.csv')

    # Transform
    transformed_data = transform_data(raw_data)

    # Load
    load_result = load_data_to_sqlite(transformed_data)

    print("ETL Pipeline completed successfully!")
    return transformed_data
Notice how this ties everything together. Extract, transform, load, done. You can run this and immediately see your processed data.
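To run it as a script, a standard entry point works, assuming all the functions above live in the same file:

if __name__ == "__main__":
    processed = run_etl_pipeline()
    print(processed.head())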
You’ll find the complete code on GitHub.
Wrapping Up
This pipeline takes raw transaction data and turns it into something an analyst or data scientist can actually work with. You’ve got clean records, calculated fields, and meaningful segments.
Each function does one thing well, and you can easily modify or extend any part without breaking the rest.
Now try running it yourself. Also try modifying it for another use case. Happy coding!
Bala Priya C is a developer and technical writer from India. She likes working at the intersection of math, programming, data science, and content creation. Her areas of interest and expertise include DevOps, data science, and natural language processing. She enjoys reading, writing, coding, and coffee! Currently, she’s working on learning and sharing her knowledge with the developer community by authoring tutorials, how-to guides, opinion pieces, and more. Bala also creates engaging resource overviews and coding tutorials.