Ham15-art/device-data-pipeline-cli


Device Data Pipeline (Python CLI, Async API Integration)

A production-inspired data pipeline CLI designed to process, validate, and enrich device data using asynchronous API integration. The idea came from wanting to simulate a real industrial data pipeline (the kind you'd see feeding into monitoring or SCADA systems), and to get hands-on with async Python in a practical context.

What it does

Takes a CSV of device data, validates it, categorizes temperature readings, enriches each record via asynchronous API calls, and outputs a clean CSV along with a JSON summary.

Load CSV → Validate → Transform → Async API calls → Merge → Export
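The categorization step can be sketched as a simple threshold function. The thresholds below are illustrative assumptions, not the project's actual values:

```python
import math

# Illustrative thresholds -- assumptions, not the project's actual values
LOW_MAX = 15.0    # readings below this are "low"
HIGH_MIN = 30.0   # readings above this are "high"

def categorize_temperature(value):
    """Return 'low' / 'normal' / 'high', or None for invalid readings."""
    try:
        temp = float(value)
    except (TypeError, ValueError):
        return None          # missing or non-numeric value
    if math.isnan(temp):
        return None          # NaN from a blank CSV cell
    if temp < LOW_MAX:
        return "low"
    if temp > HIGH_MIN:
        return "high"
    return "normal"
```

Invalid readings are flagged (returned as None) rather than dropped silently, so the summary stats can count them.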

Why it matters

This type of pipeline is commonly used in industrial environments to prepare device telemetry data for monitoring, analytics, or alerting systems.

Design Approach

The project is structured with separation of concerns in mind:

  • Processing logic is isolated from API communication
  • Validation is handled independently
  • Async operations are centralized in the API layer
  • Structured data models (Pydantic) used for API responses

This makes the pipeline easier to extend, test, and adapt to real-world scenarios.
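A structured API response model might look like the following sketch. The field names here are illustrative assumptions, not the project's actual schema:

```python
from typing import Optional
from pydantic import BaseModel

class EnrichmentResponse(BaseModel):
    """Structured API response -- field names are illustrative assumptions."""
    device_id: str
    api_status: str                          # e.g. "ok" or "error"
    response_time_ms: Optional[float] = None # None if the call never completed

# Pydantic validates types at construction time
resp = EnrichmentResponse(device_id="dev-001", api_status="ok", response_time_ms=12.5)
```

Keeping the response shape in a model means the processing layer can rely on validated, typed fields instead of raw dicts.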

Project structure

app/
├── main.py         # CLI entry point & argument parsing
├── processor.py    # Core workflow orchestration
├── api_client.py   # Async API communication layer
├── validators.py   # Input validation logic
├── models.py       # Data models (Pydantic-ready)

I kept the modules intentionally separate — validation, processing, and API logic don't bleed into each other, which made testing and extending things much easier.

Tech Stack

  • Python
  • pandas (data processing)
  • asyncio & aiohttp (asynchronous API calls)

Running it

pip install -r requirements.txt

python -m app.main \
  --input data/input/devices.csv \
  --output data/output/result.csv

Outputs:

  • data/output/result.csv — enriched dataset
  • data/output/summary.json — processing stats

Example

Input (raw device data) — Input CSV

Output (processed & enriched) — Output CSV, Output JSON

What happens during processing

  • Invalid or missing temperature values are detected
  • Temperature values are categorized (low / normal / high)
  • Each device is enriched via async API calls
  • Concurrent API requests are bounded with a semaphore, avoiding API overload while keeping safe parallelism
  • Additional metadata (API status & response time) is added
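The semaphore-bounded fan-out can be sketched with asyncio alone. The fetch coroutine below is a stand-in for the real aiohttp call, and the concurrency limit is an illustrative assumption:

```python
import asyncio

MAX_CONCURRENT = 5  # illustrative limit -- an assumption, not the project's value

async def fetch_enrichment(device_id, sem):
    """Stand-in for the real aiohttp call; the sleep simulates network latency."""
    async with sem:                      # at most MAX_CONCURRENT requests in flight
        await asyncio.sleep(0.01)
        return {"device_id": device_id, "api_status": "ok"}

async def enrich_all(device_ids):
    sem = asyncio.Semaphore(MAX_CONCURRENT)
    tasks = [fetch_enrichment(d, sem) for d in device_ids]
    return await asyncio.gather(*tasks)  # results come back in input order

results = asyncio.run(enrich_all([f"dev-{i:03d}" for i in range(10)]))
```

All tasks are created up front, but the semaphore ensures only a bounded number actually hit the API at once, which is what keeps the parallelism safe.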

What I'd add next

  • Retry/backoff for flaky API calls
  • Structured logging to file
  • .env / YAML config support
  • Docker setup
  • Hook into real industrial protocols (OPC UA, etc.)

Built to simulate real-world data processing workflows and strengthen practical experience in async Python, API integration, and clean software design.
