CSV ETL Automation: Extract, Transform, and Load Data Like a Pro
ETL — Extract, Transform, Load — is the backbone of every data pipeline. And CSV files are where most ETL pipelines start or end. Whether you are pulling data from APIs, cleaning messy exports, or loading records into a database, mastering CSV ETL automation saves hours of manual work every week.
What Is CSV ETL?
ETL applied to CSV files means:
- Extract: Get CSV data from sources — APIs, databases, email attachments, FTP servers, web scraping, or manual exports
- Transform: Clean, reshape, enrich, validate, and standardize the data
- Load: Push the processed data to a destination — database, data warehouse, another CSV, spreadsheet, or API
The goal is turning raw, messy CSV files into clean, reliable data that feeds your analytics, reports, or applications.
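In miniature, all three stages fit in a few lines of pandas; the file names below are placeholders for this sketch:

```python
import pandas as pd

# A tiny stand-in for a raw export (in practice this comes from an API, database, or email)
pd.DataFrame({'Amount': [10, 10, 25]}).to_csv('raw.csv', index=False)

# Extract: read the raw file
df = pd.read_csv('raw.csv')

# Transform: drop duplicates and normalize the column name
df = df.drop_duplicates().rename(columns={'Amount': 'amount'})

# Load: write the cleaned result for the next system
df.to_csv('clean.csv', index=False)
```

The rest of this guide expands each stage: real sources, heavier transformations, and proper destinations.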
Extract: Getting CSV Data from Anywhere
From APIs
Many services offer CSV exports via API:
```python
import requests

# Download CSV from an API endpoint
response = requests.get(
    'https://api.example.com/reports/sales',
    headers={'Authorization': 'Bearer YOUR_TOKEN'},
    params={'format': 'csv', 'date_from': '2024-01-01'}
)

with open('sales_export.csv', 'w') as f:
    f.write(response.text)
```
From Databases
Export query results directly to CSV:
```python
import duckdb

# Query PostgreSQL and export to CSV
con = duckdb.connect()
con.execute("""
    INSTALL postgres; LOAD postgres;
    COPY (
        SELECT * FROM postgres_scan('dbname=mydb', 'public', 'orders')
        WHERE order_date >= '2024-01-01'
    ) TO 'orders_export.csv' (HEADER, DELIMITER ',')
""")
```
From Email Attachments
```python
import imaplib
import email

mail = imaplib.IMAP4_SSL('imap.gmail.com')
mail.login('user@gmail.com', 'app_password')
mail.select('inbox')

_, messages = mail.search(None, 'SUBJECT "Daily Report" UNSEEN')
for num in messages[0].split():
    _, data = mail.fetch(num, '(RFC822)')
    msg = email.message_from_bytes(data[0][1])
    for part in msg.walk():
        if part.get_filename() and part.get_filename().endswith('.csv'):
            with open(f'incoming/{part.get_filename()}', 'wb') as f:
                f.write(part.get_payload(decode=True))
```
From Cloud Storage
```python
import boto3

s3 = boto3.client('s3')
s3.download_file('my-bucket', 'exports/daily-report.csv', 'daily-report.csv')
```
Transform: Cleaning and Reshaping CSV Data
Transformation is where most of the work happens. Here are the most common operations:
Basic Cleaning with pandas
```python
import pandas as pd

df = pd.read_csv('raw_data.csv', encoding='utf-8')

# Remove duplicate rows
df = df.drop_duplicates()

# Strip whitespace from string columns
str_cols = df.select_dtypes(include='object').columns
df[str_cols] = df[str_cols].apply(lambda x: x.str.strip())

# Standardize date formats
df['date'] = pd.to_datetime(df['date'], format='mixed')

# Fill missing values
df['category'] = df['category'].fillna('Uncategorized')

# Remove rows with negative amounts
df = df[df['amount'] >= 0]

df.to_csv('cleaned_data.csv', index=False)
```
Advanced Transformation with DuckDB
For larger files or SQL-preferred workflows, DuckDB transforms CSV data efficiently:
```sql
-- Aggregate, filter, and reshape in one query
COPY (
    SELECT
        date_trunc('month', date) AS month,
        region,
        product_category,
        COUNT(*) AS order_count,
        SUM(amount) AS total_revenue,
        AVG(amount) AS avg_order_value
    FROM read_csv_auto('raw_orders.csv')
    WHERE date >= '2024-01-01'
      AND status != 'cancelled'
    GROUP BY 1, 2, 3
    ORDER BY 1, 4 DESC
) TO 'monthly_summary.csv' (HEADER)
```
Command-Line Transformation with csvkit
csvkit is a suite of command-line tools for CSV manipulation:
```bash
# Preview structure
csvstat sales.csv

# Filter rows
csvgrep -c region -m "North" sales.csv > north_sales.csv

# Select specific columns
csvcut -c date,product,revenue sales.csv > slim_sales.csv

# Sort by column
csvsort -c revenue -r sales.csv > sorted_sales.csv

# Join two CSV files
csvjoin -c product_id sales.csv products.csv > enriched_sales.csv

# Convert to JSON
csvjson sales.csv > sales.json
```
Data Validation
Validate before loading to catch problems early:
```python
import pandas as pd

def validate_csv(filepath):
    df = pd.read_csv(filepath)
    errors = []

    # Check required columns
    required = ['order_id', 'date', 'amount']
    missing = [col for col in required if col not in df.columns]
    if missing:
        errors.append(f"Missing columns: {missing}")

    # Check for nulls in required fields
    for col in required:
        if col in df.columns:
            null_count = df[col].isnull().sum()
            if null_count > 0:
                errors.append(f"{col}: {null_count} null values")

    # Check data types
    if 'amount' in df.columns:
        non_numeric = pd.to_numeric(df['amount'], errors='coerce').isnull().sum()
        if non_numeric > 0:
            errors.append(f"amount: {non_numeric} non-numeric values")

    return errors
```
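A quick way to exercise checks like these is to point them at a deliberately broken file. The sketch below inlines two of the same checks so it runs standalone; the file name and its contents are invented for the demo:

```python
import pandas as pd

# Build a deliberately bad export: no 'date' column, text in 'amount'
pd.DataFrame({
    'order_id': [1, 2, 3],
    'amount': [10.0, 20.0, 'oops'],
}).to_csv('bad_export.csv', index=False)

df = pd.read_csv('bad_export.csv')
errors = []

# Required-column check
required = ['order_id', 'date', 'amount']
missing = [col for col in required if col not in df.columns]
if missing:
    errors.append(f"Missing columns: {missing}")

# Numeric-type check on 'amount'
non_numeric = pd.to_numeric(df['amount'], errors='coerce').isnull().sum()
if non_numeric > 0:
    errors.append(f"amount: {non_numeric} non-numeric values")

print(errors)
```

Both problems surface in the `errors` list, so the pipeline can refuse to load the file and alert instead.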
Load: Delivering Processed Data
To a Database
```python
from sqlalchemy import create_engine

engine = create_engine('postgresql://user:pass@localhost/mydb')
df.to_sql('orders', engine, if_exists='append', index=False)
```
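When the frame is large, `chunksize` batches the inserts instead of issuing them all at once. The sketch below uses an in-memory SQLite engine so it runs anywhere; the table and column names are illustrative:

```python
import pandas as pd
from sqlalchemy import create_engine

engine = create_engine('sqlite://')  # in-memory database, just for the demo
df = pd.DataFrame({'order_id': range(1, 1001), 'amount': [9.99] * 1000})

# chunksize controls how many rows go into each INSERT batch
df.to_sql('orders', engine, if_exists='append', index=False, chunksize=250)

# Confirm everything arrived
loaded = pd.read_sql('SELECT COUNT(*) AS n FROM orders', engine)
print(int(loaded['n'][0]))
```

Against the PostgreSQL URL above, the same call works unchanged; pandas also accepts `method='multi'` to pack multiple rows into each INSERT statement.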
To Google Sheets
```python
import gspread

gc = gspread.service_account(filename='credentials.json')
sh = gc.open('Monthly Report')
worksheet = sh.sheet1
worksheet.update([df.columns.values.tolist()] + df.values.tolist())
```
To Another CSV (with different structure)
```python
# Reshape for the target system's expected format
output = df[['order_id', 'date', 'total']].copy()
output.columns = ['OrderNumber', 'OrderDate', 'Amount']  # Rename for target schema
output['OrderDate'] = output['OrderDate'].dt.strftime('%m/%d/%Y')  # Reformat dates
output.to_csv('for_import.csv', index=False)
```
Building a Complete ETL Pipeline
Here is a complete pipeline script you can adapt:
```python
import pandas as pd
import logging
from pathlib import Path
from datetime import datetime

logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

def run_pipeline():
    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

    # EXTRACT
    log.info("Extracting data...")
    raw_path = Path('incoming/daily_export.csv')
    if not raw_path.exists():
        log.error("Source file not found")
        return False
    df = pd.read_csv(raw_path, encoding='utf-8')
    log.info(f"Extracted {len(df)} rows")

    # TRANSFORM
    log.info("Transforming...")
    initial_count = len(df)
    df = df.drop_duplicates()
    df = df.dropna(subset=['order_id', 'amount'])
    df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
    df = df[df['amount'] > 0]
    log.info(f"Kept {len(df)}/{initial_count} rows after cleaning")

    # LOAD
    log.info("Loading...")
    output_path = f'processed/clean_{timestamp}.csv'
    df.to_csv(output_path, index=False)
    log.info(f"Saved to {output_path}")

    # ARCHIVE
    raw_path.rename(f'archive/daily_export_{timestamp}.csv')
    log.info("Pipeline complete")
    return True

if __name__ == '__main__':
    run_pipeline()
```
Quick Tools for Manual ETL Steps
Not every step needs code. Use browser-based tools for the parts between automated processes:
- Inspect source files: Open in CSV Viewer to verify structure before building your pipeline
- Convert formats: Use the Excel ↔ CSV converter when your source delivers Excel files but your pipeline expects CSV
- Generate test data: Build sample CSV files with the CSV Creator to test your pipeline without using production data
- Validate output visually: Check your pipeline's output in CSV Charts to catch issues that row counts alone won't reveal
Scheduling Your Pipeline
cron (Linux/macOS)
```bash
# Run the ETL pipeline every day at 6 AM
0 6 * * * cd /home/user/etl && python pipeline.py >> /var/log/etl.log 2>&1
```
GitHub Actions
```yaml
name: Daily ETL
on:
  schedule:
    - cron: '0 6 * * *'
jobs:
  etl:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: '3.12'
      - run: pip install pandas
      - run: python pipeline.py
```
Conclusion
CSV ETL automation is not just for data engineers. With pandas for transformation, DuckDB for SQL-based processing, csvkit for command-line operations, and browser tools like CSV Viewer for inspection, anyone who works with data regularly can build reliable pipelines. Start with a single script that automates your most repetitive workflow, then expand from there.