From 454a6670860636f77bcbc468a6ee39e153c6b54b Mon Sep 17 00:00:00 2001 From: Bryson Spilman Date: Fri, 6 Mar 2026 12:33:52 -0800 Subject: [PATCH] CDA-113 Adding csv support for timeseries --- .../main/java/cwms/cda/api/Controllers.java | 3 + .../java/cwms/cda/api/CursorIterator.java | 56 +++ .../cwms/cda/api/TimeSeriesController.java | 160 +++++- .../cwms/cda/data/dao/StreamConsumer.java | 13 + .../java/cwms/cda/data/dao/TimeSeriesDao.java | 26 + .../cwms/cda/data/dao/TimeSeriesDaoImpl.java | 295 ++++++++++- .../cda/data/dto/csv/CsvRequiredColumn.java | 14 + .../cwms/cda/data/dto/csv/CsvUnitHeader.java | 22 + .../cwms/cda/data/dto/csv/CwmsCsvDTO.java | 7 + .../cwms/cda/data/dto/csv/TimeSeriesCsv.java | 76 +++ .../cda/data/dto/csv/TimeSeriesCsvRow.java | 92 ++++ .../java/cwms/cda/formatters/Formats.java | 29 +- .../cwms/cda/formatters/csv/CsvFormatter.java | 9 + .../java/cwms/cda/formatters/csv/CsvRows.java | 14 + .../java/cwms/cda/formatters/csv/CsvV1.java | 74 ++- .../cda/formatters/csv/CwmsCsvProcessor.java | 473 ++++++++++++++++++ .../adapters/TimeSeriesRecordSerializer.java | 20 +- .../cda/api/TimeseriesControllerTestIT.java | 320 +++++++++++- .../cwms/cda/data/dto/TimeSeriesTest.java | 2 + .../data/dto/csv/TestTimeSeriesCsvRow.java | 197 ++++++++ .../test/java/cwms/cda/helpers/DTOMatch.java | 25 + .../cwms/cda/api/lrl/large_timeseries.json | 57 +++ .../data/dto/csv/time-series-row-default.csv | 2 + .../time-series-row-with-metadata-columns.csv | 2 + .../time-series-row-with-metadata-comment.csv | 8 + ...time-series-rows-with-metadata-columns.csv | 3 + ...time-series-rows-with-metadata-comment.csv | 9 + .../cwms/cda/data/dto/time-series-default.csv | 5 + .../dto/time-series-metadata-comments.csv | 8 + ...-series-optionals-no-metadata-comments.csv | 5 + ...eries-optionals-with-metadata-comments.csv | 9 + .../cwms/cda/data/dto/time-series.csv | 3 + docs/source/decisions/csv-format.rst | 59 +++ 33 files changed, 2038 insertions(+), 59 deletions(-) create mode 100644 cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvRequiredColumn.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvUnitHeader.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CwmsCsvDTO.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsv.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsvRow.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvFormatter.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvRows.java create mode 100644 cwms-data-api/src/main/java/cwms/cda/formatters/csv/CwmsCsvProcessor.java create mode 100644 cwms-data-api/src/test/java/cwms/cda/data/dto/csv/TestTimeSeriesCsvRow.java create mode 100644 cwms-data-api/src/test/resources/cwms/cda/api/lrl/large_timeseries.json create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-default.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-columns.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-comment.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-columns.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-comment.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-default.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-metadata-comments.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv create mode 100644 cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series.csv create mode 100644 docs/source/decisions/csv-format.rst diff --git a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java index cdf101876f..255de300ce 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java @@ -57,6 +57,7 @@ public final class Controllers { public static final String CURSOR = "cursor"; public static final String PAGE = "page"; public static final String PAGE_SIZE = "page-size"; + public static final String CSV_RESUME_FROM_PAGE = "csv-resume-from-page"; // IF the constant has a number at the end its a deprecated variant @@ -127,6 +128,8 @@ public final class Controllers { public static final String MATCH_NULL_PARENTS = "match-null-parents"; public static final String ENTITY_ID = "entity-id"; public static final String PARENT_ENTITY_ID = "parent-entity-id"; + public static final String INCLUDE_METADATA_AS_CSV_COMMENTS = "include-metadata-as-comments"; + public static final String INCLUDE_OPTIONAL_CSV_COLUMNS = "include-optional-csv-columns"; public static final String CREATE_AS_LRTS = "create-as-lrts"; public static final String STORE_RULE = "store-rule"; diff --git a/cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java b/cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java new file mode 100644 index 0000000000..d8ae6262d0 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java @@ -0,0 +1,56 @@ +package cwms.cda.api; + +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; +import java.util.function.Function; + +public final class CursorIterator implements Iterator

{ + + private final Function fetcher; + private final Function> itemsExtractor; + private final Function nextCursorExtractor; + + private String cursor; + private P nextPage; + private boolean initialized = false; + + public CursorIterator(String initialCursor, Function fetcher, Function> itemsExtractor, Function nextCursorExtractor) { + this.cursor = initialCursor == null ? "" : initialCursor; + this.fetcher = fetcher; + this.itemsExtractor = itemsExtractor; + this.nextCursorExtractor = nextCursorExtractor; + } + + @Override + public boolean hasNext() { + if (!initialized) { + nextPage = fetcher.apply(cursor); + initialized = true; + } + + if (nextPage == null) return false; + + List items = itemsExtractor.apply(nextPage); + return items != null && !items.isEmpty(); + } + + @Override + public P next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + P current = nextPage; + + String nextCursor = nextCursorExtractor.apply(current); + if (nextCursor == null || nextCursor.isEmpty()) { + nextPage = null; + } else { + cursor = nextCursor; + nextPage = fetcher.apply(cursor); + } + + return current; + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java index 0483a71634..0b5ee18bc6 100644 --- a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java +++ b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java @@ -21,7 +21,11 @@ import cwms.cda.data.dao.VerticalDatum; import cwms.cda.data.dto.TimeSeries; import cwms.cda.formatters.ContentType; +import cwms.cda.formatters.csv.CsvV1; +import cwms.cda.data.dto.csv.TimeSeriesCsv; +import cwms.cda.data.dto.csv.TimeSeriesCsvRow; import cwms.cda.formatters.Formats; +import cwms.cda.formatters.csv.CwmsCsvProcessor; import cwms.cda.helpers.DateUtils; import io.javalin.apibuilder.CrudHandler; import io.javalin.core.util.Header; @@ -39,10 +43,17 @@ import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; import java.sql.Timestamp; +import java.time.Instant; import java.time.ZoneId; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; import org.apache.commons.io.IOUtils; import org.apache.http.client.utils.URIBuilder; @@ -382,12 +393,25 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) { + "identifies where in the request you are. This is an opaque" + " value, and can be obtained from the 'next-page' value in " + "the response. Deprecated, use " + PAGE + " instead."), - @OpenApiParam(name = PAGE_SIZE, - type = Integer.class, + @OpenApiParam(name = PAGE_SIZE, type = Integer.class, description = "How many entries per page returned. " - + "Default " + DEFAULT_PAGE_SIZE + ". Use 0 to return an empty values array, " + + "For JSON/XML paging, this controls page size. " + + "For CSV, this controls the internal fetch batch size used while streaming a single response. " + + "CSV clients do not request subsequent pages. " + + "Default " + DEFAULT_PAGE_SIZE +". Use 0 to return an empty values array, " + "or -1 to return the entire window in one response without a next-page cursor. " - + "Values less than -1 are invalid.") + + "Values less than -1 are invalid."), + @OpenApiParam(name = CSV_RESUME_FROM_PAGE, + description = "CSV only. Optional opaque cursor that tells the server where to begin the " + + "streamed CSV response. This is intended for resuming a previously interrupted " + + "CSV download. The response is still a single streamed CSV response, not a paged " + + "CSV response."), + @OpenApiParam(name = INCLUDE_METADATA_AS_CSV_COMMENTS, type = Boolean.class, + description = "When true, include dataset metadata as csv header comments " + + "prepended with # (default is false)."), + @OpenApiParam(name = INCLUDE_OPTIONAL_CSV_COLUMNS, type = Boolean.class, + description = "When true, include optional columns (quality-code, data-entry-date) " + + "in the CSV response (default is false).") }, responses = { @OpenApiResponse(status = STATUS_200, @@ -397,6 +421,7 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) { @OpenApiContent(from = TimeSeries.class, type = Formats.XMLV2), @OpenApiContent(from = TimeSeries.class, type = Formats.XML), @OpenApiContent(from = TimeSeries.class, type = Formats.JSON), + @OpenApiContent(from = TimeSeriesCsv.class, type= Formats.CSV), @OpenApiContent(from = TimeSeries.class, type = ""),}), @OpenApiResponse(status = STATUS_400, description = "Invalid parameter combination"), @OpenApiResponse(status = STATUS_404, description = "The provided combination of " @@ -471,6 +496,18 @@ public void getAll(@NotNull Context ctx) { .withShouldTrim(trim.getOrDefault(true)) .withIncludeEntryDate(includeEntryDate) .build(); + + // CSV: stream a single response; page-size is only internal batch size + if (Formats.CSV.equals(contentType.getType())) { + String csvResumeFromPage = ctx.queryParamAsClass(CSV_RESUME_FROM_PAGE, String.class) + .getOrDefault(""); + boolean includeOptionalColumns = ctx.queryParamAsClass(INCLUDE_OPTIONAL_CSV_COLUMNS, Boolean.class) + .getOrDefault(false); + int csvBatchSize = validateCsvBatchSize(pageSize); + streamCsv(ctx, csvBatchSize, csvResumeFromPage, includeOptionalColumns, dao, requestParameters, contentType); + return; + } + // Execute DAO call with a timeout so we can return a clearer message instead of a generic 500 int apiTimeoutMs = Integer.getInteger("cwms.cda.api.apiTimeoutMs", 45000); CompletableFuture daoFuture = CompletableFuture.supplyAsync( @@ -492,7 +529,7 @@ public void getAll(@NotNull Context ctx) { throw unwrapExecutionException(ex); } - if(datum != null) { //this will be null for non-elevation ts + if (datum != null) { //this will be null for non-elevation ts // user has requested a specific vertical datum VerticalDatum vd = VerticalDatum.valueOf(datum); // the users request ts = TimeSeriesVerticalDatumConverter.convertToVerticalDatum(ts, vd); @@ -521,10 +558,33 @@ public void getAll(@NotNull Context ctx) { } String office = ctx.queryParam(OFFICE); + + // CSV: stream a single response; page-size is only internal batch size + if (Formats.CSV.equals(contentType.getType())) { + TimeSeriesRequestParameters requestParameters = new TimeSeriesRequestParameters.Builder() + .withNames(names) + .withOffice(office) + .withUnits(units) + .withBeginTime(beginZdt) + .withEndTime(endZdt) + .withShouldTrim(trim.getOrDefault(true)) + .withIncludeEntryDate(includeEntryDate) + .build(); + + String csvResumeFromPage = ctx.queryParamAsClass(CSV_RESUME_FROM_PAGE, String.class) + .getOrDefault(""); + boolean includeOptionalColumns = ctx.queryParamAsClass(INCLUDE_OPTIONAL_CSV_COLUMNS, Boolean.class) + .getOrDefault(false); + int csvBatchSize = validateCsvBatchSize(pageSize); + streamCsv(ctx, csvBatchSize, csvResumeFromPage, includeOptionalColumns, dao, requestParameters, contentType); + return; + } + results = dao.getTimeseries(format, names, office, units, datum, beginZdt, endZdt, tz); ctx.status(HttpServletResponse.SC_OK); ctx.result(results); } + addDeprecatedContentTypeWarning(ctx, contentType); requestResultSize.update(results.length()); } catch (NotFoundException e) { @@ -552,6 +612,96 @@ static RuntimeException unwrapExecutionException(java.util.concurrent.ExecutionE return new RuntimeException(cause); } + private void streamCsv(@NotNull Context ctx, int batchSize, String resumeFromPage, boolean includeOptionalColumns, + TimeSeriesDao dao, TimeSeriesRequestParameters requestParameters, ContentType contentType) { + ctx.status(HttpServletResponse.SC_OK); + ctx.contentType(Formats.CSV); + ctx.header(Header.CONTENT_TYPE, Formats.CSV + "; charset=UTF-8"); + + boolean includeMetadataAsComments = ctx.queryParamAsClass(INCLUDE_METADATA_AS_CSV_COMMENTS, Boolean.class) + .getOrDefault(false); + + ctx.header(Header.CACHE_CONTROL, "no-store"); + ctx.header("X-Content-Type-Options", "nosniff"); + ctx.header("X-Stream-Batch-Size", String.valueOf(batchSize)); + + CsvV1 csv = new CsvV1(); + + try { + ServletOutputStream out = ctx.res.getOutputStream(); + Iterator iterator = new CursorIterator<>( + resumeFromPage, + cursor -> dao.getTimeseries(cursor, batchSize, requestParameters), + TimeSeries::getValues, + TimeSeries::getNextPage); + AtomicBoolean firstBatch = new AtomicBoolean(true); + iterator.forEachRemaining(ts -> streamBatches(ctx, includeOptionalColumns, ts, firstBatch, includeMetadataAsComments, csv, out)); + } catch (IOException e) { + throw new DataAccessException("Failed streaming CSV response", e); + } + + addDeprecatedContentTypeWarning(ctx, contentType); + } + + private static void streamBatches(@NotNull Context ctx, boolean includeOptionalColumns, TimeSeries ts, AtomicBoolean firstBatch, boolean includeMetadataAsComments, CsvV1 csv, ServletOutputStream out) { + try { + List values = ts.getValues(); + + List rows = new ArrayList<>(values.size()); + String tsId = ts.getName(); + String officeId = ts.getOfficeId(); + String units = ts.getUnits(); + String versionDateStr = ts.getVersionDate() == null + ? null + : ts.getVersionDate().toInstant().toString(); + + for (TimeSeries.Record r : values) { + Instant dt = r.getDateTime() == null ? null : r.getDateTime().toInstant(); + Instant entryDt = r.getDataEntryDate() == null ? null : r.getDataEntryDate().toInstant(); + + rows.add(new TimeSeriesCsvRow.Builder() + .withDateTime(dt) + .withValue(r.getValue()) + .withQualityCode(r.getQualityCode()) + .withDataEntryDate(entryDt) + .withUnits(units) + .build()); + } + + TimeSeriesCsv container = new TimeSeriesCsv.Builder() + .withTimeSeriesId(tsId) + .withOfficeId(officeId) + .withVersionDate(versionDateStr) + .withRows(rows) + .build(); + + String chunk; + if (firstBatch.get()) { + chunk = includeMetadataAsComments + ? csv.formatWithMetadata(container, includeOptionalColumns) + : csv.format(container, includeOptionalColumns); + } else { + chunk = csv.format(container, includeOptionalColumns); + chunk = CwmsCsvProcessor.stripHeader(chunk); + } + + out.write(chunk.getBytes(StandardCharsets.UTF_8)); + out.flush(); + ctx.res.flushBuffer(); + + firstBatch.set(false); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private int validateCsvBatchSize(int requestedPageSize) { + if (requestedPageSize <= 0) { + throw new IllegalArgumentException("For CSV streaming, page-size must be greater than 0."); + } + return requestedPageSize; + } + private void addLinkHeader(@NotNull Context ctx, TimeSeries ts, ContentType contentType) { try { // Send back the link to the next page in the response header diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java index 8e337ab413..6c59659ee0 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java @@ -4,7 +4,20 @@ import java.io.InputStream; import java.sql.SQLException; +/** + * A consumer for streaming binary data out to callers (e.g., HTTP layer). + * + * The primary abstract method accepts an InputStream with optional position and total length. + * For generated content where the total length is not known up front (e.g., CSV rendered on-demand), + * use the default two-argument overload which delegates with null position/length values, allowing + * callers to choose chunked transfer without setting Content-Length. + */ @FunctionalInterface public interface StreamConsumer { + void accept(InputStream stream, long inputStreamPosition, String mediaType, long totalLength) throws SQLException, IOException; + + default void accept(InputStream stream, String mediaType) throws SQLException, IOException { + accept(stream, 0L, mediaType, -1L); + } } \ No newline at end of file diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java index 424ebbc91e..9b5349d2a3 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java @@ -59,5 +59,31 @@ List findRecentsInRange(String office, String categoryId, String gr List findMostRecentsInRange(String office, List tsIds, Timestamp pastLimit, Timestamp futureLimit, UnitSystem unitSystem); + /** + * Streams the requested TimeSeries data in CSV format using a {@link StreamConsumer}, + * similar to the BlobDao streaming pattern. The DAO will invoke the provided consumer + * with an InputStream that produces the CSV bytes on-the-fly, avoiding buffering the + * entire dataset in memory. + * + * Notes: + * - The consumer is responsible for copying the InputStream to the HTTP response and + * setting any additional headers as needed. + * - The mediaType passed to the consumer will be {@code Formats.CSV}. + * - The totalLength may be unknown and can be negative; callers should not rely on it. + * + * @param requestParameters request parameters describing the time-series retrieval + * @param pageSize page size to use when paging results internally; non-positive values disable limiting + * @param consumer a sink that will receive a streaming InputStream of CSV data + */ + /** + * Stream time-series in CSV using CsvV1 formatter. + * + * @param requestParameters parameters describing the request + * @param consumer sink for the InputStream + * @param includeMetadataAsComments when true, include metadata as header comments + */ + void streamRequestedTimeSeriesCsv(TimeSeriesRequestParameters requestParameters, + StreamConsumer consumer, + boolean includeMetadataAsComments); } diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java index d7ba536372..aa497cbfb0 100644 --- a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java +++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java @@ -1,6 +1,17 @@ package cwms.cda.data.dao; import static com.google.common.flogger.LazyArgs.lazy; +import cwms.cda.data.dao.rsql.FieldResolver; +import cwms.cda.data.dao.rsql.MapFieldResolver; +import cwms.cda.data.dao.rsql.RSQLConditionBuilder; +import cwms.cda.data.dto.filteredtimeseries.FilteredTimeSeries; +import cwms.cda.data.dto.catalog.TimeSeriesAlias; +import cwms.cda.formatters.csv.CwmsCsvProcessor; +import cwms.cda.helpers.DateUtils; + +import java.io.IOException; +import java.sql.Connection; + import static org.jooq.impl.DSL.asterisk; import static org.jooq.impl.DSL.countDistinct; import static org.jooq.impl.DSL.field; @@ -25,9 +36,6 @@ import com.google.common.flogger.FluentLogger; import cwms.cda.api.enums.UnitSystem; import cwms.cda.api.enums.VersionType; -import cwms.cda.data.dao.rsql.FieldResolver; -import cwms.cda.data.dao.rsql.MapFieldResolver; -import cwms.cda.data.dao.rsql.RSQLConditionBuilder; import cwms.cda.data.dto.Catalog; import cwms.cda.data.dto.CwmsDTOPaginated; import cwms.cda.data.dto.RecentValue; @@ -38,15 +46,11 @@ import cwms.cda.data.dto.TsvId; import cwms.cda.data.dto.VerticalDatumInfo; import cwms.cda.data.dto.catalog.CatalogEntry; -import cwms.cda.data.dto.catalog.TimeSeriesAlias; import cwms.cda.data.dto.catalog.TimeseriesCatalogEntry; -import cwms.cda.data.dto.filteredtimeseries.FilteredTimeSeries; import cwms.cda.formatters.xml.XMLv1; -import cwms.cda.helpers.DateUtils; import cwms.cda.helpers.ZoneIdHelper; import java.math.BigDecimal; import java.math.BigInteger; -import java.sql.Connection; import java.sql.SQLException; import java.sql.Timestamp; import java.time.Duration; @@ -99,6 +103,12 @@ import usace.cwms.db.jooq.codegen.udt.records.ZTSV_ARRAY; import usace.cwms.db.jooq.codegen.udt.records.ZTSV_TYPE; +import java.nio.charset.StandardCharsets; +import cwms.cda.formatters.csv.CsvV1; +import cwms.cda.formatters.Formats; +import cwms.cda.data.dto.csv.TimeSeriesCsv; +import cwms.cda.data.dto.csv.TimeSeriesCsvRow; + public class TimeSeriesDaoImpl extends JooqDao implements TimeSeriesDao { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -220,6 +230,277 @@ public String getTimeseries(String format, String names, String office, String u timezone.getId(), office); } + @Override + public void streamRequestedTimeSeriesCsv(TimeSeriesRequestParameters requestParameters, + StreamConsumer consumer, boolean includeMetadataAsComments) { + // Delegate to the overload that supports tuning knobs with defaults + streamRequestedTimeSeriesCsv(requestParameters, consumer, + includeMetadataAsComments, null, null); + } + + // Overload allowing the caller to provide JDBC/jOOQ fetch size and rows-per-buffer for CSV rendering + public void streamRequestedTimeSeriesCsv(TimeSeriesRequestParameters requestParameters, + StreamConsumer consumer, + boolean includeMetadataAsComments, + Integer dbFetchSize, + Integer rowsPerBuffer) { + // This method mirrors the data selection flow of getRequestedTimeSeries, but delivers + // a streaming CSV InputStream via StreamConsumer (BlobDao pattern) to avoid buffering. + String names = requestParameters.getNames(); + String office = requestParameters.getOffice(); + String units = requestParameters.getUnits(); + ZonedDateTime beginTime = requestParameters.getBeginTime(); + ZonedDateTime endTime = requestParameters.getEndTime(); + + // Resolve office, timeseries id and code + final Field officeId = CWMS_UTIL_PACKAGE.call_GET_DB_OFFICE_ID( + office != null ? DSL.val(office) : CWMS_UTIL_PACKAGE.call_USER_OFFICE_ID()); + final Field tsId = CWMS_TS_PACKAGE.call_GET_TS_ID__2(DSL.val(names), officeId); + final Field tsCode = CWMS_TS_PACKAGE.call_GET_TS_CODE__2(DSL.val(names), officeId); + + // Determine actual unit (handle EN/SI to default units mapping) + Field unit = units.compareToIgnoreCase("SI") == 0 || units.compareToIgnoreCase("EN") == 0 + ? CWMS_UTIL_PACKAGE.call_GET_DEFAULT_UNITS(CWMS_TS_PACKAGE.call_GET_BASE_PARAMETER_ID(tsCode), DSL.val(units, String.class)) + : DSL.val(units, String.class); + + // Give TVQ (time,value,quality) column names + Field dateTimeCol = field(DATE_TIME, Timestamp.class).as(DATE_TIME); + Field valueCol = field(VALUE, Double.class).as(VALUE); + Field qualityCol = field(QUALITY_CODE, Integer.class).as(QUALITY_CODE); + + Long beginTimeMilli = beginTime.toInstant().toEpochMilli(); + Long endTimeMilli = endTime.toInstant().toEpochMilli(); + String trim = formatBool(requestParameters.isShouldTrim()); + final String startInclusive = "T"; + final String endInclusive = "T"; + String previous = "F"; + String next = "F"; + Long versionDateMilli = requestParameters.getVersionDate() != null ? requestParameters.getVersionDate().toInstant().toEpochMilli() : null; + String maxVersion = requestParameters.getVersionDate() == null ? "T" : "F"; + + // Build the table(...) call to retrieve rows (mirror existing implementation style) + String retrievalMethod = "cwms_20.cwms_ts.retrieve_ts_out_tab"; // CSV excludes entry-date + SQL retrieveSelectData = DSL.sql( + "table(" + retrievalMethod + "(?,?," + + "cwms_20.cwms_util.to_timestamp(?),cwms_20.cwms_util.to_timestamp(?), 'UTC'," + + "?,?,?,?,?," + getVersionPart(requestParameters.getVersionDate()) + ",?,?) ) retrieveTs", + tsId, + unit, + beginTimeMilli, + endTimeMilli, + trim, startInclusive, endInclusive, previous, next, + versionDateMilli, maxVersion, + officeId + ); + + // Now select the needed columns (date_time, value, quality) and restrict the time window + SelectConditionStep> query = dsl.select(dateTimeCol, valueCol, qualityCol) + .from(retrieveSelectData) + .where(dateTimeCol.ge(CWMS_UTIL_PACKAGE.call_TO_TIMESTAMP__2(DSL.val(beginTimeMilli)))) + .and(dateTimeCol.le(CWMS_UTIL_PACKAGE.call_TO_TIMESTAMP__2(DSL.val(endTimeMilli)))); + + // No page size limiting when streaming + + logger.atFine().log("%s", lazy(() -> query.getSQL(ParamType.INLINED))); + + // Apply optional JDBC fetch size hint + if (dbFetchSize != null && dbFetchSize > 0) { + try { + query.fetchSize(dbFetchSize); + } catch (Exception ex) { + // Some drivers may ignore/throw; log and continue + logger.atFine().withCause(ex).log("Fetch size hint ignored: %s", dbFetchSize); + } + } + + // Create a lazy CSV InputStream that renders rows on-demand from the database cursor + try (Cursor> recCursor = query.fetchLazy()) { + String tsIdStr = dsl.fetchValue(tsId); + String officeResolved = dsl.fetchValue(officeId); + String resolvedUnits = dsl.fetchValue(unit); + Timestamp versionTs = requestParameters.getVersionDate() != null ? Timestamp.from(requestParameters.getVersionDate().toInstant()) : null; + + CsvV1 csvFormatter = new CsvV1(); + + CsvOnDemandInputStream stream = new CsvOnDemandInputStream( + recCursor, + csvFormatter, + tsIdStr, + officeResolved, + resolvedUnits, + versionTs, + includeMetadataAsComments, + rowsPerBuffer + ); + + try (stream) { + // Unknown total length + consumer.accept(stream, Formats.CSV); + } + } catch (Exception e) { + throw new DataAccessException("Error generating CSV for TimeSeries", e); + } + } + + // InputStream that renders CSV rows on-demand from a jOOQ Cursor + private static final class CsvOnDemandInputStream extends java.io.InputStream { + private final Cursor> cursor; + private final java.util.Iterator> it; + private final CsvV1 csv; + private final String tsIdStr; + private final String officeId; + private final String units; + private final Timestamp versionTs; + private final boolean includeMetaComments; + private final int rowsPerBuffer; + + private byte[] buffer = new byte[0]; + private int bufPos = 0; + private boolean first = true; + private boolean closed = false; + + CsvOnDemandInputStream(Cursor> cursor, + CsvV1 csv, + String tsIdStr, + String officeId, + String units, + Timestamp versionTs, + boolean includeMetaComments, + Integer rowsPerBuffer) { + this.cursor = cursor; + this.it = cursor.iterator(); + this.csv = csv; + this.tsIdStr = tsIdStr; + this.officeId = officeId; + this.units = units; + this.versionTs = versionTs; + this.includeMetaComments = includeMetaComments; + int rpb = rowsPerBuffer == null ? 1 : rowsPerBuffer; + this.rowsPerBuffer = rpb > 0 ? rpb : 1; + } + + @Override + public int read() throws IOException { + byte[] one = new byte[1]; + int r = read(one, 0, 1); + return r == -1 ? -1 : (one[0] & 0xFF); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + if (closed) { + throw new IOException("Stream closed"); + } + if (b == null) { + throw new NullPointerException("b"); + } + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } + + int totalCopied = 0; + while (len > 0) { + if (bufPos >= buffer.length) { + if (!fillBuffer()) { + break; // EOF + } + } + int toCopy = Math.min(len, buffer.length - bufPos); + System.arraycopy(buffer, bufPos, b, off, toCopy); + bufPos += toCopy; + off += toCopy; + len -= toCopy; + totalCopied += toCopy; + } + return totalCopied == 0 ? -1 : totalCopied; + } + + private boolean fillBuffer() { + if (!it.hasNext()) { + buffer = new byte[0]; + bufPos = 0; + return false; + } + + StringBuilder sb = new StringBuilder(8192); + int produced = 0; + while (it.hasNext() && produced < rowsPerBuffer) { + org.jooq.Record3 r = it.next(); + Timestamp ts = r.value1(); + Double val = r.value2(); + Integer qualityCode = r.value3(); + + TimeSeriesCsvRow row = new TimeSeriesCsvRow.Builder() + .withDateTime(ts == null ? null : ts.toInstant()) + .withValue(val) + .withQualityCode(qualityCode) + .withDataEntryDate(null) // entry date not available in this retrieval method + .withUnits(units) + .build(); + + String rendered; + if (first) { + TimeSeriesCsv container = new TimeSeriesCsv.Builder() + .withTimeSeriesId(tsIdStr) + .withOfficeId(officeId) + .withVersionDate(versionTs == null ? null : versionTs.toInstant().toString()) + .withRows(Collections.singletonList(row)) + .build(); + rendered = csv.format(container); + if (!includeMetaComments) { + rendered = CwmsCsvProcessor.stripHeader(rendered); + } + first = false; + } else { + String csvLine = csv.format(Collections.singletonList(row)); + rendered = extractLastCsvLine(csvLine); + if (!rendered.endsWith("\n")) { + rendered = rendered + "\n"; + } + } + sb.append(rendered); + produced++; + } + + buffer = sb.toString().getBytes(StandardCharsets.UTF_8); + bufPos = 0; + return buffer.length > 0; + } + + @Override + public void close() throws IOException { + if (!closed) { + closed = true; + try { + cursor.close(); + } catch (Exception ex) { + // ignore, we're closing + } + } + } + } + + private static String extractLastCsvLine(String s) { + if (s == null || s.isEmpty()) { + return ""; + } + int i = s.length() - 1; + // Skip trailing newline characters + while (i >= 0 && (s.charAt(i) == '\n' || s.charAt(i) == '\r')) { + i--; + } + if (i < 0) { + return ""; + } + int start = s.lastIndexOf('\n', i); + String line = s.substring(start + 1, i + 1); + // Strip a trailing CR if present + if (!line.isEmpty() && line.charAt(line.length() - 1) == '\r') { + line = line.substring(0, line.length() - 1); + } + return line; + } + /** * Retrieves a TimeSeries from the database * @param page an opaque token used for paging diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvRequiredColumn.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvRequiredColumn.java new file mode 100644 index 0000000000..d5a3189b54 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvRequiredColumn.java @@ -0,0 +1,14 @@ +package cwms.cda.data.dto.csv; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Indicates that a CSV column is required, i.e. not optional + */ +@Target({ElementType.FIELD, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface CsvRequiredColumn { +} diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvUnitHeader.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvUnitHeader.java new file mode 100644 index 0000000000..7da7b8e89c --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvUnitHeader.java @@ -0,0 +1,22 @@ +package cwms.cda.data.dto.csv; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Annotation to indicate that a field should have units included in its CSV header. + * For example, if a field "value" is annotated with @CsvUnitHeader, and the DTO + * has a "units" field with value "ft", the CSV header for this column will be + * "value (ft)". + */ +@Target({ElementType.FIELD, ElementType.METHOD}) +@Retention(RetentionPolicy.RUNTIME) +public @interface CsvUnitHeader { + /** + * The name(s) of the field(s) that should have units included in their CSV header. + * The units will be retrieved from the field/method annotated with this annotation. + */ + String field(); +} diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CwmsCsvDTO.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CwmsCsvDTO.java new file mode 100644 index 0000000000..53b07f5e05 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CwmsCsvDTO.java @@ -0,0 +1,7 @@ +package cwms.cda.data.dto.csv; + +import java.util.List; + +public interface CwmsCsvDTO { + List getRows(); +} diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsv.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsv.java new file mode 100644 index 0000000000..ac52706145 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsv.java @@ -0,0 +1,76 @@ +package cwms.cda.data.dto.csv; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import cwms.cda.data.dto.CwmsDTOBase; +import cwms.cda.formatters.Formats; +import cwms.cda.formatters.annotations.FormattableWith; +import cwms.cda.formatters.csv.CsvRows; +import cwms.cda.formatters.csv.CsvV1; + +import java.util.ArrayList; +import java.util.List; + +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) +@JsonDeserialize(builder = TimeSeriesCsv.Builder.class) +@FormattableWith(contentType = Formats.CSV, formatter = CsvV1.class, aliases = {Formats.DEFAULT}) +public final class TimeSeriesCsv extends CwmsDTOBase implements CwmsCsvDTO { + + private final String timeSeriesId; + private final String officeId; + private final String versionDate; + + @CsvRows + private final List rows; + + private TimeSeriesCsv(Builder builder) { + this.timeSeriesId = builder.timeSeriesId; + this.officeId = builder.officeId; + this.versionDate = builder.versionDate; + this.rows = builder.rows; + } + + public String getTimeSeriesId() { return timeSeriesId; } + public String getOfficeId() { return officeId; } + public String getVersionDate() { return versionDate; } + + @Override + public List getRows() { + return rows; + } + + @JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) + public static final class Builder { + private String timeSeriesId; + private String officeId; + private String versionDate; + private List rows; + + public Builder withTimeSeriesId(String timeSeriesId) { + this.timeSeriesId = timeSeriesId; + return this; + } + + public Builder withOfficeId(String officeId) { + this.officeId = officeId; + return this; + } + + public Builder withVersionDate(String versionDate) { + this.versionDate = versionDate; + return this; + } + + public Builder withRows(List rows) { + this.rows = rows; + return this; + } + + public TimeSeriesCsv build() { + return new TimeSeriesCsv(this); + } + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsvRow.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsvRow.java new file mode 100644 index 0000000000..f700d2fbb5 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsvRow.java @@ -0,0 +1,92 @@ +package cwms.cda.data.dto.csv; + +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import cwms.cda.data.dto.CwmsDTOBase; + +import java.time.Instant; + +/** + * Single DTO for TimeSeries CSV rows. All potential columns exist on this class; + * only date-time and value are considered required and are annotated with @CsvRow + * so that when using the CsvV1 mapper they are the only columns included. All other + * fields are annotated with @CsvMetadata and will only be serialized when using a + * CsvMapper that does not apply CsvRow filtering (e.g., for metadata-as-columns=true). + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +@JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) +@JsonDeserialize(builder = TimeSeriesCsvRow.Builder.class) +public final class TimeSeriesCsvRow extends CwmsDTOBase { + + @CsvRequiredColumn + @JsonProperty(index = 0) + private final Instant dateTime; + + @CsvRequiredColumn + @JsonProperty(index = 1) + private final Double value; + + @JsonProperty(index = 2) + private final Instant dataEntryDate; + + @JsonProperty(index = 3) + private final Integer qualityCode; + + @CsvUnitHeader(field = "value") + private final String units; + + private TimeSeriesCsvRow(Builder builder) { + this.dateTime = builder.dateTime; + this.value = builder.value; + this.qualityCode = builder.qualityCode; + this.dataEntryDate = builder.dataEntryDate; + this.units = builder.units; + } + + public Instant getDateTime() { return dateTime; } + public Double getValue() { return value; } + public Integer getQualityCode() { return qualityCode; } + public Instant getDataEntryDate() { return dataEntryDate; } + public String getUnits() { return units; } + + @JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class) + public static final class Builder { + private Instant dateTime; + private Double value; + private Instant dataEntryDate; + private Integer qualityCode; + private String units; + + public Builder withDateTime(Instant dateTime) { + this.dateTime = dateTime; + return this; + } + + public Builder withValue(Double value) { + this.value = value; + return this; + } + + public Builder withDataEntryDate(Instant dataEntryDate) { + this.dataEntryDate = dataEntryDate; + return this; + } + + public Builder withQualityCode(Integer qualityCode) { + this.qualityCode = qualityCode; + return this; + } + + public Builder withUnits(String units) { + this.units = units; + return this; + } + + public TimeSeriesCsvRow build() { + return new TimeSeriesCsvRow(this); + } + } +} diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java b/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java index 2ab7d01325..7cc0d13378 100644 --- a/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java +++ b/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java @@ -247,40 +247,33 @@ public static List parseContentList(ContentType type, } /** - * Parses the supplied header param or queryParam to determine the content type. - * If both are supplied an exception is thrown. If neither are supplied an exception is thrown. + * Parses the supplied header param and/or queryParam to determine the content type. + * Query parameter takes priority over the header and is parsed the same way as the header + * (i.e., supports full content types, versions, and DTO-specific aliases). If neither is + * supplied an exception is thrown. * * @param header Accept header value * @param queryParam format query parameter value * @param klass DTO object class, used for identifying content type aliases from the DTO's * FormattableWith annotations. * @return an appropriate standard mimetype for lookup - * @throws FormattingException if the header and queryParam are both supplied or neither are + * @throws FormattingException if neither header nor queryParam can be parsed into a supported content type */ public static ContentType parseHeaderAndQueryParm(String header, String queryParam, Class klass) { + // If a query parameter is provided, it overrides the header. if (queryParam != null && !queryParam.isEmpty()) { - if (header != null && !header.isEmpty() && !DEFAULT.equals(header.trim())) { - // If the user supplies an accept header and also a format= parameter, which - // should we use? - // The older format= query parameters don't give us the option to supply a - // version the - // way that the accept header does. - throw new UnsupportedFormatException("Accept header and query parameter are both " - + "present, this is not supported."); - } - ContentType ct = parseQueryParam(queryParam, klass); if (ct != null) { return ct; - } else { - throw new UnsupportedFormatException("content-type " + queryParam + " is not implemented"); } - } else if (header == null) { + } + + // No query parameter provided; use the header (parseHeader handles null/empty by mapping to */*) + if (header == null) { throw new UnsupportedFormatException("no content type or format specified"); - } else { - return parseHeader(header, klass); } + return parseHeader(header, klass); } /** diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvFormatter.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvFormatter.java new file mode 100644 index 0000000000..29e4b3015d --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvFormatter.java @@ -0,0 +1,9 @@ +package cwms.cda.formatters.csv; + +import cwms.cda.data.dto.csv.CwmsCsvDTO; +import cwms.cda.formatters.OutputFormatter; + +public interface CsvFormatter extends OutputFormatter { + String formatWithMetadata(CwmsCsvDTO dto, boolean includeOptionalColumns); + String format(CwmsCsvDTO dto, boolean includeOptionalColumns); +} diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvRows.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvRows.java new file mode 100644 index 0000000000..de89fe81c9 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvRows.java @@ -0,0 +1,14 @@ +package cwms.cda.formatters.csv; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +/** + * Marks a field as containing the data rows for a CSV DTO. + */ +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.FIELD, ElementType.METHOD}) +public @interface CsvRows { +} diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java index 5f270fa851..e52d4fa295 100644 --- a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java +++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java @@ -1,56 +1,87 @@ package cwms.cda.formatters.csv; -import java.io.InputStream; -import java.util.List; - +import com.fasterxml.jackson.core.JsonProcessingException; import cwms.cda.data.dto.CwmsDTOBase; import cwms.cda.data.dto.LocationGroup; import cwms.cda.data.dto.Office; +import cwms.cda.data.dto.csv.CwmsCsvDTO; import cwms.cda.formatters.Formats; -import cwms.cda.formatters.OutputFormatter; +import cwms.cda.formatters.FormattingException; +import java.io.InputStream; +import java.util.List; -public class CsvV1 implements OutputFormatter { +public class CsvV1 implements CsvFormatter { @Override public String getContentType() { return Formats.CSV; } + /** + * Default formatting does not include metadata in either columns or comments. + **/ @Override public String format(CwmsDTOBase dto) { - String retVal = null; - if (dto instanceof Office ) { - retVal = new CsvV1Office().format(dto); - } else if (dto instanceof LocationGroup ) { - retVal = new CsvV1LocationGroup().format(dto); + try { + if (dto instanceof Office ) { + return new CsvV1Office().format(dto); + } else if (dto instanceof LocationGroup ) { + return new CsvV1LocationGroup().format(dto); + } else if (dto instanceof CwmsCsvDTO) { + return format((CwmsCsvDTO) dto, false); + } else { + throw new FormattingException(dto.getClass().getName() + " is not currently supported for CSV formatting."); + } + } catch (Exception e) { + throw new FormattingException("Could not serialize:" + dto.getClass().getName(), e); + } + } + + @Override + public String formatWithMetadata(CwmsCsvDTO dto, boolean includeOptionalColumns) { + try { + return CwmsCsvProcessor.formatCwmsCsv(dto, includeOptionalColumns, true); + } catch (JsonProcessingException e) { + throw new FormattingException("Could not serialize:" + dto.getClass().getName(), e); + } + } + + @Override + public String format(CwmsCsvDTO dto, boolean includeOptionalColumns) { + try { + return CwmsCsvProcessor.formatCwmsCsv(dto, includeOptionalColumns, false); + } catch (JsonProcessingException e) { + throw new FormattingException("Could not serialize:" + dto.getClass().getName(), e); } - return retVal; } + + @Override public String format(List dtoList) { - String retVal = null; if (dtoList != null && !dtoList.isEmpty()) { CwmsDTOBase dto = dtoList.get(0); if (dto instanceof Office) { - retVal = new CsvV1Office().format(dtoList); - } else if(dto instanceof LocationGroup) { - retVal = new CsvV1LocationGroup().format(dtoList); + return new CsvV1Office().format(dtoList); + } else if (dto instanceof LocationGroup) { + return new CsvV1LocationGroup().format(dtoList); + } else { + throw new FormattingException(dto.getClass().getName() + " is not currently supported for CSV formatting."); } - } - return retVal; + return null; } @Override public T parseContent(String content, Class type) { - T retVal = null; if (type.isAssignableFrom(Office.class)) { - retVal = new CsvV1Office().parseContent(content, type); + return new CsvV1Office().parseContent(content, type); } else if (type.isAssignableFrom(LocationGroup.class)) { - retVal = new CsvV1LocationGroup().parseContent(content, type); + return new CsvV1LocationGroup().parseContent(content, type); + } else if (CwmsCsvDTO.class.isAssignableFrom(type)) { + return CwmsCsvProcessor.parseCwmsCsv(content, type); } - return retVal; + return null; } @Override @@ -63,4 +94,5 @@ public T parseContent(InputStream content, Class type } return retVal; } + } diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CwmsCsvProcessor.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CwmsCsvProcessor.java new file mode 100644 index 0000000000..33fc926566 --- /dev/null +++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CwmsCsvProcessor.java @@ -0,0 +1,473 @@ +package cwms.cda.formatters.csv; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MappingIterator; +import com.fasterxml.jackson.databind.PropertyNamingStrategies; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.databind.annotation.JsonNaming; +import com.fasterxml.jackson.databind.cfg.MapperConfig; +import com.fasterxml.jackson.databind.introspect.Annotated; +import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.dataformat.csv.CsvGenerator; +import com.fasterxml.jackson.dataformat.csv.CsvMapper; +import com.fasterxml.jackson.dataformat.csv.CsvParser; +import com.fasterxml.jackson.dataformat.csv.CsvSchema; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import cwms.cda.data.dto.CwmsDTOBase; +import cwms.cda.data.dto.csv.CsvRequiredColumn; +import cwms.cda.data.dto.csv.CsvUnitHeader; +import cwms.cda.data.dto.csv.CwmsCsvDTO; +import cwms.cda.formatters.FormattingException; +import cwms.cda.formatters.json.adapters.ZoneIdDeserializer; +import org.jetbrains.annotations.Nullable; + +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.ParameterizedType; +import java.time.Instant; +import java.time.ZoneId; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Utility class to processing cwms CSV DTOs for both reading and writing. + * Handles building header lines with units, building metadata comment lines, and parsing metadata and units from CSV content. + */ +public class CwmsCsvProcessor { + + private static String buildMetadataComments(Object dto) { + StringBuilder sb = new StringBuilder(); + Class cls = dto.getClass(); + List fields = getAllFields(cls); + for (Field f : fields) { + if (f.isAnnotationPresent(CsvRows.class)) { + continue; + } + f.setAccessible(true); + try { + Object val = f.get(dto); + if (val != null) { + String key = resolveKeyName(f, cls); + sb.append("# ").append(key).append(": ").append(val).append("\n"); + } + } catch (IllegalAccessException ex) { + throw new FormattingException("Error building metadata comments for " + cls.getName(), ex); + } + } + return sb.toString(); + } + + private static List getAllFields(Class cls) { + List fields = new ArrayList<>(); + Class current = cls; + while (current != null && current != Object.class) { + fields.addAll(Arrays.asList(current.getDeclaredFields())); + current = current.getSuperclass(); + } + return fields; + } + + private static String buildHeader(Object dto, boolean includeOptionalColumns) { + StringBuilder sb = new StringBuilder(); + Map fieldToUnits = new HashMap<>(); + + // Find row type and examples to get units from + Class rowType = null; + Object firstRow = null; + if (dto instanceof CwmsCsvDTO) { + List rows = ((CwmsCsvDTO) dto).getRows(); + if (rows != null && !rows.isEmpty()) { + firstRow = rows.get(0); + rowType = firstRow.getClass(); + } + } else if (dto instanceof List && !((List) dto).isEmpty()) { + firstRow = ((List) dto).get(0); + rowType = firstRow.getClass(); + } else { + rowType = dto.getClass(); + } + + try { + // Check top-level DTO first + extractFieldToUnits(dto, fieldToUnits); + + // If it's a list or CwmsCsv, also check the row object for @CsvUnitHeader + if (firstRow != null) { + extractFieldToUnits(firstRow, fieldToUnits); + } + } catch (InvocationTargetException | IllegalAccessException e) { + throw new FormattingException("Error extracting units for CSV header", e); + } + + if (rowType != null) { + List columns = new ArrayList<>(); + for (Field f : getAllFields(rowType)) { + JsonProperty jp = f.getAnnotation(JsonProperty.class); + if (jp != null && jp.index() != JsonProperty.INDEX_UNKNOWN) { + if (!includeOptionalColumns && !f.isAnnotationPresent(CsvRequiredColumn.class)) { + continue; + } + String name = resolvePropertyName(f); + String units = fieldToUnits.get(name); + if (units != null) { + name = name + " (" + units + ")"; + } + columns.add(new ColumnInfo(name, jp.index())); + } + } + columns.sort(Comparator.comparingInt(c -> c.order)); + for (int i = 0; i < columns.size(); i++) { + sb.append(columns.get(i).name); + if (i < columns.size() - 1) { + sb.append(","); + } + } + sb.append("\n"); + } + + return sb.toString(); + } + + public static String stripHeader(String csv) { + StringBuilder sb = new StringBuilder(); + String[] lines = csv.split("\\r?\\n"); + boolean headerFound = false; + for (String line : lines) { + if (!headerFound) { + if (!line.startsWith("#")) { + headerFound = true; // first non-comment line is the header + } + continue; //continue until we find the header, skipping all comment lines, and also skipping the header line itself + } + sb.append(line).append(System.lineSeparator()); + } + return sb.toString(); + } + + private static void extractFieldToUnits(Object dto, Map fieldToUnits) throws IllegalAccessException, InvocationTargetException { + if (dto == null) return; + Class cls = dto.getClass(); + List fields = getAllFields(cls); + for (Field f : fields) { + CsvUnitHeader ann = f.getAnnotation(CsvUnitHeader.class); + if (ann != null) { + f.setAccessible(true); + Object val = f.get(dto); + if (val != null) { + String unitVal = val.toString(); + if (!unitVal.isEmpty()) { + fieldToUnits.put(ann.field(), unitVal); + } + } + } + } + List methods = getAllMethods(cls); + for (Method m : methods) { + CsvUnitHeader ann = m.getAnnotation(CsvUnitHeader.class); + if (ann != null && m.getParameterCount() == 0) { + m.setAccessible(true); + Object val = m.invoke(dto); + if (val != null) { + String unitVal = val.toString(); + if (!unitVal.isEmpty()) { + fieldToUnits.put(ann.field(), unitVal); + } + } + } + } + } + + static T parseCwmsCsv(String content, Class type) { + try { + T dto; + try { + dto = type.getDeclaredConstructor().newInstance(); + } catch (NoSuchMethodException e) { + // No default constructor, try to find a Builder + Class builderClass = null; + for (Class inner : type.getDeclaredClasses()) { + if (inner.getSimpleName().equals("Builder")) { + builderClass = inner; + break; + } + } + if (builderClass != null) { + Object builder = builderClass.getDeclaredConstructor().newInstance(); + Method buildMethod = builderClass.getDeclaredMethod("build"); + dto = type.cast(buildMethod.invoke(builder)); + } else { + throw e; + } + } + + if (dto instanceof CwmsCsvDTO) { + Map metadata = CwmsCsvProcessor.parseMetadata(content); + String units = CwmsCsvProcessor.parseUnits(content); + CwmsCsvProcessor.injectMetadataAndUnits(content, type, dto, metadata, units); + return dto; + } + } catch (Exception e) { + throw new FormattingException("Could not parse " + type.getName(), e); + } + throw new FormattingException("Could not parse " + type.getName() + ". Must be a " + CwmsCsvDTO.class.getName()); + } + + private static void injectMetadataAndUnits(String content, Class type, T dto, Map metadata, String units) throws IOException, IllegalAccessException { + // Inject metadata into DTO + CwmsCsvProcessor.applyMetadataAndUnits(dto, metadata, units); + + Field rowsField = getRowsField(type); + + if (rowsField != null) { + rowsField.setAccessible(true); + Class rowType = (Class) ((ParameterizedType) rowsField.getGenericType()).getActualTypeArguments()[0]; + List rows = parseRows(content, rowType); + + if (units != null) { + for (Object row : rows) { + CwmsCsvProcessor.applyMetadataAndUnits(row, metadata, units); + } + } + + rowsField.set(dto, rows); + } + } + + private static List parseRows(String content, Class csvRowDtoType) throws IOException { + CsvMapper csvMapper = buildObjectMapper(true, csvRowDtoType); + csvMapper.enable(CsvParser.Feature.ALLOW_COMMENTS); + csvMapper.enable(CsvParser.Feature.SKIP_EMPTY_LINES); + csvMapper.enable(CsvParser.Feature.TRIM_SPACES); + CsvSchema schema = csvMapper.schemaFor(csvRowDtoType).withHeader(); + try (MappingIterator it = csvMapper.readerFor(csvRowDtoType).with(schema).readValues(content)) { + return it.readAll(); + } + } + + static String formatCwmsCsv(CwmsCsvDTO dto, boolean includeOptionalColumns, boolean includeMetadata) throws JsonProcessingException { + StringBuilder sb = new StringBuilder(); + if (includeMetadata) { + sb.append(CwmsCsvProcessor.buildMetadataComments(dto)); + } + List rows = dto.getRows(); + if (rows != null && !rows.isEmpty()) { + Object firstRow = rows.get(0); + CsvMapper csvMapper = CwmsCsvProcessor.buildObjectMapper(includeOptionalColumns, firstRow.getClass()); + CsvSchema schema = csvMapper.schemaFor(firstRow.getClass()).withoutHeader(); + String header = CwmsCsvProcessor.buildHeader(dto, includeOptionalColumns); + sb.append(header); + sb.append(csvMapper.writer(schema).writeValueAsString(rows)); + } + return sb.toString(); + } + + private static CsvMapper buildObjectMapper(boolean includeOptionalColumns, Class rowType) { + CsvMapper mapper = new CsvMapper(); + + mapper.findAndRegisterModules(); + // Without these two disables an Instant gets written as 3333333.335000000 + mapper.disable(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS); + mapper.disable(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS); + mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + mapper.disable(CsvGenerator.Feature.ALWAYS_QUOTE_STRINGS); + mapper.enable(CsvGenerator.Feature.STRICT_CHECK_FOR_QUOTING); + + mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + mapper.registerModule(new JavaTimeModule()); + mapper.registerModule(new Jdk8Module()); + mapper.enable(CsvParser.Feature.ALLOW_COMMENTS); + SimpleModule module = new SimpleModule(); + module.addDeserializer(ZoneId.class, new ZoneIdDeserializer()); + mapper.registerModule(module); + if(!includeOptionalColumns && rowType != null) { + registerIgnoreOptionalColumns(rowType, mapper); + } + return mapper; + } + + private static void registerIgnoreOptionalColumns(Class rowType, CsvMapper mapper) { + Set ignoredFields = new HashSet<>(); + List fields = CwmsCsvProcessor.getAllFields(rowType); + for (Field f : fields) { + JsonProperty jp = f.getAnnotation(JsonProperty.class); + if (jp != null && jp.index() != JsonProperty.INDEX_UNKNOWN) { + if (!f.isAnnotationPresent(CsvRequiredColumn.class)) { + ignoredFields.add(f.getName()); + } + } + } + if (!ignoredFields.isEmpty()) { + mapper.addMixIn(rowType, PropertyFilterMixIn.class); + mapper.setAnnotationIntrospector(new JacksonAnnotationIntrospector() { + @Override + public JsonIgnoreProperties.Value findPropertyIgnoralByName(MapperConfig config, Annotated ac) { + if (ac.getRawType().equals(rowType)) { + return JsonIgnoreProperties.Value.forIgnoreUnknown(true).withIgnored(ignoredFields); + } + return super.findPropertyIgnoralByName(null, ac); + } + }); + } + } + + @JsonIgnoreProperties(ignoreUnknown = true) + abstract static class PropertyFilterMixIn {} + + private static @Nullable Field getRowsField(Class type) { + // Find rows field + Field rowsField = null; + for (Field f : CwmsCsvProcessor.getAllFields(type)) { + if (f.isAnnotationPresent(CsvRows.class)) { + rowsField = f; + break; + } + } + return rowsField; + } + + private static List getAllMethods(Class cls) { + List methods = new ArrayList<>(); + Class current = cls; + while (current != null && current != Object.class) { + methods.addAll(Arrays.asList(current.getDeclaredMethods())); + current = current.getSuperclass(); + } + return methods; + } + + private static String resolvePropertyName(Field f) { + JsonProperty jp = f.getAnnotation(JsonProperty.class); + if (jp != null && !jp.value().isEmpty()) { + return jp.value(); + } + JsonNaming naming = f.getDeclaringClass().getAnnotation(JsonNaming.class); + return getName(f, naming); + } + + private static String getName(Field f, JsonNaming naming) { + if (naming != null) { + try { + Object strat = naming.value().getDeclaredConstructor().newInstance(); + if (strat instanceof PropertyNamingStrategies.NamingBase) { + return ((PropertyNamingStrategies.NamingBase) strat).translate(f.getName()); + } + } catch (InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) { + throw new FormattingException("Error resolving property name for " + f.getName(), e); + } + } + return f.getName(); + } + + private static String resolveKeyName(Field f, Class owner) { + JsonProperty jp = f.getAnnotation(JsonProperty.class); + if (jp != null && !jp.value().isEmpty()) { + return jp.value(); + } + JsonNaming naming = owner.getAnnotation(JsonNaming.class); + return getName(f, naming); + } + + private static Object convertToType(String val, Class type) { + if (type == String.class) return val; + if (type == Instant.class) return Instant.parse(val); + if (type == Integer.class || type == int.class) return Integer.parseInt(val); + if (type == Double.class || type == double.class) return Double.parseDouble(val); + if (type == Long.class || type == long.class) return Long.parseLong(val); + if (type == Boolean.class || type == boolean.class) return Boolean.parseBoolean(val); + return null; + } + + private static void applyMetadataAndUnits(Object dto, Map metadata, String units) { + if (dto == null) return; + Class cls = dto.getClass(); + // Use a list of all fields including superclasses + List allFields = CwmsCsvProcessor.getAllFields(cls); + + for (Field f : allFields) { + f.setAccessible(true); + try { + // Handle Units via @CsvUnitHeader + if (units != null) { + CsvUnitHeader unitAnn = f.getAnnotation(CsvUnitHeader.class); + if (unitAnn != null) { + f.set(dto, units); + } + } + + // Handle Metadata + String key = CwmsCsvProcessor.resolveKeyName(f, cls); + String val = metadata.get(key); + if (val != null) { + Object converted = CwmsCsvProcessor.convertToType(val, f.getType()); + if (converted != null) { + f.set(dto, converted); + } + } + } catch (IllegalAccessException e) { + throw new FormattingException("Error applying metadata to field " + f.getName(), e); + } + } + } + + private static Map parseMetadata(String content) { + Map metadata = new HashMap<>(); + String[] lines = content.split("\\r?\\n"); + for (String line : lines) { + String trimmed = line.trim(); + if (trimmed.startsWith("#")) { + String comment = trimmed.substring(1).trim(); + int colon = comment.indexOf(':'); + if (colon != -1) { + String key = comment.substring(0, colon).trim(); + String val = comment.substring(colon + 1).trim(); + metadata.put(key, val); + } + } else if (!trimmed.isEmpty()) { + break; // Header or data starts + } + } + return metadata; + } + + private static String parseUnits(String content) { + String[] lines = content.split("\\r?\\n"); + for (String line : lines) { + String trimmed = line.trim(); + if (trimmed.startsWith("#")) continue; + if (trimmed.isEmpty()) continue; + // First non-comment line is header + int start = trimmed.indexOf('('); + int end = trimmed.indexOf(')'); + if (start != -1 && end != -1 && end > start) { + return trimmed.substring(start + 1, end); + } + break; + } + return null; + } + + private static class ColumnInfo { + final String name; + final int order; + ColumnInfo(String name, int order) { + this.name = name; + this.order = order; + } + } + +} diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java b/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java index 2091c3cf78..1726febd4f 100644 --- a/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java +++ b/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java @@ -30,6 +30,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.SerializerProvider; import com.fasterxml.jackson.databind.ser.std.StdSerializer; +import com.fasterxml.jackson.dataformat.csv.CsvGenerator; import com.fasterxml.jackson.dataformat.xml.ser.XmlSerializerProvider; import cwms.cda.data.dto.TimeSeries; import java.io.IOException; @@ -44,6 +45,7 @@ public TimeSeriesRecordSerializer() { public void serialize(TimeSeries.Record recordValue, JsonGenerator gen, SerializerProvider provider) throws IOException { + if (provider instanceof XmlSerializerProvider) { // Handle XML serialization @@ -54,7 +56,9 @@ public void serialize(TimeSeries.Record recordValue, JsonGenerator gen, Serializ } else { gen.writeNumberField("value", recordValue.getValue()); } - gen.writeNumberField("quality-code", recordValue.getQualityCode()); + if(!(gen instanceof CsvGenerator)) { + gen.writeNumberField("quality-code", recordValue.getQualityCode()); + } gen.writeEndObject(); } else { // Handle JSON serialization @@ -66,12 +70,14 @@ public void serialize(TimeSeries.Record recordValue, JsonGenerator gen, Serializ } else { gen.writeNumber(recordValue.getValue()); } - gen.writeNumber(recordValue.getQualityCode()); - // Used to include the dataEntryDate in the serialized output if requested. Modifies length of the output array. - // If the dataEntryDate is requested, it will always be non-null - // Without the dataEntryDate, the array will have 3 elements: [dateTime, value, qualityCode] - if (recordValue.getDataEntryDate() != null) { - gen.writeNumber(recordValue.getDataEntryDate().getTime()); + if(!(gen instanceof CsvGenerator)) { + gen.writeNumber(recordValue.getQualityCode()); + // Used to include the dataEntryDate in the serialized output if requested. Modifies length of the output array. + // If the dataEntryDate is requested, it will always be non-null + // Without the dataEntryDate, the array will have 3 elements: [dateTime, value, qualityCode] + if (recordValue.getDataEntryDate() != null) { + gen.writeNumber(recordValue.getDataEntryDate().getTime()); + } } gen.writeEndArray(); } diff --git a/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java index a4bf226dc0..423755586b 100644 --- a/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java +++ b/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java @@ -26,6 +26,8 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -62,7 +64,6 @@ import javax.servlet.http.HttpServletResponse; import java.io.InputStream; import java.nio.charset.StandardCharsets; -import java.sql.PreparedStatement; import java.sql.SQLException; import java.time.ZonedDateTime; import java.util.ArrayList; @@ -1826,6 +1827,323 @@ void test_wrong_units() throws Exception { ; } + @Test + void test_csv_default_columns_with_metadata() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + + InputStream resource = this.getClass().getResourceAsStream( + "/cwms/cda/api/lrl/1day_offset.json"); + assertNotNull(resource); + String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8); + + JsonNode ts = mapper.readTree(tsData); + String location = ts.get(NAME).asText().split("\\.")[0]; + String officeId = ts.get("office-id").asText(); + + createLocation(location, true, officeId); + + TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL; + String firstPoint = "2023-02-02T06:00:00-05:00"; + // insert + given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.JSONV2) + .contentType(Formats.JSONV2) + .body(tsData) + .header("Authorization",user.toHeaderValue()) + .queryParam(OFFICE,officeId) + .when() + .post("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .statusCode(is(HttpServletResponse.SC_OK)); + + // retrieve CSV v2 with metadata comments explicitly requested + String body = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.CSV) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.BEGIN, firstPoint) + .queryParam(Controllers.END, firstPoint) + .queryParam(Controllers.INCLUDE_METADATA_AS_CSV_COMMENTS, true) + .when() + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .statusCode(is(HttpServletResponse.SC_OK)) + .extract().asString(); + + assertNotNull(body); + String normalized = body.replace("\r", ""); + assertTrue(normalized.startsWith("# "), "Expected metadata comments"); + assertTrue(normalized.contains("date-time,value (F)\n"), + "Expected header 'date-time,value (F)' but was: " + normalized.split("\n")[0]); + } + + + @Test + void test_csv_optional_columns_no_metadata() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + InputStream resource = this.getClass().getResourceAsStream( + "/cwms/cda/api/lrl/1day_offset.json"); + assertNotNull(resource); + String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8); + + JsonNode ts = mapper.readTree(tsData); + String location = ts.get(NAME).asText().split("\\.")[0]; + String officeId = ts.get("office-id").asText(); + + createLocation(location, true, officeId); + TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL; + String firstPoint = "2023-02-02T06:00:00-05:00"; + given() + .accept(Formats.JSONV2) + .contentType(Formats.JSONV2) + .body(tsData) + .header("Authorization",user.toHeaderValue()) + .queryParam(OFFICE,officeId) + .when() + .post("/timeseries/") + .then() + .statusCode(is(HttpServletResponse.SC_OK)); + + String body = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.CSV) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.BEGIN, firstPoint) + .queryParam(Controllers.END, firstPoint) + .queryParam("include-optional-csv-columns", true) + .when() + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .statusCode(is(HttpServletResponse.SC_OK)) + .extract().asString(); + + assertNotNull(body); + String normalized = body.replace("\r", ""); + assertTrue(normalized.contains("date-time,value (F),data-entry-date,quality-code"), + "Expected header with optional columns but was: " + normalized.split("\n")[0]); + } + + @Test + void test_csv_optional_columns_with_metadata() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + InputStream resource = this.getClass().getResourceAsStream( + "/cwms/cda/api/lrl/1day_offset.json"); + assertNotNull(resource); + String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8); + + JsonNode ts = mapper.readTree(tsData); + String location = ts.get(NAME).asText().split("\\.")[0]; + String officeId = ts.get("office-id").asText(); + + createLocation(location, true, officeId); + TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL; + String firstPoint = "2023-02-02T06:00:00-05:00"; + given() + .accept(Formats.JSONV2) + .contentType(Formats.JSONV2) + .body(tsData) + .header("Authorization",user.toHeaderValue()) + .queryParam(OFFICE,officeId) + .when() + .post("/timeseries/") + .then() + .statusCode(is(HttpServletResponse.SC_OK)); + + String body = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.CSV) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.BEGIN, firstPoint) + .queryParam(Controllers.END, firstPoint) + .queryParam("include-metadata-as-comments", true) + .queryParam("include-optional-csv-columns", true) + .when() + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .statusCode(is(HttpServletResponse.SC_OK)) + .extract().asString(); + + assertNotNull(body); + String normalized = body.replace("\r", ""); + assertTrue(normalized.startsWith("# time-series-id: "), "Expected metadata comments"); + assertTrue(normalized.contains("date-time,value (F),data-entry-date,quality-code"), + "Expected header with optional columns"); + } + + @Test + void test_csv_streaming() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + InputStream resource = this.getClass().getResourceAsStream( + "/cwms/cda/api/lrl/large_timeseries.json"); + assertNotNull(resource); + String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8); + + JsonNode ts = mapper.readTree(tsData); + String location = ts.get(NAME).asText().split("\\.")[0]; + String officeId = ts.get("office-id").asText(); + + createLocation(location, true, officeId); + TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL; + + // 1. Store the large dataset + given() + .accept(Formats.JSONV2) + .contentType(Formats.JSONV2) + .body(tsData) + .header("Authorization", user.toHeaderValue()) + .queryParam(OFFICE, officeId) + .when() + .post("/timeseries/") + .then() + .statusCode(is(HttpServletResponse.SC_OK)); + + // 2. Client requests CSV and receives a single streamed response. + // The small page-size (10) forces the server to perform 5 internal fetch cycles + // for the 50 points in large_timeseries.json. + String body = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.CSV) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.PAGE_SIZE, 10) + .queryParam(Controllers.BEGIN, "2023-01-11T12:00:00Z") + .queryParam(Controllers.END, "2023-01-13T13:00:00Z") + .when() + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL, true) + .statusCode(is(HttpServletResponse.SC_OK)) + .header("X-Stream-Batch-Size", "10") + .extract().asString(); + + // 3. Verify total data integrity + assertNotNull(body); + String[] lines = body.replace("\r", "").split("\n"); + // 50 points + 1 header = 51 lines + assertEquals(51, lines.length, "Expected 51 lines (1 header + 50 points)"); + assertEquals("date-time,value (cfs)", lines[0]); + + // Verify first and last data points + assertTrue(lines[1].startsWith("2023-01-11T12:00:00Z,10.0")); + assertTrue(lines[50].startsWith("2023-01-13T13:00:00Z,500.0")); + + // 4. Client requests CSV and receives a single streamed response. + // The small page-size (6) forces the server to perform 9 internal fetch cycles + // for the 50 points in large_timeseries.json. + body = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.CSV) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.PAGE_SIZE, 6) + .queryParam(Controllers.BEGIN, "2023-01-11T12:00:00Z") + .queryParam(Controllers.END, "2023-01-13T13:00:00Z") + .when() + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL, true) + .statusCode(is(HttpServletResponse.SC_OK)) + .header("X-Stream-Batch-Size", "6") + .extract().asString(); + + // 5. Verify total data integrity + assertNotNull(body); + lines = body.replace("\r", "").split("\n"); + // 50 points + 1 header = 51 lines + assertEquals(51, lines.length, "Expected 51 lines (1 header + 50 points)"); + assertEquals("date-time,value (cfs)", lines[0]); + + // Verify first and last data points + assertTrue(lines[1].startsWith("2023-01-11T12:00:00Z,10.0")); + assertTrue(lines[50].startsWith("2023-01-13T13:00:00Z,500.0")); + + // 6. Client requests CSV and receives a single streamed response. + // The large page-size (60) forces the server to perform 1 internal fetch cycles + // for the 50 points in large_timeseries.json. + body = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.CSV) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.PAGE_SIZE, 60) + .queryParam(Controllers.BEGIN, "2023-01-11T12:00:00Z") + .queryParam(Controllers.END, "2023-01-13T13:00:00Z") + .when() + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL, true) + .statusCode(is(HttpServletResponse.SC_OK)) + .header("X-Stream-Batch-Size", "60") + .extract().asString(); + + // 7. Verify total data integrity + assertNotNull(body); + lines = body.replace("\r", "").split("\n"); + // 50 points + 1 header = 51 lines + assertEquals(51, lines.length, "Expected 51 lines (1 header + 50 points)"); + assertEquals("date-time,value (cfs)", lines[0]); + + // Verify first and last data points + assertTrue(lines[1].startsWith("2023-01-11T12:00:00Z,10.0")); + assertTrue(lines[50].startsWith("2023-01-13T13:00:00Z,500.0")); + } + + @Test + void test_csv_default_columns_no_metadata() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + InputStream resource = this.getClass().getResourceAsStream( + "/cwms/cda/api/lrl/1day_offset.json"); + assertNotNull(resource); + String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8); + + JsonNode ts = mapper.readTree(tsData); + String location = ts.get(NAME).asText().split("\\.")[0]; + String officeId = ts.get("office-id").asText(); + + createLocation(location, true, officeId); + TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL; + String firstPoint = "2023-02-02T06:00:00-05:00"; + + given() + .accept(Formats.JSONV2) + .contentType(Formats.JSONV2) + .body(tsData) + .header("Authorization",user.toHeaderValue()) + .queryParam(OFFICE,officeId) + .when() + .post("/timeseries/") + .then() + .statusCode(is(HttpServletResponse.SC_OK)); + + String body = given() + .log().ifValidationFails(LogDetail.ALL, true) + .accept(Formats.CSV) + .queryParam(Controllers.OFFICE, officeId) + .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText()) + .queryParam(Controllers.BEGIN, firstPoint) + .queryParam(Controllers.END, firstPoint) + .when() + .get("/timeseries/") + .then() + .log().ifValidationFails(LogDetail.ALL,true) + .statusCode(is(HttpServletResponse.SC_OK)) + .extract().asString(); + + assertNotNull(body); + String normalized = body.replace("\r", ""); + assertFalse(normalized.startsWith("# "), "Expected no metadata comments"); + assertTrue(normalized.startsWith("date-time,value (F)\n"), + "Expected default columns header but was: " + normalized.split("\n")[0]); + } + enum GetAllTest { DEFAULT(Formats.DEFAULT, Formats.JSONV2), diff --git a/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java b/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java index 51e866aeff..62c337ef01 100644 --- a/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java +++ b/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java @@ -10,6 +10,7 @@ import java.sql.Timestamp; import java.time.Duration; import java.time.Instant; +import java.time.ZoneId; import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; import java.time.temporal.ChronoUnit; @@ -21,6 +22,7 @@ import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import cwms.cda.formatters.xml.XMLv2; +import cwms.cda.formatters.csv.CsvV1; import java.util.List; diff --git a/cwms-data-api/src/test/java/cwms/cda/data/dto/csv/TestTimeSeriesCsvRow.java b/cwms-data-api/src/test/java/cwms/cda/data/dto/csv/TestTimeSeriesCsvRow.java new file mode 100644 index 0000000000..60d735ba9a --- /dev/null +++ b/cwms-data-api/src/test/java/cwms/cda/data/dto/csv/TestTimeSeriesCsvRow.java @@ -0,0 +1,197 @@ +package cwms.cda.data.dto.csv; + +import cwms.cda.formatters.csv.CsvV1; +import org.apache.commons.io.IOUtils; +import org.junit.jupiter.api.Test; + +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.time.Instant; +import java.util.Collections; +import java.util.List; + +import static cwms.cda.helpers.DTOMatch.assertMatch; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestTimeSeriesCsvRow { + + @Test + void testSingleRow_Default_Columns_NoMetadata() throws Exception { + TimeSeriesCsvRow row = buildRow(Instant.parse("2021-06-21T21:06:00Z"), 1.0, "ft"); + TimeSeriesCsv container = new TimeSeriesCsv.Builder() + .withRows(Collections.singletonList(row)) + .build(); + + CsvV1 csv = new CsvV1(); + String actual = csv.format(container); + assertNotNull(actual); + String normalized = normalize(actual); + assertTrue(normalized.contains("date-time,value (ft)"), "Header mismatch"); + assertTrue(normalized.contains("2021-06-21T21:06:00Z,1.0"), "Row mismatch"); + } + + @Test + void testSingleRow_Default_Columns_WithMetadataComments() throws Exception { + TimeSeriesCsvRow row = buildRow(Instant.parse("2021-06-21T21:06:00Z"), 1.0, "ft"); + TimeSeriesCsv container = new TimeSeriesCsv.Builder() + .withTimeSeriesId("RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST") + .withOfficeId("SPK") + .withVersionDate("2025-07-22T14:00:00Z") + .withRows(Collections.singletonList(row)) + .build(); + + CsvV1 csv = new CsvV1(); + String actual = csv.formatWithMetadata(container, false); + assertNotNull(actual); + String normalized = normalize(actual); + assertTrue(normalized.contains("# time-series-id: RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST"), "Metadata mismatch"); + assertTrue(normalized.contains("# office-id: SPK"), "Metadata mismatch"); + assertTrue(normalized.contains("date-time,value (ft)"), "Header mismatch"); + assertTrue(normalized.contains("2021-06-21T21:06:00Z,1.0"), "Row mismatch"); + } + + @Test + void testSingleRow_WithOptionalColumns() throws Exception { + TimeSeriesCsvRow row = new TimeSeriesCsvRow.Builder() + .withDateTime(Instant.parse("2021-06-21T21:06:00Z")) + .withValue(1.0) + .withQualityCode(0) + .withDataEntryDate(Instant.parse("2021-06-21T21:00:00Z")) + .withUnits("ft") + .build(); + TimeSeriesCsv container = new TimeSeriesCsv.Builder() + .withRows(Collections.singletonList(row)) + .build(); + + CsvV1 csv = new CsvV1(); + String actual = csv.format(container, true); + assertNotNull(actual); + String normalized = normalize(actual); + assertTrue(normalized.contains("date-time,value (ft),data-entry-date,quality-code"), "Header mismatch: " + normalized); + assertTrue(normalized.contains("2021-06-21T21:06:00Z,1.0,2021-06-21T21:00:00Z,0"), "Row mismatch: " + normalized); + } + + @Test + void testDefaultSerialization() throws Exception { + String csv = readResource("cwms/cda/data/dto/time-series-default.csv"); + CsvV1 formatter = new CsvV1(); + TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class); + assertNotNull(container); + List rows = container.getRows(); + assertEquals(4, rows.size()); + assertEquals("cfs", rows.get(0).getUnits()); + assertEquals(0.0, rows.get(0).getValue()); + assertEquals(Instant.parse("2021-06-21T00:00:00Z"), rows.get(0).getDateTime()); + + //serialize back + String serialized = formatter.format(container); + assertNotNull(serialized); + + //parse back + TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class); + assertNotNull(parsedContainer); + assertEquals(rows.get(0).getUnits(), parsedContainer.getRows().get(0).getUnits()); + assertMatch(rows, parsedContainer.getRows()); + } + + @Test + void testMetadataAsCommentsSerialization() throws Exception { + String csv = readResource("cwms/cda/data/dto/time-series-metadata-comments.csv"); + CsvV1 formatter = new CsvV1(); + TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class); + assertNotNull(container); + List rows = container.getRows(); + assertEquals(4, rows.size()); + assertEquals("cfs", rows.get(0).getUnits()); + assertEquals(0.0, rows.get(0).getValue()); + assertEquals("ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI", container.getTimeSeriesId()); + assertEquals("SWT", container.getOfficeId()); + assertEquals("aggregate", container.getVersionDate()); + + //serialize back + String serialized = formatter.formatWithMetadata(container, false); + assertNotNull(serialized); + + //parse back + TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class); + assertNotNull(parsedContainer); + assertEquals(container.getTimeSeriesId(), parsedContainer.getTimeSeriesId()); + assertEquals(container.getOfficeId(), parsedContainer.getOfficeId()); + assertEquals(container.getVersionDate(), parsedContainer.getVersionDate()); + assertMatch(rows, parsedContainer.getRows()); + } + + @Test + void testOptionalsNoMetadataSerialization() throws Exception { + String csv = readResource("cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv"); + CsvV1 formatter = new CsvV1(); + TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class); + assertNotNull(container); + List rows = container.getRows(); + assertEquals(4, rows.size()); + assertEquals("cfs", rows.get(0).getUnits()); + assertEquals(0.0, rows.get(0).getValue()); + assertNotNull(rows.get(0).getDataEntryDate()); + assertEquals(5, rows.get(0).getQualityCode()); + + //serialize back + String serialized = formatter.format(container, true); + assertNotNull(serialized); + + //parse back + TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class); + assertNotNull(parsedContainer); + assertMatch(rows, parsedContainer.getRows()); + } + + @Test + void testOptionalsWithMetadataCommentsSerialization() throws Exception { + String csv = readResource("cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv"); + CsvV1 formatter = new CsvV1(); + TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class); + assertNotNull(container); + List rows = container.getRows(); + assertEquals(4, rows.size()); + assertEquals("cfs", rows.get(0).getUnits()); + assertEquals(0.0, rows.get(0).getValue()); + assertEquals("ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI", container.getTimeSeriesId()); + assertEquals("SWT", container.getOfficeId()); + assertEquals("aggregate", container.getVersionDate()); + assertNotNull(rows.get(0).getDataEntryDate()); + assertEquals(5, rows.get(0).getQualityCode()); + + //serialize back + String serialized = formatter.formatWithMetadata(container, true); + assertNotNull(serialized); + + //parse back + TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class); + assertNotNull(parsedContainer); + assertEquals(container.getTimeSeriesId(), parsedContainer.getTimeSeriesId()); + assertEquals(container.getOfficeId(), parsedContainer.getOfficeId()); + assertEquals(container.getVersionDate(), parsedContainer.getVersionDate()); + assertMatch(rows, parsedContainer.getRows()); + } + + private static String readResource(String path) throws Exception { + InputStream stream = TestTimeSeriesCsvRow.class.getClassLoader().getResourceAsStream(path); + assertNotNull(stream, "Missing test resource: " + path); + String expected = IOUtils.toString(stream, StandardCharsets.UTF_8); + return normalize(expected); + } + + private static String normalize(String s) { + return s.replaceAll("\r", ""); + } + + private static TimeSeriesCsvRow buildRow(Instant dateTime, Double value, String units) { + return new TimeSeriesCsvRow.Builder() + .withDateTime(dateTime) + .withValue(value) + .withQualityCode(0) + .withUnits(units) + .build(); + } +} diff --git a/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java b/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java index 60067d5afb..1fa34a67c4 100644 --- a/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java +++ b/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java @@ -30,6 +30,8 @@ import cwms.cda.data.dto.TimeExtents; import cwms.cda.data.dto.TimeSeriesExtents; import cwms.cda.data.dto.catalog.LocationAlias; +import cwms.cda.data.dto.csv.TimeSeriesCsv; +import cwms.cda.data.dto.csv.TimeSeriesCsvRow; import cwms.cda.data.dto.location.kind.Lock; import cwms.cda.data.dto.CwmsDTOBase; import cwms.cda.data.dto.location.kind.GateChange; @@ -403,6 +405,29 @@ public static void assertMatch(WaterSupplyAccounting first, WaterSupplyAccountin ); } + public static void assertMatch(TimeSeriesCsv first, TimeSeriesCsv second) { + assertAll( + () -> assertEquals(first.getTimeSeriesId(), second.getTimeSeriesId(), "Time series IDs do not match"), + () -> assertEquals(first.getOfficeId(), second.getOfficeId(), "Office IDs do not match"), + () -> assertEquals(first.getVersionDate(), second.getVersionDate(), "Version dates do not match"), + () -> assertMatch(first.getRows(), second.getRows()) + ); + } + + public static void assertMatch(List first, List second) { + assertMatch(first, second, DTOMatch::assertMatch); + } + + public static void assertMatch(TimeSeriesCsvRow first, TimeSeriesCsvRow second) { + assertAll( + () -> assertEquals(first.getDateTime(), second.getDateTime(), "Date times do not match"), + () -> assertEquals(first.getDataEntryDate(), second.getDataEntryDate(), "Data entry dates do not match"), + () -> assertEquals(first.getUnits(), second.getUnits(), "Units do not match"), + () -> assertEquals(first.getValue(), second.getValue(), "Values do not match"), + () -> assertEquals(first.getQualityCode(), second.getQualityCode(), "Quality codes do not match") + ); + } + public static void assertMatch(Map> first, Map> second) { assertAll( () -> assertEquals(first.size(), second.size(), "Pump accounting sizes do not match"), diff --git a/cwms-data-api/src/test/resources/cwms/cda/api/lrl/large_timeseries.json b/cwms-data-api/src/test/resources/cwms/cda/api/lrl/large_timeseries.json new file mode 100644 index 0000000000..cc3810420b --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/api/lrl/large_timeseries.json @@ -0,0 +1,57 @@ +{ + "name": "Calhoun.Flow.Inst.1Hour.0.cda-test", + "office-id": "SPK", + "units": "cfs", + "values": [ + [1673438400000, 10, 0], + [1673442000000, 20, 0], + [1673445600000, 30, 0], + [1673449200000, 40, 0], + [1673452800000, 50, 0], + [1673456400000, 60, 0], + [1673460000000, 70, 0], + [1673463600000, 80, 0], + [1673467200000, 90, 0], + [1673470800000, 100, 0], + [1673474400000, 110, 0], + [1673478000000, 120, 0], + [1673481600000, 130, 0], + [1673485200000, 140, 0], + [1673488800000, 150, 0], + [1673492400000, 160, 0], + [1673496000000, 170, 0], + [1673499600000, 180, 0], + [1673503200000, 190, 0], + [1673506800000, 200, 0], + [1673510400000, 210, 0], + [1673514000000, 220, 0], + [1673517600000, 230, 0], + [1673521200000, 240, 0], + [1673524800000, 250, 0], + [1673528400000, 260, 0], + [1673532000000, 270, 0], + [1673535600000, 280, 0], + [1673539200000, 290, 0], + [1673542800000, 300, 0], + [1673546400000, 310, 0], + [1673550000000, 320, 0], + [1673553600000, 330, 0], + [1673557200000, 340, 0], + [1673560800000, 350, 0], + [1673564400000, 360, 0], + [1673568000000, 370, 0], + [1673571600000, 380, 0], + [1673575200000, 390, 0], + [1673578800000, 400, 0], + [1673582400000, 410, 0], + [1673586000000, 420, 0], + [1673589600000, 430, 0], + [1673593200000, 440, 0], + [1673596800000, 450, 0], + [1673600400000, 460, 0], + [1673604000000, 470, 0], + [1673607600000, 480, 0], + [1673611200000, 490, 0], + [1673614800000, 500, 0] + ] +} diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-default.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-default.csv new file mode 100644 index 0000000000..be1c3e6834 --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-default.csv @@ -0,0 +1,2 @@ +date-time,value +2021-06-21T21:06:00Z,1.0 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-columns.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-columns.csv new file mode 100644 index 0000000000..a18823480e --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-columns.csv @@ -0,0 +1,2 @@ +time-series-id,office-id,date-time,value,units,version-date,quality-code +RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST,SPK,2021-06-21T21:06:00Z,1.0,ft,2025-07-22T14:00:00Z,0 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-comment.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-comment.csv new file mode 100644 index 0000000000..507c45924d --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-comment.csv @@ -0,0 +1,8 @@ +# metadata-count: 5 +# time-series-id: RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST +# office-id: SPK +# units: ft +# version-date: 2025-07-22T14:00:00Z +# quality-code: 0 +date-time,value +2021-06-21T21:06:00Z,1.0 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-columns.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-columns.csv new file mode 100644 index 0000000000..eeaa5005da --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-columns.csv @@ -0,0 +1,3 @@ +time-series-id,office-id,date-time,value,units,version-date,quality-code +RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST,SPK,2021-06-21T21:06:00Z,1.0,ft,2025-07-22T14:00:00Z,0 +RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST,SPK,2021-06-22T21:06:00Z,2.0,ft,2025-07-22T14:00:00Z,0 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-comment.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-comment.csv new file mode 100644 index 0000000000..b1112566b8 --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-comment.csv @@ -0,0 +1,9 @@ +# metadata-count: 5 +# time-series-id: RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST +# office-id: SPK +# units: ft +# version-date: 2025-07-22T14:00:00Z +# quality-code: 0 +date-time,value +2021-06-21T21:06:00Z,1.0 +2021-06-22T21:06:00Z,2.0 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-default.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-default.csv new file mode 100644 index 0000000000..75bdb32b51 --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-default.csv @@ -0,0 +1,5 @@ +date-time, value (cfs) +2021-06-21T00:00:00Z, 0.0 +2021-06-22T00:00:00Z, 1.0 +2021-06-23T00:00:00Z, 2.0 +2021-06-24T00:00:00Z, 3.0 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-metadata-comments.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-metadata-comments.csv new file mode 100644 index 0000000000..50e2a69716 --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-metadata-comments.csv @@ -0,0 +1,8 @@ +# time-series-id: ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI +# office-id: SWT +# version-date: aggregate +date-time, value (cfs) +2021-06-21T00:00:00Z, 0.0 +2021-06-22T00:00:00Z, 1.0 +2021-06-23T00:00:00Z, 2.0 +2021-06-24T00:00:00Z, 3.0 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv new file mode 100644 index 0000000000..c509376e00 --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv @@ -0,0 +1,5 @@ +date-time, value (cfs), data-entry-date, quality-code +2021-06-21T00:00:00Z, 0.0, 2021-06-21T00:05:00Z, 5 +2021-06-22T00:00:00Z, 1.0, 2021-06-22T00:05:00Z, 5 +2021-06-23T00:00:00Z, 2.0, 2021-06-23T00:05:00Z, 5 +2021-06-24T00:00:00Z, 3.0, 2021-06-24T00:05:00Z, 5 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv new file mode 100644 index 0000000000..13ec27f224 --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv @@ -0,0 +1,9 @@ +# time-series-id: ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI +# office-id: SWT +# units: cfs +# version-date: aggregate +date-time,value (cfs),data-entry-date,quality-code +2021-06-21T00:00:00Z,0.0,2021-06-21T00:05:00Z,5 +2021-06-21T01:00:00Z,10.0,2021-06-21T01:05:00Z,5 +2021-06-21T02:00:00Z,20.0,2021-06-21T02:05:00Z,5 +2021-06-21T03:00:00Z,30.0,2021-06-21T03:05:00Z,5 diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series.csv new file mode 100644 index 0000000000..4819c27820 --- /dev/null +++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series.csv @@ -0,0 +1,3 @@ +date-time,value +1772826360000,12.34567 +1772826420000,12.34567 diff --git a/docs/source/decisions/csv-format.rst b/docs/source/decisions/csv-format.rst new file mode 100644 index 0000000000..22b2d660bd --- /dev/null +++ b/docs/source/decisions/csv-format.rst @@ -0,0 +1,59 @@ +##### +CSV Format (With emphasis on TimeSeries) +##### + + +Summary +======= + +This ADR defines a standardized CSV representation for the TimeSeries DTO with general considerations for future DTOs. It specifies a row-per-record CSV format that preserves essential metadata, flattens nested Record entries, and ensures consistent ingestion by analytics, automation, and warehousing systems. + + +Opinions +======== + +Opinion 1 +--------- + +@brysonspilman + +Summary: +Define a CSV format for the TimeSeries DTO that flattens each Record into one row, repeats metadata per row, uses kebab-case headers, and applies consistent data formatting rules. + +The TimeSeries DTO currently supports JSON and XML, but various downstream users require CSV. Due to nested structures—a clear flattening strategy is necessary. + +Key points: + +- CSV will not be supported for DTOs with multiple independent data collections. +- Timeseries CSV will include only the date-time and value Record fields (2 columns) +- Metadata fields are repeated on each row to eliminate the need for joins. +- Column names use kebab-case for consistency with JSON. +- Record date-time values are serialized as Unix epoch milliseconds (UTC). +- Null values are empty fields. Missing values use quality-code = 5. +- data-entry-date is empty when not present. +- UTF-8 encoding, comma delimiter, LF line endings, header included. +- One row is produced per Record. +- Multi-retrieve never includes multiple time-series IDs. + +Example CSV: + +date-time, value +1624287600000, 0.0 +1624288500000, 1.0 +1624289400000, 2.0 +1624290300000, 3.0 + + +Decision Status +=============== + +(Status: proposed) + + +References +========== + +Related Types: cwms.cda.data.dto.TimeSeries, TimeSeries.Record +Discussion: https://github.com/USACE/cwms-data-api/issues/1525#issuecomment-3974845633 +Date: 2026-02-26 +Owner: GEI – Bryson Spilman \ No newline at end of file