Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions data_rentgen/consumer/extractors/batch_extraction_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ def add_job(self, job: JobDTO):
job.location = self.add_location(job.location)
if job.type:
job.type = self.add_job_type(job.type)
else:
self.add_job_type(JobTypeDTO.UNKNOWN)
if job.parent_job:
job.parent_job = self.add_job(job.parent_job)
job.tag_values = {self.add_tag_value(tag_value) for tag_value in job.tag_values}
Expand Down
9 changes: 8 additions & 1 deletion data_rentgen/db/repositories/column_lineage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,15 @@ async def create_bulk(self, items: list[ColumnLineageDTO]):
return

insert_statement = insert(ColumnLineage)
statement = insert_statement.on_conflict_do_nothing(
inserted_row = insert_statement.excluded
statement = insert_statement.on_conflict_do_update(
index_elements=[ColumnLineage.created_at, ColumnLineage.id],
set_={
# in case if job or run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"job_id": inserted_row.job_id,
"run_id": inserted_row.run_id,
},
)

await self._session.execute(
Expand Down
4 changes: 4 additions & 0 deletions data_rentgen/db/repositories/input.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
insert_statement = insert_statement.on_conflict_do_update(
index_elements=[Input.created_at, Input.id],
set_={
# in case if job or run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"job_id": inserted_row.job_id,
"run_id": inserted_row.run_id,
"num_bytes": func.greatest(inserted_row.num_bytes, Input.num_bytes),
"num_rows": func.greatest(inserted_row.num_rows, Input.num_rows),
"num_files": func.greatest(inserted_row.num_files, Input.num_files),
Expand Down
7 changes: 2 additions & 5 deletions data_rentgen/db/repositories/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@
from data_rentgen.db.models import Address, Job, JobLastRun, JobTagValue, Location, TagValue
from data_rentgen.db.repositories.base import Repository
from data_rentgen.db.utils.search import make_tsquery, ts_match, ts_rank
from data_rentgen.dto import JobDTO, PaginationDTO

UNKNOWN_JOB_TYPE = 0

from data_rentgen.dto import JobDTO, JobTypeDTO, PaginationDTO

fetch_bulk_query = select(Job).where(
tuple_(Job.location_id, func.lower(Job.name)).in_(
Expand Down Expand Up @@ -281,7 +278,7 @@ async def _create(self, job: JobDTO) -> Job:
location_id=job.location.id,
parent_job_id=job.parent_job.id if job.parent_job else None,
name=job.name,
type_id=job.type.id if job.type else UNKNOWN_JOB_TYPE,
type_id=(job.type or JobTypeDTO.UNKNOWN).id,
)
self._session.add(result)
await self._session.flush([result])
Expand Down
14 changes: 10 additions & 4 deletions data_rentgen/db/repositories/job_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
bindparam,
select,
)
from sqlalchemy.dialects.postgresql import insert

from data_rentgen.db.models import JobType
from data_rentgen.db.repositories.base import Repository
Expand All @@ -25,6 +26,7 @@
)

get_distinct_query = select(JobType.type).distinct(JobType.type).order_by(JobType.type)
insert_query = insert(JobType).on_conflict_do_nothing(index_elements=["type"])


class JobTypeRepository(Repository[JobType]):
Expand Down Expand Up @@ -54,7 +56,11 @@ async def _get(self, job_type_dto: JobTypeDTO) -> JobType | None:
return await self._session.scalar(get_one_query, {"type": job_type_dto.type})

async def _create(self, job_type_dto: JobTypeDTO) -> JobType:
result = JobType(type=job_type_dto.type)
self._session.add(result)
await self._session.flush([result])
return result
await self._session.execute(
insert_query,
{
"type": job_type_dto.type,
"id": job_type_dto.id,
},
)
return await self._get(job_type_dto) # type: ignore[return-value]
3 changes: 3 additions & 0 deletions data_rentgen/db/repositories/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ async def create_or_update_bulk(self, operations: list[OperationDTO]) -> None:
await self._session.execute(
update_statement.values(
{
# in case if run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"run_id": bindparam("run_id"),
"name": func.coalesce(bindparam("name"), Operation.name),
"type": func.coalesce(bindparam("type"), Operation.type),
"status": func.greatest(bindparam("status"), Operation.status),
Expand Down
4 changes: 4 additions & 0 deletions data_rentgen/db/repositories/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
insert_statement = insert_statement.on_conflict_do_update(
index_elements=[Output.created_at, Output.id],
set_={
# in case if job or run was changed - workaround for
# https://github.com/OpenLineage/OpenLineage/issues/3846
"job_id": inserted_row.job_id,
"run_id": inserted_row.run_id,
"type": inserted_row.type.op("|")(Output.type),
"num_bytes": func.greatest(inserted_row.num_bytes, Output.num_bytes),
"num_rows": func.greatest(inserted_row.num_rows, Output.num_rows),
Expand Down
6 changes: 6 additions & 0 deletions data_rentgen/dto/job_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,23 @@
from __future__ import annotations

from dataclasses import dataclass, field
from typing import ClassVar


@dataclass(slots=True)
class JobTypeDTO:
type: str
id: int | None = field(default=None, compare=False)

UNKNOWN: ClassVar[JobTypeDTO]

@property
def unique_key(self) -> tuple:
return (self.type,)

def merge(self, new: JobTypeDTO) -> JobTypeDTO:
self.id = new.id or self.id
return self


JobTypeDTO.UNKNOWN = JobTypeDTO(type="UNKNOWN", id=0)
1 change: 1 addition & 0 deletions docs/changelog/next_release/475.bugfix.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Properly update ``job_id`` / ``run_id`` identifiers if operation bound to job/run was changed.
7 changes: 7 additions & 0 deletions tests/resources/events_spark_unknown_with_parent.jsonl

Large diffs are not rendered by default.

Loading