diff --git a/cwms/timeseries/timeseries.py b/cwms/timeseries/timeseries.py
index 563b485..17a3294 100644
--- a/cwms/timeseries/timeseries.py
+++ b/cwms/timeseries/timeseries.py
@@ -154,6 +154,41 @@ def get_timeseries_chunk(
     return Data(response, selector=selector)
 
 
+# Number of attempts for a single chunked timeseries request. CDA occasionally
+# returns 500s caused by connection-pool exhaustion that succeed on retry; 500
+# is intentionally not in the session-level status_forcelist (see PR #282), so
+# we retry here, scoped to the chunked store/fetch paths only.
+_CHUNK_ATTEMPTS = 6
+
+# Pause in seconds before the second attempt; doubled after each further
+# failure so worker threads do not immediately re-hit an exhausted pool.
+_CHUNK_RETRY_DELAY = 0.25
+
+
+def _call_with_retry(fn: Any, *args: Any, attempts: int = _CHUNK_ATTEMPTS) -> Any:
+    """Call ``fn(*args)``, retrying with exponential backoff on any exception.
+
+    Returns the first successful result; the exception raised by the final
+    attempt propagates unchanged.
+
+    Raises:
+        ValueError: If ``attempts`` is less than 1, which would otherwise skip
+            the call entirely and return ``None``.
+    """
+    import time  # function-scope import keeps this helper self-contained
+
+    if attempts < 1:
+        raise ValueError(f"attempts must be at least 1, got {attempts}")
+    for i in range(attempts):
+        try:
+            return fn(*args)
+        except Exception as e:
+            if i == attempts - 1:
+                raise
+            logging.warning(f"chunk attempt {i + 1}/{attempts} failed: {e}")
+            time.sleep(_CHUNK_RETRY_DELAY * 2**i)
+
+
 def fetch_timeseries_chunks(
     chunks: List[Tuple[datetime, datetime]],
     params: Dict[str, Any],
@@ -161,14 +196,13 @@ def fetch_timeseries_chunks(
     endpoint: str,
     max_workers: int,
 ) -> List[Data]:
-    # Initialize an empty list to store results
-    results = []
+    results: List[Data] = []
+    errors: List[str] = []
 
-    # Create a ThreadPoolExecutor to manage multithreading
     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
-        # Submit tasks for each chunk to the api
         future_to_chunk = {
             executor.submit(
+                _call_with_retry,
                 get_timeseries_chunk,
                 selector,
                 endpoint,
@@ -179,18 +213,23 @@ def fetch_timeseries_chunks(
             for chunk_start, chunk_end in chunks
         }
 
-        # Process completed threads as they finish
         for future in concurrent.futures.as_completed(future_to_chunk):
+            chunk_start, chunk_end = future_to_chunk[future]
             try:
-                # Retrieve the result of the completed future
-                result = future.result()
-                results.append(result)
+                results.append(future.result())
             except Exception as e:
-                chunk_start, chunk_end = future_to_chunk[future]
-                # Log or handle any errors that occur during execution
-                logging.error(
+                error_msg = (
                     f"Failed to fetch data from {chunk_start} to {chunk_end}: {e}"
                 )
+                logging.error(error_msg)
+                errors.append(error_msg)
+
+    if errors:
+        raise RuntimeError(
+            f"{len(errors)} of {len(chunks)} chunk(s) failed to fetch:\n"
+            + "\n".join(errors)
+        )
+
     return results
 
 
@@ -669,29 +708,31 @@ def store_timeseries(
 
     # Store chunks concurrently
     responses: List[Dict[str, Any]] = []
-    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
-        # Initialize an empty list to store futures
-        futures = []
-        # Submit each chunk as a separate task to the executor
-        for chunk in chunks:
-            future = executor.submit(
-                api.post,  # The function to execute
-                endpoint,
-                chunk,  # The chunk of data to store
-                params,
-            )
-            futures.append(future)  # Add the future to the list
+    errors: List[str] = []
 
-        for future in concurrent.futures.as_completed(futures):
+    with concurrent.futures.ThreadPoolExecutor(max_workers=actual_workers) as executor:
+        future_to_chunk = {
+            executor.submit(_call_with_retry, api.post, endpoint, chunk, params): chunk
+            for chunk in chunks
+        }
+
+        for future in concurrent.futures.as_completed(future_to_chunk):
+            chunk = future_to_chunk[future]
             try:
-                responses.append({"success:": future.result()})
+                responses.append({"success": future.result()})
             except Exception as e:
                 start_time = chunk["values"][0][0]
                 end_time = chunk["values"][-1][0]
-                logging.error(
-                    f"Error storing chunk from {start_time} to {end_time}: {e}"
-                )
-                responses.append({"error": str(e)})
+                error_msg = f"Error storing chunk from {start_time} to {end_time}: {e}"
+                logging.error(error_msg)
+                errors.append(error_msg)
+                responses.append({"error": error_msg})
+
+    if errors:
+        raise RuntimeError(
+            f"{len(errors)} of {len(chunks)} chunk(s) failed to store:\n"
+            + "\n".join(errors)
+        )
 
     return
 
diff --git a/pyproject.toml b/pyproject.toml
index 358d35f..45e7ed9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,7 +2,7 @@
 name = "cwms-python"
 repository = "https://github.com/HydrologicEngineeringCenter/cwms-python"
 
-version = "1.0.7"
+version = "1.0.8"
 
 packages = [
     { include = "cwms" },
diff --git a/tests/cda/timeseries/timeseries_CDA_test.py b/tests/cda/timeseries/timeseries_CDA_test.py
index c0f9cc9..c267e06 100644
--- a/tests/cda/timeseries/timeseries_CDA_test.py
+++ b/tests/cda/timeseries/timeseries_CDA_test.py
@@ -21,6 +21,7 @@
 TEST_TSID_CHUNK_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Nulls"
 TEST_TSID_COPY_NULLS = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Copy-Nulls"
 TS_ID_REV_TEST = TEST_TSID_MULTI.replace("Raw-Multi", "Raw-Rev-Test")
+TEST_TSID_CHUNK_PARTIAL = f"{TEST_LOCATION_ID}.Stage.Inst.15Minutes.0.Raw-Multi-Partial"
 # Generate 15-minute interval timestamps
 START_DATE_CHUNK_MULTI = datetime(2025, 7, 31, 0, 0, tzinfo=timezone.utc)
 END_DATE_CHUNK_MULTI = datetime(2025, 9, 30, 23, 45, tzinfo=timezone.utc)
@@ -35,6 +36,8 @@
     TEST_TSID_COPY,
     TEST_TSID_CHUNK_NULLS,
     TEST_TSID_COPY_NULLS,
+    TEST_TSID_CHUNK_PARTIAL,
+    TEST_TSID_DELETE,
 ]
 
 
@@ -123,7 +126,10 @@ def setup_data():
             )
         except Exception as e:
             print(f"Failed to delete tsid {ts_id}: {e}")
-    cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True)
+    try:
+        cwms.delete_location(TEST_LOCATION_ID, TEST_OFFICE, cascade_delete=True)
+    except Exception as e:
+        print(f"Failed to delete location {TEST_LOCATION_ID}: {e}")
 
 
 @pytest.fixture(autouse=True)
@@ -296,6 +302,78 @@ def test_store_timeseries_chunk_ts():
     ), f"Data frames do not match: original = {DF_CHUNK_MULTI.describe()}, stored = {df.describe()}"
 
 
+def test_store_timeseries_partial_chunk_fail_real_api():
+    """One chunk with a corrupt value is rejected by the real API;
+    the rest succeed. RuntimeError must surface with exactly 1 failure."""
+    chunk_size = 2 * 7 * 24 * 4  # two weeks of 15-min data
+
+    ts_json = ts.timeseries_df_to_json(
+        DF_CHUNK_MULTI, TEST_TSID_CHUNK_PARTIAL, "m", TEST_OFFICE
+    )
+
+    # Confirm multiple chunks exist so the multithreaded path is taken
+    chunks = ts.chunk_timeseries_data(ts_json, chunk_size)
+    assert (
+        len(chunks) > 1
+    ), "Test requires multiple chunks — increase DF_CHUNK_MULTI range"
+
+    # Corrupt the first value of the second chunk so only that chunk is rejected.
+    corrupt_index = chunk_size
+    original = ts_json["values"][corrupt_index]
+    ts_json["values"][corrupt_index] = [original[0], "not_a_number", original[2]]
+
+    with pytest.raises(RuntimeError) as exc_info:
+        ts.store_timeseries(ts_json, multithread=True, chunk_size=chunk_size)
+
+    error_msg = str(exc_info.value)
+    assert "1 of " in error_msg and "chunk(s) failed to store" in error_msg
+    assert "Error storing chunk from" in error_msg
+
+
+def test_get_timeseries_partial_chunk_fail_real_api():
+    """One fetch chunk is forced to fail; the others really hit the API.
+    RuntimeError must surface with the chunk details. Depends on
+    test_store_timeseries_chunk_ts having populated TEST_TSID_CHUNK_MULTI."""
+
+    max_days = 14
+    sabotage_begin = START_DATE_CHUNK_MULTI + timedelta(days=max_days)
+
+    chunks = ts.chunk_timeseries_time_range(
+        START_DATE_CHUNK_MULTI,
+        END_DATE_CHUNK_MULTI.replace(tzinfo=timezone.utc),
+        timedelta(days=max_days),
+    )
+    assert (
+        len(chunks) > 1
+    ), "Test requires multiple chunks — increase chunk count or shrink max_days"
+    assert any(
+        start == sabotage_begin for start, _ in chunks
+    ), "sabotage_begin must align with a real chunk boundary"
+
+    original_chunk = ts.get_timeseries_chunk
+
+    def sabotaged(selector, endpoint, param, begin, end):
+        if begin == sabotage_begin:
+            raise RuntimeError("simulated CDA failure")
+        return original_chunk(selector, endpoint, param, begin, end)
+
+    with patch.object(ts, "get_timeseries_chunk", side_effect=sabotaged):
+        with pytest.raises(RuntimeError) as exc_info:
+            ts.get_timeseries(
+                ts_id=TEST_TSID_CHUNK_MULTI,
+                office_id=TEST_OFFICE,
+                begin=START_DATE_CHUNK_MULTI,
+                end=END_DATE_CHUNK_MULTI,
+                max_days_per_chunk=max_days,
+                unit="SI",
+            )
+
+    error_msg = str(exc_info.value)
+    assert "1 of " in error_msg and "chunk(s) failed to fetch" in error_msg
+    assert "Failed to fetch data from" in error_msg
+    assert "simulated CDA failure" in error_msg
+
+
 def test_store_timesereis_chunk_to_with_null_values():
     # Define parameters
     ts_id = TEST_TSID_CHUNK_NULLS