🐍 Day 4/30 — Python for Data Engineers Functions. The building blocks of every pipeline. Every Airflow DAG task, every dbt macro, every ETL step — they're all just functions under the hood. Here's what separates beginner Python from production-grade DE code 👇 3 things I use in every pipeline: 1. Type hints — makes your code self-documenting def extract(table: str) -> list: 2. **kwargs — flexible config without breaking the signature def load_data(table, schema="public", **opts): load_data("orders", limit=1000, dry_run=True) 3. Lambda with sorted() — one of the most used patterns sorted(jobs, key=lambda j: j["priority"]) And if you use Airflow, you already use decorators daily: @task def run_dbt_model(model: str): ... That @task is just a decorator — a function that wraps your function. Today's cheat sheet covers: → Function anatomy with type hints → All 4 parameter types (positional, default, *args, **kwargs) → Lambda syntax + real examples → map(), filter(), reduce() → LEGB scope rule → Decorators → Real ETL pipeline patterns 📌 Full cheat sheet above — save it. Day 5 tomorrow: Conditionals & Loops 🔁 What's your go-to function pattern in pipelines? Drop it below 👇 #Python #DataEngineering #30DaysOfPython #Airflow #LearnPython #DataEngineer
Jaswanth Thathireddy’s Post
More Relevant Posts
-
🐍 Day 7/30 — Python for Data Engineers File I/O, CSV & JSON. The bread and butter of every ingestion pipeline. Before you touch pandas or Spark — you need to know how Python handles raw files. Because in real pipelines, you'll deal with: → CSVs dropped by vendors in S3 → JSON payloads from REST APIs → JSONL files in your data lake raw layer → Config files that drive your pipeline logic The #1 mistake I see beginners make: # ❌ Wrong — file never closes if an error occurs f = open("data.csv", "r") data = f.read() # ✅ Right — auto-closes even on exceptions with open("data.csv", "r") as f: data = f.read() And the thing that confused me for weeks: json.load(f) # reads from a FILE object json.loads(s) # parses a STRING json.dump(d, f) # writes to a FILE json.dumps(d) # returns a STRING The "s" = string. Once you know that, it sticks forever. For data lake files, JSONL is king: # One JSON object per line — memory efficient with open("events.jsonl") as f: events = [json.loads(line) for line in f if line.strip()] Today's cheat sheet covers: → open() with context managers → All 6 file modes explained → Key file methods (with memory warnings) → csv.DictReader / DictWriter → Common CSV gotchas (encoding, newline, delimiter) → json.load / loads / dump / dumps → JSONL pattern + CSV → JSON transform 📌 Every section has a plain-English explanation — save it. Day 8 tomorrow: OS & Pathlib — Navigate the Filesystem Like a Pro 📁 Which format do you deal with most in your pipelines — CSV or JSON? 👇 #Python #DataEngineering #30DaysOfPython #LearnPython #DataEngineer #ETL #DataAnalyst #DataAnalysis #Data #PythonDev
To view or add a comment, sign in
-
-
Swipe through the slides first 👉 then read below 👇 🚀 Day 25 of 30 — Learning PySpark from Scratch A pipeline that crashes with no error message is worse than a pipeline that doesn't run at all. 😬 Here's how I write robust PySpark pipelines now 👇 ⚡ The 3-layer defence system Layer 1 → try/except around each stage Layer 2 → Python logging module (not print()) Layer 3 → Data quality assertions between stages 💻 The production pipeline template import logging logging.basicConfig(level=logging.INFO, format="%(asctime)s — %(levelname)s — %(message)s") logger = logging.getLogger(__name__) def run_pipeline(): try: # Stage 1: Read df = spark.read.option("badRecordsPath", "output/bad/").csv("data.csv", header=True) logger.info(f"Read: {df.count()} rows") # Quality check — fail fast on bad data assert df.count() > 100, "Too few rows — data may be missing" # Stage 2: Clean df = df.dropna(subset=["revenue"]) logger.info(f"After clean: {df.count()} rows") except Exception as e: logger.error(f"Pipeline failed at stage: {e}") raise ✅ 3 things I didn't know before today → badRecordsPath saves corrupt rows to a separate folder instead of crashing → print() has no timestamps or log levels — always use Python logging module → Asserting row counts between stages catches silent data loss early 💡 My Day 25 takeaway Anyone can write a pipeline that works on good data. A data engineer writes pipelines that handle bad data gracefully. ❓ Has a pipeline failure ever caused a wrong report to reach stakeholders? Drop it in the comments 👇 Follow me for Day 26 tomorrow → Testing PySpark code with pytest 🔔 #PySpark #DataEngineering #BigData #Python #LearnInPublic #30DaysOfPySpark
To view or add a comment, sign in
-
Sail 0.6 is out. Three new surfaces, all Arrow-native: - Arrow UDFs from Spark 4. Python functions decorated with @arrow_udf run against Arrow data directly. Because Sail executes Python Arrow UDFs inline within the same Rust process, it enables Python code to run at native speed with zero-copy data transfer, avoiding the separate-process overhead inherent in Spark's architecture. - Variant type in SQL. Parse JSON into a variant with parse_json, then query it with variant_get and path expressions. Lookups run against binary data instead of re-parsing strings. - Arrow Flight SQL server on the wire. The first alternative protocol Sail supports beyond Spark Connect. Start a Flight SQL server powered by Sail and connect from any Flight SQL client to query it directly. Read the full post: https://lnkd.in/gdAJ28gw
To view or add a comment, sign in
-
✨ Implementing Python in my daily tasks truly changed how I work with data 🐍 What started as a small attempt to simplify repetitive work quickly became a game‑changer. I was dealing with daily ETL activities where the data never stayed the same: Headers kept changing Column positions shifted New fields appeared without warning Manually fixing pipelines every day wasn’t scalable — or enjoyable. That’s when I leaned into Python automation. 🔹 I used Python to dynamically read source files instead of relying on fixed schemas 🔹 Built logic to identify and standardize changing headers at runtime 🔹 Mapped columns based on business meaning rather than column order 🔹 Automated validation, transformation, and loading steps 🔹 Added checks so the pipeline could adapt even when the data structure changed What once required daily manual intervention became a reliable, automated ETL process. 🚀 The real impact? ✅ Less firefighting ✅ Faster data availability ✅ More confidence in downstream reporting ✅ More time spent solving problems instead of reacting to them Implementing Python wasn’t just about automation — it improved efficiency, reliability, and peace of mind in my day‑to‑day work. If your data keeps changing, let your pipeline be smart enough to change with it. #Python #Automation #ETL #DataEngineering #Analytics #PowerBI #DailyProductivity #TechSkills #ContinuousImprovement
To view or add a comment, sign in
-
🚀 Just built a File-to-Database Loader using Python! I developed a data pipeline tool that reads structured CSV flat files and loads them into PostgreSQL — schema-driven, memory-efficient, and production-ready. 🔧 What it does: ✅ Schema-driven column mapping via schemas.json ✅ Chunked loading (10K rows at a time) for large datasets ✅ Supports 6 retail datasets: customers, orders, products, categories, departments & order_items ✅ Flexible CLI — load all tables or target specific ones ✅ Powered by Pandas + SQLAlchemy + psycopg2 📦 Tech Stack: Python | Pandas | SQLAlchemy | PostgreSQL | dotenv This kind of ETL tooling is at the heart of modern data engineering — taking raw files and making them queryable at scale. link first comment #Python #DataEngineering #ETL #PostgreSQL #Pandas #DataPipeline
To view or add a comment, sign in
-
🐍 Day 6/30 — Python for Data Engineers Error Handling. What separates scripts from production pipelines. I've seen pipelines crash in production because of one missing key in a JSON payload. No error handling. No logging. Just a silent failure at 2 AM. Here's what I learned the hard way 👇 The full try/except structure most people don't use: try: run_query(conn) except ConnectionError as e: log.error(f"DB failed: {e}") else: commit(conn) # ← only runs if NO error finally: conn.close() # ← ALWAYS runs Most engineers only write try/except. The else and finally blocks are gold. And the pattern that saved me the most — dead-letter queues: for row in records: try: validate(row) passed.append(row) except ValidationError: failed.append(row) # quarantine bad rows Don't crash the whole pipeline over one bad row. Isolate it. Today's cheat sheet covers: → Full try/except/else/finally anatomy → 12 common built-in exceptions → Multiple except, raise, re-raise, chaining → Custom exceptions (production standard) → Context managers with with → Dead-letter queue · retry backoff · traceback logging 📌 Save the cheat sheet above. Day 7 tomorrow: File I/O & CSV / JSON 📂 What's your go-to error handling pattern in pipelines? 👇 #Python #DataEngineering #30DaysOfPython #LearnPython #DataEngineer #DataAnalyst #Data #Software
To view or add a comment, sign in
-
-
🐍 Stop Writing "Spaghetti" Data Science Code We’ve all been there: a Jupyter Notebook with 47 cells, variables named df2, df_final, and df_final_v2_FIXED, and a loop that takes three hours to run. Data analysis is about insights, but your code quality determines how fast (and how reliably) you get them. Here are 4 Python best practices to move from "it works on my machine" to "production-ready." 1. Embrace Vectorization (Forget the for loops) If you’re iterating over a Pandas DataFrame with a loop, you’re likely doing it wrong. Python’s numpy and pandas are built on C—let them do the heavy lifting. Bad: Using .iterrows() to calculate a new column. Good: Use vectorized operations like df['new_col'] = df['a'] * df['b']. It’s orders of magnitude faster. 2. The Magic of Method Chaining Clean code is readable code. Instead of creating five intermediate DataFrames, chain your operations. It keeps your namespace clean and your logic linear. Python # Instead of multiple assignments, try this: df_clean = (df .query('age > 18') .assign(name=lambda x: x['name'].str.upper()) .groupby('region') .agg({'salary': 'mean'}) ) 3. Type Hinting & Docstrings Data types in Python are flexible, which is a blessing and a curse. Use Type Hints to tell your team exactly what a function expects. def process_data(df: pd.DataFrame) -> pd.DataFrame: It saves hours of debugging when someone tries to pass a list into a function expecting a Series. 4. Memory Management Matters Working with "Big-ish" data? Downcast your numerics (e.g., float64 to float32). Convert object columns with low cardinality to category types. Your RAM (and your IT department) will thank you. The Bottom Line: Great data analysis isn't just about the model accuracy; it's about the maintainability of the pipeline. Which Python habit changed your workflow the most? Let’s swap tips in the comments! 👇 #Python #DataScience #Pandas #DataAnalysis #CodingBestPractices #MachineLearning
To view or add a comment, sign in
-
-
Putting It All Together"Messtone LLC Formatting Optimization" Here is a complete PySpark example combining these elements: Python from pyspark.sql.functions import from_json,col from PySpark.sql.types.import StructType, StructField, StringType # Read from Kafka tropic df=(spark.readStream .format("Kafka") .option("kafka.bootstrap.servers", "broker1:9092") .option("subscribe", "input_topic") .option("startingOffsets", "latest") .load( )) # define schema and parse JSON value schema=StructType([StructField("user robertharper_Messtone_id", "StringType( )),StructField("action","StringType( ))]) parsed=df.selectExpr("CAST(value AS STRING) as json") \.select(from_json(col("json"),schema). robertharper_Messtone("data")) \ .select("data.*") # Write to Delta with checkpointing parsed.writeStream \format("delta") \ . outputMode("append") \ .option("checkpointingLocation", "/mnt/checkpoints/events/") \ .start("/mnt/delta/events/") https://visa.com/payment
To view or add a comment, sign in
-
🚀 Day 17/20 — Python for Data Engineering Building a Simple Data Pipeline So far, we’ve learned: reading data transforming data working with APIs Now it’s time to connect everything together. 👉 That’s called a data pipeline 🔹 What is a Data Pipeline? A pipeline is a sequence of steps: 👉 Ingest → Process → Store 🔹 Simple Example import pandas as pd import requests # Step 1: Fetch data response = requests.get("https://lnkd.in/gTtgvXhZ") data = response.json() # Step 2: Convert to DataFrame df = pd.DataFrame(data) # Step 3: Transform df["salary"] = df["salary"] * 1.1 # Step 4: Store df.to_csv("output.csv", index=False) 🔹 Pipeline Flow 👉 API → Python → Transform → Output 🔹 Why This Matters Automates data flow Reduces manual work Scalable processing Foundation of data engineering 🔹 Real-World Use ETL pipelines Data ingestion systems Batch processing jobs 💡 Quick Summary A pipeline connects all steps into one flow. 💡 Something to remember Individual steps are code… Connected steps become a system. #Python #DataEngineering #DataAnalytics #LearningInPublic #TechLearning #Databricks
To view or add a comment, sign in
-
-
Switching from CSV to Parquet for Time-Series Data As my time series data grew in size, I started to notice slower reads, higher memory usage, and slower iteration cycles. Switching from CSV to Parquet transformed my workflow. I wrote about practical insights and included code examples. #BigData #Python #Parquet #CodingJourney #LearnToCode
To view or add a comment, sign in
Explore content categories
- Career
- Productivity
- Finance
- Soft Skills & Emotional Intelligence
- Project Management
- Education
- Technology
- Leadership
- Ecommerce
- User Experience
- Recruitment & HR
- Customer Experience
- Real Estate
- Marketing
- Sales
- Retail & Merchandising
- Science
- Supply Chain Management
- Future Of Work
- Consulting
- Writing
- Economics
- Artificial Intelligence
- Employee Experience
- Workplace Trends
- Fundraising
- Networking
- Corporate Social Responsibility
- Negotiation
- Communication
- Engineering
- Hospitality & Tourism
- Business Strategy
- Change Management
- Organizational Culture
- Design
- Innovation
- Event Planning
- Training & Development