Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 52 additions & 29 deletions cwms/timeseries/timeseries.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
}
return result_dict
except Exception as e:
logging.error(f"Error processing {ts_id}: {e}")

Check failure on line 86 in cwms/timeseries/timeseries.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Use "logging.exception()" instead.

See more on https://sonarcloud.io/project/issues?id=HydrologicEngineeringCenter_cwms-python&issues=AZ5p3iIJcWNNQ9grzX4m&open=AZ5p3iIJcWNNQ9grzX4m&pullRequest=287
return None

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
Expand Down Expand Up @@ -154,21 +154,37 @@
return Data(response, selector=selector)


# Number of attempts for a single chunked timeseries request. CDA occasionally
# returns 500s caused by connection-pool exhaustion that succeed on retry; 500
# is intentionally not in the session-level status_forcelist (see PR #282), so
# we retry here, scoped to the chunked store/fetch paths only.
_CHUNK_ATTEMPTS = 6


def _call_with_retry(fn: Any, *args: Any, attempts: int = _CHUNK_ATTEMPTS) -> Any:
    """Call ``fn(*args)``, retrying on any ``Exception`` up to ``attempts`` times.

    Args:
        fn: Callable to invoke.
        *args: Positional arguments forwarded to ``fn`` on every attempt.
        attempts: Total number of calls to make before giving up. Must be at
            least 1.

    Returns:
        Whatever ``fn`` returns on the first attempt that does not raise.

    Raises:
        ValueError: If ``attempts`` is less than 1. Without this guard the
            loop would never run and the helper would return ``None`` without
            ever calling ``fn``, which callers would record as a success.
        Exception: The exception raised by ``fn`` on the final attempt is
            re-raised unchanged.
    """
    if attempts < 1:
        raise ValueError(f"attempts must be >= 1, got {attempts}")

    for attempt in range(1, attempts + 1):
        try:
            return fn(*args)
        except Exception as e:
            # Out of attempts: let the caller see the original exception.
            if attempt == attempts:
                raise
            # Lazy %-style arguments; the rendered text is unchanged.
            logging.warning(
                "chunk attempt %d/%d failed: %s", attempt, attempts, e
            )


def fetch_timeseries_chunks(
chunks: List[Tuple[datetime, datetime]],
params: Dict[str, Any],
selector: str,
endpoint: str,
max_workers: int,
) -> List[Data]:
# Initialize an empty list to store results
results = []
results: List[Data] = []
errors: List[str] = []

# Create a ThreadPoolExecutor to manage multithreading
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit tasks for each chunk to the api
future_to_chunk = {
executor.submit(
_call_with_retry,
get_timeseries_chunk,
selector,
endpoint,
Expand All @@ -179,18 +195,23 @@
for chunk_start, chunk_end in chunks
}

# Process completed threads as they finish
for future in concurrent.futures.as_completed(future_to_chunk):
chunk_start, chunk_end = future_to_chunk[future]
try:
# Retrieve the result of the completed future
result = future.result()
results.append(result)
results.append(future.result())
except Exception as e:
chunk_start, chunk_end = future_to_chunk[future]
# Log or handle any errors that occur during execution
logging.error(
error_msg = (
f"Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
)
logging.error(error_msg)
errors.append(error_msg)

if errors:
raise RuntimeError(
f"{len(errors)} of {len(chunks)} chunk(s) failed to fetch:\n"
+ "\n".join(errors)
)

return results


Expand Down Expand Up @@ -669,29 +690,31 @@

# Store chunks concurrently
responses: List[Dict[str, Any]] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Initialize an empty list to store futures
futures = []
# Submit each chunk as a separate task to the executor
for chunk in chunks:
future = executor.submit(
api.post, # The function to execute
endpoint,
chunk, # The chunk of data to store
params,
)
futures.append(future) # Add the future to the list
errors: List[str] = []

for future in concurrent.futures.as_completed(futures):
with concurrent.futures.ThreadPoolExecutor(max_workers=actual_workers) as executor:
future_to_chunk = {
executor.submit(_call_with_retry, api.post, endpoint, chunk, params): chunk
for chunk in chunks
}

for future in concurrent.futures.as_completed(future_to_chunk):
chunk = future_to_chunk[future]
try:
responses.append({"success:": future.result()})
responses.append({"success": future.result()})
except Exception as e:
start_time = chunk["values"][0][0]
end_time = chunk["values"][-1][0]
logging.error(
f"Error storing chunk from {start_time} to {end_time}: {e}"
)
responses.append({"error": str(e)})
error_msg = f"Error storing chunk from {start_time} to {end_time}: {e}"
logging.error(error_msg)
errors.append(error_msg)
responses.append({"error": error_msg})

if errors:
raise RuntimeError(
f"{len(errors)} of {len(chunks)} chunk(s) failed to store:\n"
+ "\n".join(errors)
)

return

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name = "cwms-python"
repository = "https://github.com/HydrologicEngineeringCenter/cwms-python"

version = "1.0.7"
version = "1.0.8"

packages = [
{ include = "cwms" },
Expand Down
80 changes: 79 additions & 1 deletion tests/cda/timeseries/timeseries_CDA_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
TEST_TSID_CHUNK_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Nulls"
TEST_TSID_COPY_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Copy-Nulls"
TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
TEST_TSID_CHUNK_PARTIAL = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Partial"
# Generate 15-minute interval timestamps
START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc)
END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc)
Expand All @@ -35,6 +36,8 @@
TEST_TSID_COPY,
TEST_TSID_CHUNK_NULLS,
TEST_TSID_COPY_NULLS,
TEST_TSID_CHUNK_PARTIAL,
TEST_TSID_DELETE,
]


Expand Down Expand Up @@ -123,7 +126,10 @@ def setup_data():
)
except Exception as e:
print(f"Failed to delete tsid {ts_id}: {e}")
cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True)
try:
cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True)
except Exception as e:
print(f"Failed to delete location {TEST_LOCATION_ID}: {e}")


@pytest.fixture(autouse=True)
Expand Down Expand Up @@ -296,6 +302,78 @@ def test_store_timeseries_chunk_ts():
), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}"


def test_store_timeseries_partial_chunk_fail_real_api():
    """Storing must raise RuntimeError when one chunk holds a corrupt value.

    A single value in the second chunk is replaced with a non-numeric string
    before a multithreaded store; the raised error is expected to report
    exactly one failed chunk.
    """
    two_weeks = 2 * 7 * 24 * 4  # number of 15-minute samples in two weeks

    payload = ts.timeseries_df_to_json(
        DF_CHUNK_MULTI, TEST_TSID_CHUNK_PARTIAL, "m", TEST_OFFICE
    )

    # Guard: the payload has to split into several chunks, otherwise the
    # multithreaded store path is not exercised.
    chunk_count = len(ts.chunk_timeseries_data(payload, two_weeks))
    assert (
        chunk_count > 1
    ), "Test requires multiple chunks — increase DF_CHUNK_MULTI range"

    # Index `two_weeks` is the first row of the second chunk; swap its value
    # for a non-numeric string while keeping the other two fields.
    row = payload["values"][two_weeks]
    payload["values"][two_weeks] = [row[0], "not_a_number", row[2]]

    with pytest.raises(RuntimeError) as exc_info:
        ts.store_timeseries(payload, multithread=True, chunk_size=two_weeks)

    message = str(exc_info.value)
    assert "1 of " in message
    assert "chunk(s) failed to store" in message
    assert "Error storing chunk from" in message


def test_get_timeseries_partial_chunk_fail_real_api():
    """Fetching must raise RuntimeError when one chunk request fails.

    ``ts.get_timeseries_chunk`` is patched so that the chunk starting at one
    specific boundary raises, while every other chunk is delegated to the
    real implementation. Depends on test_store_timeseries_chunk_ts having
    populated TEST_TSID_CHUNK_MULTI.
    """
    max_days = 14
    window = timedelta(days=max_days)
    failing_start = START_DATE_CHUNK_MULTI + window

    time_ranges = ts.chunk_timeseries_time_range(
        START_DATE_CHUNK_MULTI,
        END_DATE_CHUNK_MULTI.replace(tzinfo=timezone.utc),
        window,
    )
    assert (
        len(time_ranges) > 1
    ), "Test requires multiple chunks — increase chunk count or shrink max_days"
    chunk_starts = [start for start, _ in time_ranges]
    assert (
        failing_start in chunk_starts
    ), "sabotage_begin must align with a real chunk boundary"

    real_get_chunk = ts.get_timeseries_chunk

    def fail_one_chunk(selector, endpoint, param, begin, end):
        # Only the chunk starting at the chosen boundary is made to fail.
        if begin != failing_start:
            return real_get_chunk(selector, endpoint, param, begin, end)
        raise RuntimeError("simulated CDA failure")

    with patch.object(
        ts, "get_timeseries_chunk", side_effect=fail_one_chunk
    ), pytest.raises(RuntimeError) as exc_info:
        ts.get_timeseries(
            ts_id=TEST_TSID_CHUNK_MULTI,
            office_id=TEST_OFFICE,
            begin=START_DATE_CHUNK_MULTI,
            end=END_DATE_CHUNK_MULTI,
            max_days_per_chunk=max_days,
            unit="SI",
        )

    message = str(exc_info.value)
    assert "1 of " in message
    assert "chunk(s) failed to fetch" in message
    assert "Failed to fetch data from" in message
    assert "simulated CDA failure" in message


def test_store_timesereis_chunk_to_with_null_values():
# Define parameters
ts_id = TEST_TSID_CHUNK_NULLS
Expand Down
Loading