Branch HistFromNtupleProducerTask per dataset, not per dataset×variable#260
Branch HistFromNtupleProducerTask per dataset, not per dataset×variable#260kandrosov wants to merge 13 commits into
Conversation
…per dataset×variable
92b173b to
7edf88d
Compare
|
@cms-flaf-bot please test |
|
pipeline#14918124 started |
There was a problem hiding this comment.
Pull request overview
This PR addresses FLAF issue #257 by reducing the LAW branching granularity of HistFromNtupleProducerTask from (dataset × variable) to just dataset, so each dataset’s HistTuple inputs are localized once and then reused to produce outputs for all variables. HistMergerTask is updated accordingly to consume the new per-dataset dict outputs while keeping the merger branching per variable unchanged.
Changes:
- Reworked
HistFromNtupleProducerTaskbranching to be per-dataset and updatedoutput()to return a{var_name: target}mapping. - Updated
HistFromNtupleProducerTask.run()to localize inputs once per dataset and loop over variables to generate per-variable outputs. - Adapted
HistMergerTaskto request per-dataset HFN branches and extractinp[var_name]from dict inputs.
Comments suppressed due to low confidence (4)
Analysis/tasks.py:446
branchesis created from aset, so converting it directly to a tuple can yield non-deterministic ordering across runs. Sincebranchesis a Luigi/law parameter, non-deterministic ordering can lead to unstable task IDs and unnecessary reruns. Consider sorting before converting to a tuple.
branch_set = set()
for br_idx, (dataset_name, prod_br_list) in self.branch_map.items():
branch_set.update(prod_br_list)
branches = tuple(branch_set)
req_dict = {
"HistTupleProducerTask": HistTupleProducerTask.req(
self, branches=branches, customisations=self.customisations
)
}
Analysis/tasks.py:601
new_branchsetis aset, but it is converted to alistwhen passed tobranches. Since parameter ordering can affect task IDs, prefer a deterministically ordered tuple (and keep it consistent with other workflow_requires implementations in this file).
return {
"HistFromNtupleProducerTask": HistFromNtupleProducerTask.req(
self, branches=list(new_branchset)
)
}
Analysis/tasks.py:446
branchesis created from aset, so converting it directly to a tuple can yield non-deterministic ordering across runs. Sincebranchesis a Luigi/law parameter, non-deterministic ordering can lead to unstable task IDs and unnecessary reruns. Consider sorting before converting to a tuple.
branch_set = set()
for br_idx, (dataset_name, prod_br_list) in self.branch_map.items():
branch_set.update(prod_br_list)
branches = tuple(branch_set)
req_dict = {
"HistTupleProducerTask": HistTupleProducerTask.req(
self, branches=branches, customisations=self.customisations
)
}
Analysis/tasks.py:601
new_branchsetis aset, but it is converted to alistwhen passed tobranches. Since parameter ordering can affect task IDs, prefer a deterministically ordered tuple (and keep it consistent with other workflow_requires implementations in this file).
return {
"HistFromNtupleProducerTask": HistFromNtupleProducerTask.req(
self, branches=list(new_branchset)
)
}
|
@cms-flaf-bot please test |
|
pipeline#14918165 started |
|
pipeline#14918124 failed |
|
pipeline#14918165 failed |
|
@cms-flaf-bot please test |
|
pipeline#14918419 started |
|
pipeline#14918419 passed |
|
@cms-flaf-bot please test
|
|
pipeline#14985279 started |
|
pipeline#14985279 failed |
There was a problem hiding this comment.
Pull request overview
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
Comments suppressed due to low confidence (1)
Analysis/tasks.py:726
branches=list(new_branchset)uses a set, so branch order is nondeterministic. Sincebranchescontributes to the task id, this can lead to unstable task signatures / duplicate submissions across runs.
reqs["HistFromNtupleProducerTask"] = HistFromNtupleProducerTask.req(
self, branches=list(new_branchset)
)
…leanup on exception
|
@cms-flaf-bot please test |
|
pipeline#14990928 started |
|
pipeline#14990928 passed |
…ple-per-dataset # Conflicts: # Analysis/tasks.py
…nching (cms-flaf#257) + queue-based download pipeline; default tasks-per-job (cms-flaf#264)
…er-file branches); stop .req from propagating it (exclude_params_req)
…ches + deterministic per-variable bucketing (adding/removing variables no longer renumbers branches)
…laceholder output so the branch is complete and never submitted to HTCondor
…_simultaneous_downloads, default 4) and total in-flight size (max_total_download_size_gb, default 10) from global config
|
@cms-flaf-bot please test |
|
pipeline#15050082 started |
|
pipeline#15050082 failed |
|
@cms-flaf-bot please test |
|
pipeline#15063269 started |
|
pipeline#15063269 passed |



Fix for #257: Improvement to HistFromNtupleProducerTask nJobs Management
Root cause
HistFromNtupleProducerTask.create_branch_map()created one branch per(dataset, variable)pair —nDatasets × nVariablesbranches total. Each branchlocalised all HistTuple files for its dataset (potentially 100+ files) and then
ran
HistProducerFromNTuple.pyfor a single variable. The next branch for thesame dataset would localise exactly the same files again for the next variable.
With 50 variables and 50+ datasets this multiplied network I/O and queue slots
by the number of variables for no gain.
Solution
Branch
HistFromNtupleProducerTaskper dataset only (nDatasetsbranches).Each branch localises the dataset's HistTuple files once and loops over all
variables inside
run(), producing one output file per variable. The output ofeach branch is now a
dictkeyed by variable name rather than a single file.HistMergerTaskis adapted to match:create_branch_map()reads the new per-dataset HFN branch map and emits oneMerger branch per variable (same as before), recording all HFN branch indices.
requires()requests all HFN dataset branches (each branch covers all vars).run()extractsinp[var_name]from each dict input instead of usinginp.abspathdirectly.HistPlotTaskis unchanged — it still iterates overHistMergerTask's branchmap whose tuple structure
(var_name, br_indices, datasets)is preserved.Files changed
FLAF/Analysis/tasks.pyHistFromNtupleProducerTask.create_branch_map:{n: (dataset, prod_br_list)}instead of{n: (var, prod_br_list, dataset)}HistFromNtupleProducerTask.workflow_requires: updated branch_map unpackingHistFromNtupleProducerTask.requires: updated branch_data unpacking; fixed list-vs-generator bug in old codeHistFromNtupleProducerTask.output: returns{var_name: remote_target}dictHistFromNtupleProducerTask.run: loops over variables; localises inputs onceHistMergerTask.create_branch_map: rewritten to read per-dataset HFN branchesHistMergerTask.workflow_requires: updated branch_map unpackingHistMergerTask.requires: updated to request HFN branches by dataset indexHistMergerTask.run: extractsinp[var_name]from dict inputTesting
Run the full CI pipeline (
HistPlotTask --version CI --period Run3_2022 --workflow local --branches 0 --test 1000) for each analysis. The number ofHistFromNtupleProducerTaskbranches should drop fromnDatasets × nVariablestonDatasets.