🐍 Day 7/30 — Python for Data Engineers File I/O, CSV & JSON. The bread and butter of every ingestion pipeline. Before you touch pandas or Spark — you need to know how Python handles raw files. Because in real pipelines, you'll deal with: → CSVs dropped by vendors in S3 → JSON payloads from REST APIs → JSONL files in your data lake raw layer → Config files that drive your pipeline logic The #1 mistake I see beginners make: # ❌ Wrong — file never closes if an error occurs f = open("data.csv", "r") data = f.read() # ✅ Right — auto-closes even on exceptions with open("data.csv", "r") as f: data = f.read() And the thing that confused me for weeks: json.load(f) # reads from a FILE object json.loads(s) # parses a STRING json.dump(d, f) # writes to a FILE json.dumps(d) # returns a STRING The "s" = string. Once you know that, it sticks forever. For data lake files, JSONL is king: # One JSON object per line — memory efficient with open("events.jsonl") as f: events = [json.loads(line) for line in f if line.strip()] Today's cheat sheet covers: → open() with context managers → All 6 file modes explained → Key file methods (with memory warnings) → csv.DictReader / DictWriter → Common CSV gotchas (encoding, newline, delimiter) → json.load / loads / dump / dumps → JSONL pattern + CSV → JSON transform 📌 Every section has a plain-English explanation — save it. Day 8 tomorrow: OS & Pathlib — Navigate the Filesystem Like a Pro 📁 Which format do you deal with most in your pipelines — CSV or JSON? 👇 #Python #DataEngineering #30DaysOfPython #LearnPython #DataEngineer #ETL #DataAnalyst #DataAnalysis #Data #PythonDev
Jaswanth Thathireddy’s Post
More Relevant Posts
-
🐍 Day 4/30 — Python for Data Engineers Functions. The building blocks of every pipeline. Every Airflow DAG task, every dbt macro, every ETL step — they're all just functions under the hood. Here's what separates beginner Python from production-grade DE code 👇 3 things I use in every pipeline: 1. Type hints — makes your code self-documenting def extract(table: str) -> list: 2. **kwargs — flexible config without breaking the signature def load_data(table, schema="public", **opts): load_data("orders", limit=1000, dry_run=True) 3. Lambda with sorted() — one of the most used patterns sorted(jobs, key=lambda j: j["priority"]) And if you use Airflow, you already use decorators daily: @task def run_dbt_model(model: str): ... That @task is just a decorator — a function that wraps your function. Today's cheat sheet covers: → Function anatomy with type hints → All 4 parameter types (positional, default, *args, **kwargs) → Lambda syntax + real examples → map(), filter(), reduce() → LEGB scope rule → Decorators → Real ETL pipeline patterns 📌 Full cheat sheet above — save it. Day 5 tomorrow: Conditionals & Loops 🔁 What's your go-to function pattern in pipelines? Drop it below 👇 #Python #DataEngineering #30DaysOfPython #Airflow #LearnPython #DataEngineer
To view or add a comment, sign in
-
-
Sail 0.6 is out. Three new surfaces, all Arrow-native: - Arrow UDFs from Spark 4. Python functions decorated with @arrow_udf run against Arrow data directly. Because Sail executes Python Arrow UDFs inline within the same Rust process, it enables Python code to run at native speed with zero-copy data transfer, avoiding the separate-process overhead inherent in Spark's architecture. - Variant type in SQL. Parse JSON into a variant with parse_json, then query it with variant_get and path expressions. Lookups run against binary data instead of re-parsing strings. - Arrow Flight SQL server on the wire. The first alternative protocol Sail supports beyond Spark Connect. Start a Flight SQL server powered by Sail and connect from any Flight SQL client to query it directly. Read the full post: https://lnkd.in/gdAJ28gw
To view or add a comment, sign in
-
🚀 Just built a File-to-Database Loader using Python! I developed a data pipeline tool that reads structured CSV flat files and loads them into PostgreSQL — schema-driven, memory-efficient, and production-ready. 🔧 What it does: ✅ Schema-driven column mapping via schemas.json ✅ Chunked loading (10K rows at a time) for large datasets ✅ Supports 6 retail datasets: customers, orders, products, categories, departments & order_items ✅ Flexible CLI — load all tables or target specific ones ✅ Powered by Pandas + SQLAlchemy + psycopg2 📦 Tech Stack: Python | Pandas | SQLAlchemy | PostgreSQL | dotenv This kind of ETL tooling is at the heart of modern data engineering — taking raw files and making them queryable at scale. link first comment #Python #DataEngineering #ETL #PostgreSQL #Pandas #DataPipeline
To view or add a comment, sign in
-
Switching from CSV to Parquet for Time-Series Data As my time series data grew in size, I started to notice slower reads, higher memory usage, and slower iteration cycles. Switching from CSV to Parquet transformed my workflow. I wrote about practical insights and included code examples. #BigData #Python #Parquet #CodingJourney #LearnToCode
To view or add a comment, sign in
-
Swipe through the slides first 👉 then read below 👇 🚀 Day 25 of 30 — Learning PySpark from Scratch A pipeline that crashes with no error message is worse than a pipeline that doesn't run at all. 😬 Here's how I write robust PySpark pipelines now 👇 ⚡ The 3-layer defence system Layer 1 → try/except around each stage Layer 2 → Python logging module (not print()) Layer 3 → Data quality assertions between stages 💻 The production pipeline template import logging logging.basicConfig(level=logging.INFO, format="%(asctime)s — %(levelname)s — %(message)s") logger = logging.getLogger(__name__) def run_pipeline(): try: # Stage 1: Read df = spark.read.option("badRecordsPath", "output/bad/").csv("data.csv", header=True) logger.info(f"Read: {df.count()} rows") # Quality check — fail fast on bad data assert df.count() > 100, "Too few rows — data may be missing" # Stage 2: Clean df = df.dropna(subset=["revenue"]) logger.info(f"After clean: {df.count()} rows") except Exception as e: logger.error(f"Pipeline failed at stage: {e}") raise ✅ 3 things I didn't know before today → badRecordsPath saves corrupt rows to a separate folder instead of crashing → print() has no timestamps or log levels — always use Python logging module → Asserting row counts between stages catches silent data loss early 💡 My Day 25 takeaway Anyone can write a pipeline that works on good data. A data engineer writes pipelines that handle bad data gracefully. ❓ Has a pipeline failure ever caused a wrong report to reach stakeholders? Drop it in the comments 👇 Follow me for Day 26 tomorrow → Testing PySpark code with pytest 🔔 #PySpark #DataEngineering #BigData #Python #LearnInPublic #30DaysOfPySpark
To view or add a comment, sign in
-
## 𝗕𝗿𝗶𝗱𝗴𝗶𝗻𝗴 𝘁𝗵𝗲 𝗚𝗮𝗽: 𝗦𝗤𝗟 𝘁𝗼 𝗣𝘆𝘁𝗵𝗼𝗻 𝗳𝗼𝗿 𝗗𝗮𝘁𝗮 𝗣𝗿𝗼𝗳𝗲𝘀𝘀𝗶𝗼𝗻𝗮𝗹𝘀 🐍📊 Navigating the world of data often involves working with both SQL and Python. Understanding how to translate common SQL operations into Python can significantly streamline your data analysis and manipulation workflows. This quickstart guide offers a handy reference for common tasks, from filtering and ordering data to handling missing values and merging datasets. 𝗞𝗲𝘆 𝗧𝗿𝗮𝗻𝘀𝗹𝗮𝘁𝗶𝗼𝗻𝘀: • 𝗙𝗶𝗹𝘁𝗲𝗿𝗶𝗻𝗴: `WHERE column = 'value'` → `df[df['column'] == 'value']` • 𝗢𝗿𝗱𝗲𝗿𝗶𝗻𝗴: `ORDER BY column ASC` → `df.sort_values(by='column', ascending=True)` • 𝗥𝗲𝗺𝗼𝘃𝗶𝗻𝗴 𝗗𝘂𝗽𝗹𝗶𝗰𝗮𝘁𝗲𝘀: `SELECT DISTINCT col1, col2` → `df.drop_duplicates(subset=['col1', 'col2'])` • 𝗙𝗶𝗹𝗹𝗶𝗻𝗴 𝗠𝗶𝘀𝘀𝗶𝗻𝗴 𝗩𝗮𝗹𝘂𝗲𝘀: `COALESCE(col, 'xxx')` → `df['column'].fillna('xxx')` • 𝗖𝗵𝗮𝗻𝗴𝗶𝗻𝗴 𝗗𝗮𝘁𝗮 𝗧𝘆𝗽𝗲𝘀: `CAST(col AS INTEGER)` → `df['column'].astype(int)` • 𝗥𝗲𝗻𝗮𝗺𝗶𝗻𝗴 𝗖𝗼𝗹𝘂𝗺𝗻𝘀: `SELECT col AS new_col` → `df.rename(columns={'col': 'new_col'})` • 𝗔𝗴𝗴𝗿𝗲𝗴𝗮𝘁𝗶𝗼𝗻𝘀: `SUM()`, `AVG()`, `MIN()`, `MAX()`, `COUNT()` → `.sum()`, `.mean()`, `.min()`, `.max()`, `.count()` • 𝗠𝗲𝗿𝗴𝗶𝗻𝗴 𝗗𝗮𝘁𝗮𝘀𝗲𝘁𝘀: `JOIN` → `pd.merge(table1, table2, on='key')` • 𝗔𝗽𝗽𝗲𝗻𝗱𝗶𝗻𝗴 𝗗𝗮𝘁𝗮𝘀𝗲𝘁𝘀: `UNION ALL` → `pd.concat([table1, table2])` Mastering these translations can unlock greater efficiency and flexibility in your data projects. What are your favorite SQL to Python translation tips? Share them in the comments below! 👇 ♻️ Repost if you find it helpful #SQL #Python #DataAnalysis #DataScience #DataEngineering #Programming #Coding #Pandas
To view or add a comment, sign in
-
-
The 6 Pillars of Spark Memory:🧠 If you are only tuning spark executor memory(spark.executor.memory), you are only seeing half the picture. In a PySpark environment, Understanding where your data actually lives is the key 1. RESERVED MEMORY The Spark sets aside a fixed amount (usually 300MB) to ensure the internal engine has enough breathing room for its own objects. You cannot use this for your data. 2. USER MEMORY This is for everything outside of Spark's internal processing. It stores your custom data structures (like Python dictionaries), metadata, and user-defined functions. 3. UNIFIED MEMORY (The Core) This is the most famous part of the JVM heap. It is shared dynamically between: • Storage Memory: For your .cache(), .persist(), and Broadcast variables. • Execution Memory: For the "heavy lifting" like shuffles, joins, and sorts. Note: They can "borrow" from each other, but Execution always has priority. 4. OFF-HEAP MEMORY Managed by Spark but lives outside the JVM. It is used to store data in serialized form, which drastically reduces Garbage Collection (GC) overhead. It’s a powerful lever for high-performance pipelines. 5. MEMORY OVERHEAD This is non-heap memory used for the executor process itself (network buffers, thread stacks, etc.) • The Risk: If you have thousands of partitions or high shuffle concurrency, this overhead can spike. By default, it’s only 10% of your executor memory—often not enough for massive scales! 6. PYSPARK MEMORY If you use Pandas, NumPy, or Vectorized UDFs, that memory is consumed in the Python Worker process, not the JVM. If this isn't allocated correctly, your Python code will starve the JVM and crash the container. #PySpark #DataEngineering #ApacheSpark #BigData #SparkOptimization #CloudComputing
To view or add a comment, sign in
-
-
Putting It All Together"Messtone LLC Formatting Optimization" Here is a complete PySpark example combining these elements: Python from pyspark.sql.functions import from_json,col from PySpark.sql.types.import StructType, StructField, StringType # Read from Kafka tropic df=(spark.readStream .format("Kafka") .option("kafka.bootstrap.servers", "broker1:9092") .option("subscribe", "input_topic") .option("startingOffsets", "latest") .load( )) # define schema and parse JSON value schema=StructType([StructField("user robertharper_Messtone_id", "StringType( )),StructField("action","StringType( ))]) parsed=df.selectExpr("CAST(value AS STRING) as json") \.select(from_json(col("json"),schema). robertharper_Messtone("data")) \ .select("data.*") # Write to Delta with checkpointing parsed.writeStream \format("delta") \ . outputMode("append") \ .option("checkpointingLocation", "/mnt/checkpoints/events/") \ .start("/mnt/delta/events/") https://visa.com/payment
To view or add a comment, sign in
-
🐍 Day 6/30 — Python for Data Engineers Error Handling. What separates scripts from production pipelines. I've seen pipelines crash in production because of one missing key in a JSON payload. No error handling. No logging. Just a silent failure at 2 AM. Here's what I learned the hard way 👇 The full try/except structure most people don't use: try: run_query(conn) except ConnectionError as e: log.error(f"DB failed: {e}") else: commit(conn) # ← only runs if NO error finally: conn.close() # ← ALWAYS runs Most engineers only write try/except. The else and finally blocks are gold. And the pattern that saved me the most — dead-letter queues: for row in records: try: validate(row) passed.append(row) except ValidationError: failed.append(row) # quarantine bad rows Don't crash the whole pipeline over one bad row. Isolate it. Today's cheat sheet covers: → Full try/except/else/finally anatomy → 12 common built-in exceptions → Multiple except, raise, re-raise, chaining → Custom exceptions (production standard) → Context managers with with → Dead-letter queue · retry backoff · traceback logging 📌 Save the cheat sheet above. Day 7 tomorrow: File I/O & CSV / JSON 📂 What's your go-to error handling pattern in pipelines? 👇 #Python #DataEngineering #30DaysOfPython #LearnPython #DataEngineer #DataAnalyst #Data #Software
To view or add a comment, sign in
-
-
What if adding a new DLT pipeline table required zero Python code? Most DLT pipelines follow the same pattern — Python files with hard-coded SQL strings, decorators stacked on every function, and a databricks.yml that needs updating for every new table. It works at small scale. But when you're building a gold layer with 20+ dimensions, each with SCD tracking and data quality expectations, it gets hard to review, hard to test, and easy to break. I built a config-driven framework that reduces this to two files per table: → A .sql file — your pure SELECT query → A .yaml file — table type, primary keys, expectations Drop them in a folder. The pipeline picks them up at runtime. No Python changes. No decorator wiring. No deployment config updates. Under the hood, a factory pattern handles runtime discovery, MV/SCD1/SCD2 registration, expectation wiring, and DBR version compatibility. I'm writing a 4-part series breaking down the full architecture. Part 1 covers the core pattern: https://lnkd.in/g4Zi6eQc Next up — the validation layer with Pydantic and SQLFluff. What's been your biggest headache scaling DLT pipelines? #Databricks #DeltaLiveTables #DataEngineering #DLT #MedallionArchitecture
To view or add a comment, sign in
Explore content categories
- Career
- Productivity
- Finance
- Soft Skills & Emotional Intelligence
- Project Management
- Education
- Technology
- Leadership
- Ecommerce
- User Experience
- Recruitment & HR
- Customer Experience
- Real Estate
- Marketing
- Sales
- Retail & Merchandising
- Science
- Supply Chain Management
- Future Of Work
- Consulting
- Writing
- Economics
- Artificial Intelligence
- Employee Experience
- Workplace Trends
- Fundraising
- Networking
- Corporate Social Responsibility
- Negotiation
- Communication
- Engineering
- Hospitality & Tourism
- Business Strategy
- Change Management
- Organizational Culture
- Design
- Innovation
- Event Planning
- Training & Development