This project is a data pipeline for processing pharmaceutical data and generating a drug mention graph. It is designed to facilitate analysis of drug mentions across various scientific publications and clinical trials.
The project is organized into the following directories and files:
```
mle_test/
├── data/
│   ├── input/
│   └── output/
├── main.py
├── src/
│   ├── __init__.py
│   ├── ad_hoc/
│   │   ├── __init__.py
│   │   ├── analysis.py
│   │   └── graphs.py
│   ├── models/
│   │   ├── __init__.py
│   │   └── schemas.py
│   ├── pipeline/
│   │   ├── __init__.py
│   │   ├── extractors.py
│   │   └── transformers.py
│   ├── sql_test/
│   │   ├── __init__.py
│   │   ├── create_tables/
│   │   │   ├── product_nomenclature.sql
│   │   │   └── transactions.sql
│   │   ├── dblite.py
│   │   ├── local_db/
│   │   ├── queries/
│   │   │   ├── client_sales_query.sql
│   │   │   └── revenue_query_with_alias.sql
│   │   └── synthetic_data.py
│   └── utils/
│       ├── __init__.py
│       ├── helpers.py
│       └── project_tree.py
└── tests/
    ├── __init__.py
    ├── data/
    ├── local_db/
    ├── test_graph_comparison.py
    ├── test_integration.py
    └── test_sql.py
```
This structure organizes the project into logical components, facilitating easy navigation and management of the codebase. Each directory serves a specific purpose, such as storing input data, source code, or test cases. The src directory contains the main application logic, while the tests directory includes unit and integration tests to ensure code quality and functionality.
To run the project in a Dockerized container, follow these steps:

- Build the Docker image: `make build`
- Run the Docker container (you'll get an interactive shell): `make run`

  Inside the container, you can run the pipeline with `python main.py`, the ad hoc analysis with `python -m src.ad_hoc.analysis`, and the SQL queries with `python -m src.sql_test.dblite`.

- Execute the tests inside the Docker container: `make test`
If you are on Windows, you can use: `.\ws_install_venv.ps1 -envName mle_test`

This project uses uv for dependency management to avoid conflicts. To install uv locally, run: `make install_uv`

Once installed, create and activate your virtual environment: `make venv` then `make activate`

If setting up the project for the first time, install dependencies with: `make install`

To automatically apply coding best practices, install pre-commit with: `make pre_commit`

This module is tested with unittest. To run the tests, use: `make test`

TODO: Code is commented and ready for automatic documentation generation.
To better understand the code, consult the module documentation: TODO (Sphinx).
The pharmaceutical data pipeline includes analysis capabilities for understanding drug-journal relationships. The following analyses are performed (a minimal sketch of these aggregations appears at the end of this section):

- Journals with Most Mentions of a Specific Drug:
  - Identifies which journals mention a specific drug the most times.
  - Helps in understanding the prominence of a drug in scientific literature.
- Journal with Most Different Drug Mentions:
  - Determines which journal mentions the highest number of different drugs.
  - Provides insights into journals that cover a wide range of pharmaceutical topics.
- Graph Visualization:
  - Interactive network visualization showing relationships between drugs and journals.
  - Provides a quick and intuitive way to grasp the connections in the database.
  - The graph is available as `graph.html` in `./data/charts` (a generation sketch follows the note below).
Note: To view the interactive visualization:
- Clone the repository locally
- Open `graph.html` in your web browser
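The graph generation itself presumably lives in `src/ad_hoc/graphs.py`. As a rough, hypothetical sketch only (not the repository's actual code), an interactive network like this could be built with pyvis from (drug, journal) mention pairs:

```python
# Hypothetical sketch -- the real generation code lives in src/ad_hoc/graphs.py.
# `mentions` stands in for whatever (drug, journal) pairs the pipeline extracts.
from pyvis.network import Network

mentions = [
    ("DRUG_A", "Journal of emergency nursing"),
    ("DRUG_B", "Journal of food protection"),
    ("DRUG_A", "Journal of food protection"),
]

net = Network(height="750px", width="100%")

for drug, journal in mentions:
    # Drugs and journals become nodes; an edge records a mention.
    net.add_node(drug, label=drug, color="#e74c3c")
    net.add_node(journal, label=journal, color="#3498db")
    net.add_edge(drug, journal)

# Writes the self-contained interactive HTML file (the directory must already exist).
net.save_graph("data/charts/graph.html")
```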
Screenshot of the visualization:
This interactive visualization allows users to:
- Zoom in/out to explore different parts of the network
- Hover over nodes to see detailed information
The results of these analyses are logged using the loguru logger, providing clear and concise output for further investigation or reporting.
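For reference, here is a minimal, hypothetical sketch of the two aggregations (the actual implementation is in `src/ad_hoc/analysis.py` and may differ); it assumes the pipeline output is a JSON list of mention records with `drug` and `journal` fields:

```python
# Hypothetical sketch -- see src/ad_hoc/analysis.py for the real implementation.
# Assumes a JSON list of records such as {"drug": "...", "journal": "..."}.
import json
from collections import Counter, defaultdict

from loguru import logger

with open("data/output/drug_mentions.json") as f:  # assumed output path
    mentions = json.load(f)

# 1) Journals with the most mentions of a specific drug.
target_drug = "DRUG_A"  # placeholder value
per_journal = Counter(m["journal"] for m in mentions if m["drug"] == target_drug)
logger.info(f"Top journals for {target_drug}: {per_journal.most_common(3)}")

# 2) Journal mentioning the highest number of different drugs.
drugs_by_journal = defaultdict(set)
for m in mentions:
    drugs_by_journal[m["journal"]].add(m["drug"])
top_journal = max(drugs_by_journal, key=lambda j: len(drugs_by_journal[j]))
logger.info(f"{top_journal} mentions {len(drugs_by_journal[top_journal])} different drugs")
```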
The pipeline is structured to facilitate easy implementation with Airflow. Key design considerations include:
- Modular Task Functions:
  - Each step in the pipeline is encapsulated in a separate function, representing a distinct task.
  - This modularity allows each function to be easily mapped to an Airflow task, promoting reusability and clarity.
- Clear Task Dependencies:
  - The pipeline tasks are executed in a logical sequence, forming a directed acyclic graph (DAG).
  - This structure aligns with Airflow's DAG-based execution model, making it straightforward to define task dependencies.
- Configurable and Flexible:
  - The pipeline supports configuration through a dictionary, allowing for easy customization of file paths and parameters.
  - This flexibility is crucial for adapting the pipeline to different environments and datasets.
- Robust Logging and Error Handling:
  - The use of loguru for logging provides detailed execution information, essential for monitoring and debugging in Airflow.
  - Comprehensive error handling ensures that failures are logged and can be addressed promptly.
These design choices ensure that the pipeline is not only effective for processing pharmaceutical data but also ready for integration into a production-grade workflow management system like Airflow.
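As a purely hypothetical illustration of that mapping (not code from this repository), the pipeline functions could be wired into an Airflow DAG roughly as follows; the task bodies and configuration keys are placeholders for whatever `src/pipeline` actually exposes:

```python
# Hypothetical Airflow DAG sketch -- task bodies and config keys are placeholders.
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

CONFIG = {  # example of the configuration dictionary the pipeline accepts
    "input_dir": "data/input",
    "output_dir": "data/output",
}


def extract(**context):
    ...  # call the extractors from src/pipeline/extractors.py


def transform(**context):
    ...  # call the transformers from src/pipeline/transformers.py


def load(**context):
    ...  # write the drug mention graph to CONFIG["output_dir"]


with DAG(
    dag_id="drug_mentions_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    t_extract = PythonOperator(task_id="extract", python_callable=extract)
    t_transform = PythonOperator(task_id="transform", python_callable=transform)
    t_load = PythonOperator(task_id="load", python_callable=load)

    # Task dependencies form the DAG: extract -> transform -> load.
    t_extract >> t_transform >> t_load
```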
- Kubernetes Deployment: Implement Kubernetes for orchestrating multi-node processing of TB-scale data.
- Stateful Workloads: Use StatefulSets for maintaining processing state across millions of files.
- Resource Management: Configure resource quotas and limits for predictable performance.
- Use Cloud Storage: Store large datasets in Google Cloud Storage for scalability.
- Optimize Data Formats: Implement Parquet or Avro formats that support compression and efficient querying (see the sketch after this list).
- Data Locality: Configure storage and compute proximity to minimize data transfer costs.
- Distributed Processing: Leverage Apache Beam with Dataflow or Spark with Dataproc on Kubernetes.
- Batch Processing: For large historical datasets with checkpointing for recovery.
- Stream Processing: For real-time data ingestion with proper backpressure handling.
- Pub/Sub for Streaming: Handle high-throughput real-time data.
- Batch Ingestion: Use scheduled jobs for large files with parallel workers.
- Failure Recovery: Implement idempotent processing to handle retries.
- Pipeline Modularity: Scale using Apache Airflow on Kubernetes.
- Parallel Processing: Implement with dynamic resource allocation.
- Data Partitioning: Process data in right-sized chunks to optimize memory usage.
- Automated Validation: Enhance checks for data quality with sampling techniques.
- Error Handling: Implement robust logging and troubleshooting.
- Circuit Breakers: Prevent cascade failures when processing millions of files.
- Horizontal Scaling: Configure Kubernetes node pools optimized for data processing.
- Optimized Queries: Use BigQuery with partitioning and clustering.
- Preemptible Instances: Reduce costs for fault-tolerant batch workloads.
- Adapt code for Kubernetes-native execution with proper resource requests.
- Implement data warehousing with BigQuery for TB-scale analytics.
- Refactor ETL processes for Dataflow with parallelization patterns.
- Orchestrate with Cloud Composer (Airflow) on Kubernetes.
- Set up comprehensive monitoring and security (access control and encryption).
Note: The viability of Kubernetes must be studied, considering the cost and the data volumes to process. Cloud Run can be a sufficient solution in many cases.
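As a rough illustration of the columnar-format and partitioning points above (hypothetical code, with made-up bucket, paths, and column names), mention records could be written as partitioned Parquet before being queried from BigQuery or processed with Spark/Dataflow:

```python
# Hypothetical sketch: writing pipeline output as partitioned Parquet.
# The bucket, paths, and column names are illustrative only.
import pandas as pd

mentions = pd.DataFrame(
    {
        "drug": ["DRUG_A", "DRUG_B"],
        "journal": ["Journal of emergency nursing", "Journal of food protection"],
        "source": ["pubmed", "clinical_trials"],
        "date": ["2020-01-01", "2020-01-02"],
    }
)

# Partitioning by source keeps files right-sized, enables predicate push-down,
# and maps cleanly onto BigQuery/Spark partitions.
mentions.to_parquet(
    "gs://example-bucket/drug_mentions/",  # a local path also works; GCS needs gcsfs
    engine="pyarrow",
    partition_cols=["source"],
    compression="snappy",
)
```

The diagram below sketches how these components could fit together on GCP.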
```mermaid
flowchart TB
%% Data Sources
subgraph Sources["Data Sources"]
S1["TB-Scale Files"]
S2["Millions of Files"]
S3["Streaming/Batching Data"]
style Sources fill:#2D3748,color:white
end
%% Storage Layer
subgraph Storage["Storage"]
GCS["Cloud Storage
(disaster recovery)"]
style Storage fill:#4A5568,color:white
style GCS fill:#4A5568,color:white
end
%% Processing Cluster
subgraph K8s["Kubernetes Cluster"]
style K8s fill:#2C5282,color:white,stroke:#63B3ED
subgraph Ingestion["Data Ingestion"]
BatchJob["Batch Jobs"]
PubSub["Pub/Sub"]
style Ingestion fill:#3182CE,color:white
style BatchJob fill:#3182CE,color:white
style PubSub fill:#3182CE,color:white
end
subgraph Processing["Data Processing"]
Spark["Spark"]
Dataflow["Dataflow"]
style Processing fill:#2B6CB0,color:white
style Spark fill:#2B6CB0,color:white
style Dataflow fill:#2B6CB0,color:white
end
subgraph Control["Control Plane"]
Airflow["Airflow/Composer"]
Monitoring["Monitoring & Logging"]
style Control fill:#2C5282,color:white
style Airflow fill:#2C5282,color:white
style Monitoring fill:#2C5282,color:white
end
end
%% Data Warehouse and Data Mart
subgraph DataPlatform["Data Platform"]
style DataPlatform fill:#553C9A,color:white,stroke:#805AD5
subgraph Warehouse["Data Warehouse"]
RawDWH["Raw Layer"]
StagingDWH["Staging Layer"]
CoreDWH["Core Layer"]
style Warehouse fill:#6B46C1,color:white
style RawDWH fill:#6B46C1,color:white
style StagingDWH fill:#6B46C1,color:white
style CoreDWH fill:#6B46C1,color:white
end
subgraph DataMart["Data Marts"]
AnalyticsDM["Analytics Mart"]
ReportingDM["Reporting Mart"]
MLDataDM["ML Data Mart"]
style DataMart fill:#805AD5,color:white
style AnalyticsDM fill:#805AD5,color:white
style ReportingDM fill:#805AD5,color:white
style MLDataDM fill:#805AD5,color:white
end
end
%% Cross-cutting Concerns
subgraph CrossCutting["Platform Services"]
Security["Security & IAM"]
Optimization["Scaling & Cost Control"]
DisasterRecovery["Disaster Recovery"]
style CrossCutting fill:#718096,color:white
style Security fill:#718096,color:white
style Optimization fill:#718096,color:white
style DisasterRecovery fill:#718096,color:white
end
%% Serving Layer
subgraph Serving["Data Serving"]
API["APIs"]
Dashboard["Dashboards"]
Looker["BI Tools"]
style Serving fill:#9F7AEA,color:white
style API fill:#9F7AEA,color:white
style Dashboard fill:#9F7AEA,color:white
style Looker fill:#9F7AEA,color:white
end
%% Data Flow - Main Path
Sources --> Storage
Storage --> Ingestion
Ingestion --> Processing
%% Warehouse Flow
Processing --> RawDWH
RawDWH --> StagingDWH
StagingDWH --> CoreDWH
%% Data Mart Flow
CoreDWH --> AnalyticsDM
CoreDWH --> ReportingDM
CoreDWH --> MLDataDM
%% Serving Flow
DataMart --> Serving
%% Control Flows
Control ---> Ingestion
Control ---> Processing
Control ---> DataPlatform
%% Cross-cutting Concerns
CrossCutting -.-> K8s
CrossCutting -.-> Storage
CrossCutting -.-> DataPlatform
```
