Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 49 additions & 1 deletion AnaProd/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ class AnaTupleFileListBuilderTask(Task, HTCondorWorkflow, law.LocalWorkflow):
n_cpus = copy_param(HTCondorWorkflow.n_cpus, 1)
bundle_flavours = ["core", "inputFileList"]

def __init__(self, *args, **kwargs):
    """Construct the task, letting an anaTuple version alias set ``version``.

    If ``ana_version`` or ``anaTuple_version`` is present in ``kwargs`` with a
    truthy value, that value is written to ``kwargs["version"]`` before the
    parent constructor runs. ``ana_version`` is consulted first.

    NOTE(review): a ``version`` passed explicitly by the caller is replaced
    whenever one of the aliases is truthy -- confirm this precedence is
    intended.
    """
    # The first truthy alias wins; later aliases are not consulted.
    for alias in ("ana_version", "anaTuple_version"):
        alias_value = kwargs.get(alias)
        if alias_value:
            kwargs["version"] = alias_value
            break
    super().__init__(*args, **kwargs)

_anaTuple_map_cache = None

@classmethod
Expand All @@ -345,7 +351,16 @@ def _get_anaTuple_map(cls, ref_task):
return cls._anaTuple_map_cache

def workflow_requires(self):
# When the Builder's own outputs (plan + reports on fs_anaTuple at this version) already
# exist (central production case), we do not need to run this task at all.
# Return empty early (before super, which would pull bundles when --bundle is used,
# and before any production logic). This prevents bundle flavours like "inputFileList"
# (whose requires can pull InputFileTask) from being required for an "existent" Builder.
if self.complete():
return {}

reqs = super().workflow_requires()

input_file_task_complete = InputFileTask.WF_complete(self)
if not input_file_task_complete:
reqs["anaTuple"] = AnaTupleFileTask.req(self, branches=())
Expand All @@ -360,6 +375,7 @@ def workflow_requires(self):

reqs["AnaTupleFileTask"] = AnaTupleFileTask.req(
self,
version=self.version,
branches=tuple(branch_set),
max_runtime=AnaTupleFileTask.max_runtime._default,
n_cpus=AnaTupleFileTask.n_cpus._default,
Expand All @@ -381,12 +397,19 @@ def _get_branch_set_for_dataset(self, dataset_name, process_group):

def requires(self):
dataset_name, process_group = self.branch_data
# Prune production sources when our plan already exists on the (central) target.
# Complements the workflow_requires early return; prevents the per-file AnaTupleFileTask
# (and thus their InputFileTask deps via workflow_condition) from appearing in the graph.
if self.complete():
return []
if not InputFileTask.WF_complete(self):
return []
branch_set = self._get_branch_set_for_dataset(dataset_name, process_group)
# Pass the resolved version explicitly to each per-file task (self.version comes from
# ana_version/anaTuple_version via our __init__, or from a per-task version on the Builder).
return [
AnaTupleFileTask.req(
self,
version=self.version,
max_runtime=AnaTupleFileTask.max_runtime._default,
branch=prod_br,
branches=(prod_br,),
Expand Down Expand Up @@ -477,6 +500,9 @@ class AnaTupleFileListTask(AnaTupleFileListBuilderTask):
bundle_flavours = []

def __init__(self, *args, **kwargs):
    """Construct the task with a forced local workflow and version alias.

    ``kwargs["workflow"]` is always set to ``"local"``, replacing any value
    the caller supplied. If ``ana_version`` or ``anaTuple_version`` is
    present in ``kwargs`` with a truthy value, that value is written to
    ``kwargs["version"]``; ``ana_version`` is consulted first.

    NOTE(review): a ``version`` passed explicitly by the caller is replaced
    whenever one of the aliases is truthy -- confirm this precedence is
    intended.
    """
    kwargs["workflow"] = "local"
    # The first truthy alias wins; later aliases are not consulted.
    for alias in ("ana_version", "anaTuple_version"):
        alias_value = kwargs.get(alias)
        if alias_value:
            kwargs["version"] = alias_value
            break
    super().__init__(*args, **kwargs)

Expand Down Expand Up @@ -504,14 +530,26 @@ class AnaTupleMergeTask(Task, HTCondorWorkflow, law.LocalWorkflow):
delete_inputs_after_merge = luigi.BoolParameter(default=False)
bundle_flavours = ["core", "inputFileList", "AnaTupleFileList"]

def __init__(self, *args, **kwargs):
    """Construct the merge task, letting a version alias set ``version``.

    If ``ana_version`` or ``anaTuple_version`` is present in ``kwargs`` with a
    truthy value, that value is written to ``kwargs["version"]`` before the
    parent constructor runs. ``ana_version`` is consulted first.

    NOTE(review): a ``version`` passed explicitly by the caller is replaced
    whenever one of the aliases is truthy -- confirm this precedence is
    intended.
    """
    # The first truthy alias wins; later aliases are not consulted.
    for alias in ("ana_version", "anaTuple_version"):
        alias_value = kwargs.get(alias)
        if alias_value:
            kwargs["version"] = alias_value
            break
    super().__init__(*args, **kwargs)

def workflow_requires(self):
# If this merge's output already exists on the target (central for dev-on-existing),
# we don't need to run it or its bundles/production organization.
if self.complete():
return {}

reqs = super().workflow_requires()
merge_organization_complete = AnaTupleFileListTask.req(
self, branches=()
).complete()
if not merge_organization_complete:
reqs["AnaTupleFileListTask"] = AnaTupleFileListTask.req(
self,
version=self.version,
branches=(),
max_runtime=AnaTupleFileListTask.max_runtime._default,
n_cpus=AnaTupleFileListTask.n_cpus._default,
Expand All @@ -534,6 +572,7 @@ def workflow_requires(self):

reqs["AnaTupleFileListTask"] = AnaTupleFileListTask.req(
self,
version=self.version,
branches=tuple(branch_set),
max_runtime=AnaTupleFileListTask.max_runtime._default,
n_cpus=AnaTupleFileListTask.n_cpus._default,
Expand All @@ -552,10 +591,16 @@ def requires(self):
skip_future_tasks,
runs,
) = self.branch_data
# When the merged anaTuple already exists on the target (central for dev-on-existing-anaTuples,
# or after a previous successful merge), do not pull the per-file production (FileTask + InputFileTask).
# This (together with the Builder/List pruning) stops the unwanted InputFileTask deps from
# appearing in HistTupleProducerTask / cache graphs when using --AnaTuple*Task-version on central.
if self.complete():
return {"root": {}, "json": {}}
if not InputFileTask.WF_complete(self):
return {"root": {}, "json": {}}
anaTuple_branch_map = AnaTupleFileTask.req(
self, branch=-1, branches=()
self, version=self.version, branch=-1, branches=()
).create_branch_map()
required_branches = {"root": {}}
if not isinstance(anaTuple_branch_map, dict):
Expand All @@ -581,6 +626,7 @@ def requires(self):
required_branches[dependency_type][anaTuple_dataset_name].append(
AnaTupleFileTask.req(
self,
version=self.version,
max_runtime=AnaTupleFileTask.max_runtime._default,
branch=prod_br,
branches=(prod_br,),
Expand All @@ -604,6 +650,7 @@ def requires(self):
required_branches["json"][builder_dataset_name] = (
AnaTupleFileListBuilderTask.req(
self,
version=self.version,
max_runtime=AnaTupleFileListBuilderTask.max_runtime._default,
branch=builder_branch,
branches=(builder_branch,),
Expand All @@ -620,6 +667,7 @@ def workflow_condition(self):
def create_branch_map(self):
branches = {}
nBranch = 0
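# Build the dataset branch map from AnaTupleFileListTask.req(self); its version comes either
# from the ana* version parameters copied over by req() or from a per-task version set on
# AnaTupleFileListTask.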
ds_branch_map = AnaTupleFileListTask.req(
self, branch=-1, branches=()
).create_branch_map()
Expand Down
Loading
Loading