Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,9 @@ import_xml_files/
.venv

s3_pdf_issues.json

# Airflow
logs/
airflow.cfg
airflow.db
webserver_config.py
28 changes: 16 additions & 12 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
FROM python:3.12-slim
FROM registry.cern.ch/cern-sis/base-images/apache/airflow:3.1.7-python3.11

WORKDIR /code
ENV PYTHONBUFFERED=0
ENV AIRFLOW__LOGGING__LOGGING_LEVEL=INFO

ENV PATH="/root/.poetry/bin:${PATH}"
ENV PYTHONPATH="/opt/airflow/refactory:${PYTHONPATH}"

USER root
RUN apt-get update \
&& apt-get install -y kstart curl \
&& apt-get install -y kstart \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* \
&& curl -sSL https://install.python-poetry.org/ \
| python
&& rm -rf /var/lib/apt/lists/*

ENV PATH="/root/.local/bin:$PATH"

RUN poetry --version
USER airflow

COPY poetry.lock pyproject.toml ./
COPY digitization ./digitization

RUN poetry install
COPY airflow/requirements.txt ./requirements.txt
RUN pip install --no-cache-dir -r requirements.txt


COPY airflow/dags ./dags


COPY refactory ./refactory
22 changes: 22 additions & 0 deletions airflow/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
FROM registry.cern.ch/cern-sis/base-images/apache/airflow:3.1.7-python3.11

ENV PYTHONUNBUFFERED=0
ENV AIRFLOW__LOGGING__LOGGING_LEVEL=INFO

USER root

RUN apt-get update && \
apt-get install -y --no-install-recommends \
gcc \
libffi-dev \
libssl-dev \
kstart \
curl \
&& apt-get autoremove -yqq --purge \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*

USER airflow

COPY airflow/requirements.txt /requirements.txt
RUN pip install --no-cache-dir -r /requirements.txt
Empty file.
35 changes: 35 additions & 0 deletions airflow/dags/check_files/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import os
from pypdf import PdfReader
from pypdf.errors import PdfReadError


def validate_pdf(file_path: str) -> bool:
"""Checks if a local PDF is structurally valid and readable."""
try:
file_size = os.path.getsize(file_path)
if file_size < 100:
return False

with open(file_path, "rb") as f:
header = f.read(8)
f.seek(-min(1024, file_size), 2)
trailer = f.read()

if not header.startswith(b"%PDF-"):
return False
if b"%%EOF" not in trailer:
return False

reader = PdfReader(file_path)
if len(reader.pages) == 0:
return False

_ = reader.pages[0]

return True

except OSError as e:
raise RuntimeError(f"System error when accessing file {file_path}: {e}") from e

except (PdfReadError, Exception):
return False
108 changes: 108 additions & 0 deletions airflow/dags/check_files/validate_files_integrity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import json
import tempfile
from datetime import datetime
from airflow.sdk import dag, task, Param

from common.storage_connection import S3Provider
from common.utils import parse_inventory
from check_files.validator import resolve_target_boxes, validate_s3_files


@dag(
dag_id="validate_files_integrity",
start_date=datetime(2024, 1, 1),
schedule=None,
catchup=False,
tags=["digitization", "validation"],
params={
"data_source": Param(
"",
type="string",
description="Inventory source (e.g., 1..1000, [1,2], or CERNBOX hash)",
),
"bucket": Param(
"digitization-dev", type="string", description="Target S3 Bucket name"
),
"base_path": Param(
"cern-archives/raw/PDF/",
type="string",
description="Base S3 path for files",
),
"upload_reports": Param(
False,
type="boolean",
description="Toggle to upload validation reports back to storage",
),
},
)
def validation_dag():

@task(task_id="prepare_target_boxes")
def prepare_targets_boxes(**context):
"""Parses inputs and figures out exactly which boxes to check."""
raw_data_source = context["params"]["data_source"]

inventory_input = parse_inventory(raw_data_source)

targets = resolve_target_boxes(inventory_input)

print(f"{len(targets)} boxes for validation.")
return list(targets)

@task(task_id="validate_s3_pdfs")
def perform_validation(target_boxes: list, **context):
"""Iterates through S3 and checks if PDFs are corrupted."""
bucket = context["params"]["bucket"]
base_path = context["params"]["base_path"]

provider = S3Provider(bucket=bucket)

print(f"Starting validation for {len(target_boxes)} boxes in {base_path}")
result_report = validate_s3_files(provider, base_path, set(target_boxes))

return result_report

@task(task_id="upload_validation_reports")
def upload_reports(report: dict, **context):
"""Generates the JSON/TXT reports and uploads them securely to S3."""
upload_toggle = context["params"]["upload_reports"]

if not upload_toggle or "error" in report:
print("Report upload skipped (Toggle is OFF or error occurred).")
return

bucket = context["params"]["bucket"]
base_path = context["params"]["base_path"]
provider = S3Provider(bucket=bucket)

text_log = f"Validation report for boxes {report['metadata']['target_boxes']}\n"
text_log += f"✅ Valid: {report['statistics']['valid_files_count']}\n"
text_log += f"❌ Corrupted: {report['statistics']['corrupted_files_count']}\n\n"

for vf in report["output"]["valid_files"]:
text_log += f"✅ Valid PDF: {vf}\n"
for cf in report["output"]["corrupted_files"]:
text_log += f"❌ Corrupted PDF: {cf}\n"

with (
tempfile.NamedTemporaryFile(mode="w", delete=False) as json_tmp,
tempfile.NamedTemporaryFile(mode="w", delete=False) as txt_tmp,
):
json.dump(report, json_tmp, indent=4)
txt_tmp.write(text_log)
json_tmp_path, txt_tmp_path = json_tmp.name, txt_tmp.name

remote_json_path = f"{base_path.rstrip('/')}/validation_report.json"
remote_log_path = f"{base_path.rstrip('/')}/s3_pdf_issues.log"

provider.upload_file(json_tmp_path, remote_json_path)
provider.upload_file(txt_tmp_path, remote_log_path)

print(f"Uploaded securely to {remote_json_path} and {remote_log_path}")

targets = prepare_targets_boxes()
validation_results = perform_validation(targets)
upload_reports(validation_results)


validation_dag_instance = validation_dag()
115 changes: 115 additions & 0 deletions airflow/dags/check_files/validator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
import logging
import re
import tempfile
from common.storage_connection import StorageProvider, CernboxProvider
from check_files.utils import validate_pdf



logger = logging.getLogger(__name__)


def resolve_target_boxes(data_source: str | list[int]) -> set[int]:
"""Resolves the raw input into a definitive set of Box Numbers."""
target_box_numbers = set()

if (
isinstance(data_source, str)
and not data_source.replace(".", "").isdigit()
and "[" not in data_source
):
data_source_provider = CernboxProvider(public_link_hash=data_source)
excel_files = data_source_provider.list_files("", ".xlsx")

for file_path in excel_files:
filename = file_path.split(".")[0]
match = re.search(r"(?i:BOITE)[\-_]O0(\d+)(-\w+)?", filename)
if match:
target_box_numbers.add(int(match.group(1)))
elif isinstance(data_source, list):
target_box_numbers = set(data_source)

return target_box_numbers


def validate_s3_files(
provider: StorageProvider, base_path: str, target_box_numbers: set[int]
) -> dict:
"""Iterates through S3, validates PDFs, and returns a structured report dict."""
logger.info(f"🔍 Starting S3 scan in path: {base_path}")

folders = provider.list_folders(base_path)
if not folders:
logger.warning("No folders found in this path.")
return {"error": "No folders found in this path."}

found_and_valid_boxes = set()
corrupted_files = []
valid_files = []

for folder in folders:
match = re.search(r"(?i:BOITE)[\-_]O0(\d+)(-\w+)?", folder)
if not match:
continue

box_num = int(match.group(1))
if box_num not in target_box_numbers:
continue

pdf_files = provider.list_files(folder, "PDF")
if not pdf_files:
logger.warning(f"Box {box_num}: EMPTY FOLDER (No PDFs found)")
continue

found_and_valid_boxes.add(box_num)
logger.info(f"Starting validation for Box {box_num} (Total: {len(pdf_files)} PDFs)")
box_corrupted_count = 0

for pdf_path in pdf_files:
with tempfile.NamedTemporaryFile(delete=True) as tmp:
provider.download_to_temp(pdf_path, tmp.name)

if validate_pdf(tmp.name):
valid_files.append(pdf_path)
else:
logger.warning(f"Corrupted file -> {pdf_path.split('/')[-1]}")
corrupted_files.append(pdf_path)
box_corrupted_count += 1

if box_corrupted_count == 0:
logger.info(f"Box {box_num} validated successfully. (0 corrupted files)")
else:
logger.warning(f"Box {box_num} validated with issues. ({box_corrupted_count} corrupted files)")

missing_boxes = target_box_numbers - found_and_valid_boxes

summary_text = (
"\n========================================\n"
"VALIDATION SUMMARY REPORT\n"
"========================================\n"
f"Valid PDFs : {len(valid_files)}\n"
f"Corrupted PDFs : {len(corrupted_files)}\n"
f"Missing Boxes : {len(missing_boxes)}\n"
)

if corrupted_files:
summary_text += "\nList of all corrupted files:\n"
for cf in corrupted_files:
summary_text += f" - {cf}\n"
summary_text += "========================================\n"

logger.info(summary_text)

return {
"metadata": {"base_path": base_path, "target_boxes": list(target_box_numbers)},
"statistics": {
"valid_files_count": len(valid_files),
"corrupted_files_count": len(corrupted_files),
"missing_boxes_count": len(missing_boxes),
},
"output": {
"valid_files": valid_files,
"missing_boxes": list(missing_boxes),
"corrupted_files": corrupted_files,
},
}
Empty file added airflow/dags/common/__init__.py
Empty file.
Loading
Loading