diff --git a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java
index cdf101876f..255de300ce 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/Controllers.java
@@ -57,6 +57,7 @@ public final class Controllers {
public static final String CURSOR = "cursor";
public static final String PAGE = "page";
public static final String PAGE_SIZE = "page-size";
+ public static final String CSV_RESUME_FROM_PAGE = "csv-resume-from-page";
// IF the constant has a number at the end its a deprecated variant
@@ -127,6 +128,8 @@ public final class Controllers {
public static final String MATCH_NULL_PARENTS = "match-null-parents";
public static final String ENTITY_ID = "entity-id";
public static final String PARENT_ENTITY_ID = "parent-entity-id";
+ public static final String INCLUDE_METADATA_AS_CSV_COMMENTS = "include-metadata-as-comments";
+ public static final String INCLUDE_OPTIONAL_CSV_COLUMNS = "include-optional-csv-columns";
public static final String CREATE_AS_LRTS = "create-as-lrts";
public static final String STORE_RULE = "store-rule";
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java b/cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java
new file mode 100644
index 0000000000..d8ae6262d0
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java
@@ -0,0 +1,56 @@
+package cwms.cda.api;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.function.Function;
+
+public final class CursorIterator
implements Iterator
{
+
+ private final Function fetcher;
+ private final Function> itemsExtractor;
+ private final Function
nextCursorExtractor;
+
+ private String cursor;
+ private P nextPage;
+ private boolean initialized = false;
+
+ public CursorIterator(String initialCursor, Function fetcher, Function> itemsExtractor, Function
nextCursorExtractor) {
+ this.cursor = initialCursor == null ? "" : initialCursor;
+ this.fetcher = fetcher;
+ this.itemsExtractor = itemsExtractor;
+ this.nextCursorExtractor = nextCursorExtractor;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!initialized) {
+ nextPage = fetcher.apply(cursor);
+ initialized = true;
+ }
+
+ if (nextPage == null) return false;
+
+ List items = itemsExtractor.apply(nextPage);
+ return items != null && !items.isEmpty();
+ }
+
+ @Override
+ public P next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ P current = nextPage;
+
+ String nextCursor = nextCursorExtractor.apply(current);
+ if (nextCursor == null || nextCursor.isEmpty()) {
+ nextPage = null;
+ } else {
+ cursor = nextCursor;
+ nextPage = fetcher.apply(cursor);
+ }
+
+ return current;
+ }
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java
index 0483a71634..0b5ee18bc6 100644
--- a/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java
+++ b/cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java
@@ -21,7 +21,11 @@
import cwms.cda.data.dao.VerticalDatum;
import cwms.cda.data.dto.TimeSeries;
import cwms.cda.formatters.ContentType;
+import cwms.cda.formatters.csv.CsvV1;
+import cwms.cda.data.dto.csv.TimeSeriesCsv;
+import cwms.cda.data.dto.csv.TimeSeriesCsvRow;
import cwms.cda.formatters.Formats;
+import cwms.cda.formatters.csv.CwmsCsvProcessor;
import cwms.cda.helpers.DateUtils;
import io.javalin.apibuilder.CrudHandler;
import io.javalin.core.util.Header;
@@ -39,10 +43,17 @@
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
+import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.utils.URIBuilder;
@@ -382,12 +393,25 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) {
+ "identifies where in the request you are. This is an opaque"
+ " value, and can be obtained from the 'next-page' value in "
+ "the response. Deprecated, use " + PAGE + " instead."),
- @OpenApiParam(name = PAGE_SIZE,
- type = Integer.class,
+ @OpenApiParam(name = PAGE_SIZE, type = Integer.class,
description = "How many entries per page returned. "
- + "Default " + DEFAULT_PAGE_SIZE + ". Use 0 to return an empty values array, "
+ + "For JSON/XML paging, this controls page size. "
+ + "For CSV, this controls the internal fetch batch size used while streaming a single response. "
+ + "CSV clients do not request subsequent pages. "
+ + "Default " + DEFAULT_PAGE_SIZE +". Use 0 to return an empty values array, "
+ "or -1 to return the entire window in one response without a next-page cursor. "
- + "Values less than -1 are invalid.")
+ + "Values less than -1 are invalid."),
+ @OpenApiParam(name = CSV_RESUME_FROM_PAGE,
+ description = "CSV only. Optional opaque cursor that tells the server where to begin the "
+ + "streamed CSV response. This is intended for resuming a previously interrupted "
+ + "CSV download. The response is still a single streamed CSV response, not a paged "
+ + "CSV response."),
+ @OpenApiParam(name = INCLUDE_METADATA_AS_CSV_COMMENTS, type = Boolean.class,
+ description = "When true, include dataset metadata as csv header comments "
+ + "prepended with # (default is false)."),
+ @OpenApiParam(name = INCLUDE_OPTIONAL_CSV_COLUMNS, type = Boolean.class,
+ description = "When true, include optional columns (quality-code, data-entry-date) "
+ + "in the CSV response (default is false).")
},
responses = {
@OpenApiResponse(status = STATUS_200,
@@ -397,6 +421,7 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) {
@OpenApiContent(from = TimeSeries.class, type = Formats.XMLV2),
@OpenApiContent(from = TimeSeries.class, type = Formats.XML),
@OpenApiContent(from = TimeSeries.class, type = Formats.JSON),
+ @OpenApiContent(from = TimeSeriesCsv.class, type= Formats.CSV),
@OpenApiContent(from = TimeSeries.class, type = ""),}),
@OpenApiResponse(status = STATUS_400, description = "Invalid parameter combination"),
@OpenApiResponse(status = STATUS_404, description = "The provided combination of "
@@ -471,6 +496,18 @@ public void getAll(@NotNull Context ctx) {
.withShouldTrim(trim.getOrDefault(true))
.withIncludeEntryDate(includeEntryDate)
.build();
+
+ // CSV: stream a single response; page-size is only internal batch size
+ if (Formats.CSV.equals(contentType.getType())) {
+ String csvResumeFromPage = ctx.queryParamAsClass(CSV_RESUME_FROM_PAGE, String.class)
+ .getOrDefault("");
+ boolean includeOptionalColumns = ctx.queryParamAsClass(INCLUDE_OPTIONAL_CSV_COLUMNS, Boolean.class)
+ .getOrDefault(false);
+ int csvBatchSize = validateCsvBatchSize(pageSize);
+ streamCsv(ctx, csvBatchSize, csvResumeFromPage, includeOptionalColumns, dao, requestParameters, contentType);
+ return;
+ }
+
// Execute DAO call with a timeout so we can return a clearer message instead of a generic 500
int apiTimeoutMs = Integer.getInteger("cwms.cda.api.apiTimeoutMs", 45000);
CompletableFuture daoFuture = CompletableFuture.supplyAsync(
@@ -492,7 +529,7 @@ public void getAll(@NotNull Context ctx) {
throw unwrapExecutionException(ex);
}
- if(datum != null) { //this will be null for non-elevation ts
+ if (datum != null) { //this will be null for non-elevation ts
// user has requested a specific vertical datum
VerticalDatum vd = VerticalDatum.valueOf(datum); // the users request
ts = TimeSeriesVerticalDatumConverter.convertToVerticalDatum(ts, vd);
@@ -521,10 +558,33 @@ public void getAll(@NotNull Context ctx) {
}
String office = ctx.queryParam(OFFICE);
+
+ // CSV: stream a single response; page-size is only internal batch size
+ if (Formats.CSV.equals(contentType.getType())) {
+ TimeSeriesRequestParameters requestParameters = new TimeSeriesRequestParameters.Builder()
+ .withNames(names)
+ .withOffice(office)
+ .withUnits(units)
+ .withBeginTime(beginZdt)
+ .withEndTime(endZdt)
+ .withShouldTrim(trim.getOrDefault(true))
+ .withIncludeEntryDate(includeEntryDate)
+ .build();
+
+ String csvResumeFromPage = ctx.queryParamAsClass(CSV_RESUME_FROM_PAGE, String.class)
+ .getOrDefault("");
+ boolean includeOptionalColumns = ctx.queryParamAsClass(INCLUDE_OPTIONAL_CSV_COLUMNS, Boolean.class)
+ .getOrDefault(false);
+ int csvBatchSize = validateCsvBatchSize(pageSize);
+ streamCsv(ctx, csvBatchSize, csvResumeFromPage, includeOptionalColumns, dao, requestParameters, contentType);
+ return;
+ }
+
results = dao.getTimeseries(format, names, office, units, datum, beginZdt, endZdt, tz);
ctx.status(HttpServletResponse.SC_OK);
ctx.result(results);
}
+
addDeprecatedContentTypeWarning(ctx, contentType);
requestResultSize.update(results.length());
} catch (NotFoundException e) {
@@ -552,6 +612,96 @@ static RuntimeException unwrapExecutionException(java.util.concurrent.ExecutionE
return new RuntimeException(cause);
}
+ private void streamCsv(@NotNull Context ctx, int batchSize, String resumeFromPage, boolean includeOptionalColumns,
+ TimeSeriesDao dao, TimeSeriesRequestParameters requestParameters, ContentType contentType) {
+ ctx.status(HttpServletResponse.SC_OK);
+ ctx.contentType(Formats.CSV);
+ ctx.header(Header.CONTENT_TYPE, Formats.CSV + "; charset=UTF-8");
+
+ boolean includeMetadataAsComments = ctx.queryParamAsClass(INCLUDE_METADATA_AS_CSV_COMMENTS, Boolean.class)
+ .getOrDefault(false);
+
+ ctx.header(Header.CACHE_CONTROL, "no-store");
+ ctx.header("X-Content-Type-Options", "nosniff");
+ ctx.header("X-Stream-Batch-Size", String.valueOf(batchSize));
+
+ CsvV1 csv = new CsvV1();
+
+ try {
+ ServletOutputStream out = ctx.res.getOutputStream();
+ Iterator iterator = new CursorIterator<>(
+ resumeFromPage,
+ cursor -> dao.getTimeseries(cursor, batchSize, requestParameters),
+ TimeSeries::getValues,
+ TimeSeries::getNextPage);
+ AtomicBoolean firstBatch = new AtomicBoolean(true);
+ iterator.forEachRemaining(ts -> streamBatches(ctx, includeOptionalColumns, ts, firstBatch, includeMetadataAsComments, csv, out));
+ } catch (IOException e) {
+ throw new DataAccessException("Failed streaming CSV response", e);
+ }
+
+ addDeprecatedContentTypeWarning(ctx, contentType);
+ }
+
+ private static void streamBatches(@NotNull Context ctx, boolean includeOptionalColumns, TimeSeries ts, AtomicBoolean firstBatch, boolean includeMetadataAsComments, CsvV1 csv, ServletOutputStream out) {
+ try {
+ List values = ts.getValues();
+
+ List rows = new ArrayList<>(values.size());
+ String tsId = ts.getName();
+ String officeId = ts.getOfficeId();
+ String units = ts.getUnits();
+ String versionDateStr = ts.getVersionDate() == null
+ ? null
+ : ts.getVersionDate().toInstant().toString();
+
+ for (TimeSeries.Record r : values) {
+ Instant dt = r.getDateTime() == null ? null : r.getDateTime().toInstant();
+ Instant entryDt = r.getDataEntryDate() == null ? null : r.getDataEntryDate().toInstant();
+
+ rows.add(new TimeSeriesCsvRow.Builder()
+ .withDateTime(dt)
+ .withValue(r.getValue())
+ .withQualityCode(r.getQualityCode())
+ .withDataEntryDate(entryDt)
+ .withUnits(units)
+ .build());
+ }
+
+ TimeSeriesCsv container = new TimeSeriesCsv.Builder()
+ .withTimeSeriesId(tsId)
+ .withOfficeId(officeId)
+ .withVersionDate(versionDateStr)
+ .withRows(rows)
+ .build();
+
+ String chunk;
+ if (firstBatch.get()) {
+ chunk = includeMetadataAsComments
+ ? csv.formatWithMetadata(container, includeOptionalColumns)
+ : csv.format(container, includeOptionalColumns);
+ } else {
+ chunk = csv.format(container, includeOptionalColumns);
+ chunk = CwmsCsvProcessor.stripHeader(chunk);
+ }
+
+ out.write(chunk.getBytes(StandardCharsets.UTF_8));
+ out.flush();
+ ctx.res.flushBuffer();
+
+ firstBatch.set(false);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private int validateCsvBatchSize(int requestedPageSize) {
+ if (requestedPageSize <= 0) {
+ throw new IllegalArgumentException("For CSV streaming, page-size must be greater than 0.");
+ }
+ return requestedPageSize;
+ }
+
private void addLinkHeader(@NotNull Context ctx, TimeSeries ts, ContentType contentType) {
try {
// Send back the link to the next page in the response header
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java
index 8e337ab413..6c59659ee0 100644
--- a/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java
@@ -4,7 +4,20 @@
import java.io.InputStream;
import java.sql.SQLException;
+/**
+ * A consumer for streaming binary data out to callers (e.g., HTTP layer).
+ *
+ * The primary abstract method accepts an InputStream with optional position and total length.
+ * For generated content where the total length is not known up front (e.g., CSV rendered on-demand),
+ * use the default two-argument overload which delegates with null position/length values, allowing
+ * callers to choose chunked transfer without setting Content-Length.
+ */
@FunctionalInterface
public interface StreamConsumer {
+
void accept(InputStream stream, long inputStreamPosition, String mediaType, long totalLength) throws SQLException, IOException;
+
+ default void accept(InputStream stream, String mediaType) throws SQLException, IOException {
+ accept(stream, 0L, mediaType, -1L);
+ }
}
\ No newline at end of file
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java
index 424ebbc91e..9b5349d2a3 100644
--- a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java
@@ -59,5 +59,31 @@ List findRecentsInRange(String office, String categoryId, String gr
List findMostRecentsInRange(String office, List tsIds, Timestamp pastLimit,
Timestamp futureLimit, UnitSystem unitSystem);
+ /**
+ * Streams the requested TimeSeries data in CSV format using a {@link StreamConsumer},
+ * similar to the BlobDao streaming pattern. The DAO will invoke the provided consumer
+ * with an InputStream that produces the CSV bytes on-the-fly, avoiding buffering the
+ * entire dataset in memory.
+ *
+ * Notes:
+ * - The consumer is responsible for copying the InputStream to the HTTP response and
+ * setting any additional headers as needed.
+ * - The mediaType passed to the consumer will be {@code Formats.CSV}.
+ * - The totalLength may be unknown and can be negative; callers should not rely on it.
+ *
+ * @param requestParameters request parameters describing the time-series retrieval
+ * @param pageSize page size to use when paging results internally; non-positive values disable limiting
+ * @param consumer a sink that will receive a streaming InputStream of CSV data
+ */
+ /**
+ * Stream time-series in CSV using CsvV1 formatter.
+ *
+ * @param requestParameters parameters describing the request
+ * @param consumer sink for the InputStream
+ * @param includeMetadataAsComments when true, include metadata as header comments
+ */
+ void streamRequestedTimeSeriesCsv(TimeSeriesRequestParameters requestParameters,
+ StreamConsumer consumer,
+ boolean includeMetadataAsComments);
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java
index d7ba536372..aa497cbfb0 100644
--- a/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDaoImpl.java
@@ -1,6 +1,17 @@
package cwms.cda.data.dao;
import static com.google.common.flogger.LazyArgs.lazy;
+import cwms.cda.data.dao.rsql.FieldResolver;
+import cwms.cda.data.dao.rsql.MapFieldResolver;
+import cwms.cda.data.dao.rsql.RSQLConditionBuilder;
+import cwms.cda.data.dto.filteredtimeseries.FilteredTimeSeries;
+import cwms.cda.data.dto.catalog.TimeSeriesAlias;
+import cwms.cda.formatters.csv.CwmsCsvProcessor;
+import cwms.cda.helpers.DateUtils;
+
+import java.io.IOException;
+import java.sql.Connection;
+
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.countDistinct;
import static org.jooq.impl.DSL.field;
@@ -25,9 +36,6 @@
import com.google.common.flogger.FluentLogger;
import cwms.cda.api.enums.UnitSystem;
import cwms.cda.api.enums.VersionType;
-import cwms.cda.data.dao.rsql.FieldResolver;
-import cwms.cda.data.dao.rsql.MapFieldResolver;
-import cwms.cda.data.dao.rsql.RSQLConditionBuilder;
import cwms.cda.data.dto.Catalog;
import cwms.cda.data.dto.CwmsDTOPaginated;
import cwms.cda.data.dto.RecentValue;
@@ -38,15 +46,11 @@
import cwms.cda.data.dto.TsvId;
import cwms.cda.data.dto.VerticalDatumInfo;
import cwms.cda.data.dto.catalog.CatalogEntry;
-import cwms.cda.data.dto.catalog.TimeSeriesAlias;
import cwms.cda.data.dto.catalog.TimeseriesCatalogEntry;
-import cwms.cda.data.dto.filteredtimeseries.FilteredTimeSeries;
import cwms.cda.formatters.xml.XMLv1;
-import cwms.cda.helpers.DateUtils;
import cwms.cda.helpers.ZoneIdHelper;
import java.math.BigDecimal;
import java.math.BigInteger;
-import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Duration;
@@ -99,6 +103,12 @@
import usace.cwms.db.jooq.codegen.udt.records.ZTSV_ARRAY;
import usace.cwms.db.jooq.codegen.udt.records.ZTSV_TYPE;
+import java.nio.charset.StandardCharsets;
+import cwms.cda.formatters.csv.CsvV1;
+import cwms.cda.formatters.Formats;
+import cwms.cda.data.dto.csv.TimeSeriesCsv;
+import cwms.cda.data.dto.csv.TimeSeriesCsvRow;
+
public class TimeSeriesDaoImpl extends JooqDao implements TimeSeriesDao {
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
@@ -220,6 +230,277 @@ public String getTimeseries(String format, String names, String office, String u
timezone.getId(), office);
}
+ @Override
+ public void streamRequestedTimeSeriesCsv(TimeSeriesRequestParameters requestParameters,
+ StreamConsumer consumer, boolean includeMetadataAsComments) {
+ // Delegate to the overload that supports tuning knobs with defaults
+ streamRequestedTimeSeriesCsv(requestParameters, consumer,
+ includeMetadataAsComments, null, null);
+ }
+
+ // Overload allowing the caller to provide JDBC/jOOQ fetch size and rows-per-buffer for CSV rendering
+ public void streamRequestedTimeSeriesCsv(TimeSeriesRequestParameters requestParameters,
+ StreamConsumer consumer,
+ boolean includeMetadataAsComments,
+ Integer dbFetchSize,
+ Integer rowsPerBuffer) {
+ // This method mirrors the data selection flow of getRequestedTimeSeries, but delivers
+ // a streaming CSV InputStream via StreamConsumer (BlobDao pattern) to avoid buffering.
+ String names = requestParameters.getNames();
+ String office = requestParameters.getOffice();
+ String units = requestParameters.getUnits();
+ ZonedDateTime beginTime = requestParameters.getBeginTime();
+ ZonedDateTime endTime = requestParameters.getEndTime();
+
+ // Resolve office, timeseries id and code
+ final Field officeId = CWMS_UTIL_PACKAGE.call_GET_DB_OFFICE_ID(
+ office != null ? DSL.val(office) : CWMS_UTIL_PACKAGE.call_USER_OFFICE_ID());
+ final Field tsId = CWMS_TS_PACKAGE.call_GET_TS_ID__2(DSL.val(names), officeId);
+ final Field tsCode = CWMS_TS_PACKAGE.call_GET_TS_CODE__2(DSL.val(names), officeId);
+
+ // Determine actual unit (handle EN/SI to default units mapping)
+ Field unit = units.compareToIgnoreCase("SI") == 0 || units.compareToIgnoreCase("EN") == 0
+ ? CWMS_UTIL_PACKAGE.call_GET_DEFAULT_UNITS(CWMS_TS_PACKAGE.call_GET_BASE_PARAMETER_ID(tsCode), DSL.val(units, String.class))
+ : DSL.val(units, String.class);
+
+ // Give TVQ (time,value,quality) column names
+ Field dateTimeCol = field(DATE_TIME, Timestamp.class).as(DATE_TIME);
+ Field valueCol = field(VALUE, Double.class).as(VALUE);
+ Field qualityCol = field(QUALITY_CODE, Integer.class).as(QUALITY_CODE);
+
+ Long beginTimeMilli = beginTime.toInstant().toEpochMilli();
+ Long endTimeMilli = endTime.toInstant().toEpochMilli();
+ String trim = formatBool(requestParameters.isShouldTrim());
+ final String startInclusive = "T";
+ final String endInclusive = "T";
+ String previous = "F";
+ String next = "F";
+ Long versionDateMilli = requestParameters.getVersionDate() != null ? requestParameters.getVersionDate().toInstant().toEpochMilli() : null;
+ String maxVersion = requestParameters.getVersionDate() == null ? "T" : "F";
+
+ // Build the table(...) call to retrieve rows (mirror existing implementation style)
+ String retrievalMethod = "cwms_20.cwms_ts.retrieve_ts_out_tab"; // CSV excludes entry-date
+ SQL retrieveSelectData = DSL.sql(
+ "table(" + retrievalMethod + "(?,?," +
+ "cwms_20.cwms_util.to_timestamp(?),cwms_20.cwms_util.to_timestamp(?), 'UTC'," +
+ "?,?,?,?,?," + getVersionPart(requestParameters.getVersionDate()) + ",?,?) ) retrieveTs",
+ tsId,
+ unit,
+ beginTimeMilli,
+ endTimeMilli,
+ trim, startInclusive, endInclusive, previous, next,
+ versionDateMilli, maxVersion,
+ officeId
+ );
+
+ // Now select the needed columns (date_time, value, quality) and restrict the time window
+ SelectConditionStep> query = dsl.select(dateTimeCol, valueCol, qualityCol)
+ .from(retrieveSelectData)
+ .where(dateTimeCol.ge(CWMS_UTIL_PACKAGE.call_TO_TIMESTAMP__2(DSL.val(beginTimeMilli))))
+ .and(dateTimeCol.le(CWMS_UTIL_PACKAGE.call_TO_TIMESTAMP__2(DSL.val(endTimeMilli))));
+
+ // No page size limiting when streaming
+
+ logger.atFine().log("%s", lazy(() -> query.getSQL(ParamType.INLINED)));
+
+ // Apply optional JDBC fetch size hint
+ if (dbFetchSize != null && dbFetchSize > 0) {
+ try {
+ query.fetchSize(dbFetchSize);
+ } catch (Exception ex) {
+ // Some drivers may ignore/throw; log and continue
+ logger.atFine().withCause(ex).log("Fetch size hint ignored: %s", dbFetchSize);
+ }
+ }
+
+ // Create a lazy CSV InputStream that renders rows on-demand from the database cursor
+ try (Cursor> recCursor = query.fetchLazy()) {
+ String tsIdStr = dsl.fetchValue(tsId);
+ String officeResolved = dsl.fetchValue(officeId);
+ String resolvedUnits = dsl.fetchValue(unit);
+ Timestamp versionTs = requestParameters.getVersionDate() != null ? Timestamp.from(requestParameters.getVersionDate().toInstant()) : null;
+
+ CsvV1 csvFormatter = new CsvV1();
+
+ CsvOnDemandInputStream stream = new CsvOnDemandInputStream(
+ recCursor,
+ csvFormatter,
+ tsIdStr,
+ officeResolved,
+ resolvedUnits,
+ versionTs,
+ includeMetadataAsComments,
+ rowsPerBuffer
+ );
+
+ try (stream) {
+ // Unknown total length
+ consumer.accept(stream, Formats.CSV);
+ }
+ } catch (Exception e) {
+ throw new DataAccessException("Error generating CSV for TimeSeries", e);
+ }
+ }
+
+ // InputStream that renders CSV rows on-demand from a jOOQ Cursor
+ private static final class CsvOnDemandInputStream extends java.io.InputStream {
+ private final Cursor> cursor;
+ private final java.util.Iterator> it;
+ private final CsvV1 csv;
+ private final String tsIdStr;
+ private final String officeId;
+ private final String units;
+ private final Timestamp versionTs;
+ private final boolean includeMetaComments;
+ private final int rowsPerBuffer;
+
+ private byte[] buffer = new byte[0];
+ private int bufPos = 0;
+ private boolean first = true;
+ private boolean closed = false;
+
+ CsvOnDemandInputStream(Cursor> cursor,
+ CsvV1 csv,
+ String tsIdStr,
+ String officeId,
+ String units,
+ Timestamp versionTs,
+ boolean includeMetaComments,
+ Integer rowsPerBuffer) {
+ this.cursor = cursor;
+ this.it = cursor.iterator();
+ this.csv = csv;
+ this.tsIdStr = tsIdStr;
+ this.officeId = officeId;
+ this.units = units;
+ this.versionTs = versionTs;
+ this.includeMetaComments = includeMetaComments;
+ int rpb = rowsPerBuffer == null ? 1 : rowsPerBuffer;
+ this.rowsPerBuffer = rpb > 0 ? rpb : 1;
+ }
+
+ @Override
+ public int read() throws IOException {
+ byte[] one = new byte[1];
+ int r = read(one, 0, 1);
+ return r == -1 ? -1 : (one[0] & 0xFF);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ if (closed) {
+ throw new IOException("Stream closed");
+ }
+ if (b == null) {
+ throw new NullPointerException("b");
+ }
+ if (off < 0 || len < 0 || len > b.length - off) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ int totalCopied = 0;
+ while (len > 0) {
+ if (bufPos >= buffer.length) {
+ if (!fillBuffer()) {
+ break; // EOF
+ }
+ }
+ int toCopy = Math.min(len, buffer.length - bufPos);
+ System.arraycopy(buffer, bufPos, b, off, toCopy);
+ bufPos += toCopy;
+ off += toCopy;
+ len -= toCopy;
+ totalCopied += toCopy;
+ }
+ return totalCopied == 0 ? -1 : totalCopied;
+ }
+
+ private boolean fillBuffer() {
+ if (!it.hasNext()) {
+ buffer = new byte[0];
+ bufPos = 0;
+ return false;
+ }
+
+ StringBuilder sb = new StringBuilder(8192);
+ int produced = 0;
+ while (it.hasNext() && produced < rowsPerBuffer) {
+ org.jooq.Record3 r = it.next();
+ Timestamp ts = r.value1();
+ Double val = r.value2();
+ Integer qualityCode = r.value3();
+
+ TimeSeriesCsvRow row = new TimeSeriesCsvRow.Builder()
+ .withDateTime(ts == null ? null : ts.toInstant())
+ .withValue(val)
+ .withQualityCode(qualityCode)
+ .withDataEntryDate(null) // entry date not available in this retrieval method
+ .withUnits(units)
+ .build();
+
+ String rendered;
+ if (first) {
+ TimeSeriesCsv container = new TimeSeriesCsv.Builder()
+ .withTimeSeriesId(tsIdStr)
+ .withOfficeId(officeId)
+ .withVersionDate(versionTs == null ? null : versionTs.toInstant().toString())
+ .withRows(Collections.singletonList(row))
+ .build();
+ rendered = csv.format(container);
+ if (!includeMetaComments) {
+ rendered = CwmsCsvProcessor.stripHeader(rendered);
+ }
+ first = false;
+ } else {
+ String csvLine = csv.format(Collections.singletonList(row));
+ rendered = extractLastCsvLine(csvLine);
+ if (!rendered.endsWith("\n")) {
+ rendered = rendered + "\n";
+ }
+ }
+ sb.append(rendered);
+ produced++;
+ }
+
+ buffer = sb.toString().getBytes(StandardCharsets.UTF_8);
+ bufPos = 0;
+ return buffer.length > 0;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (!closed) {
+ closed = true;
+ try {
+ cursor.close();
+ } catch (Exception ex) {
+ // ignore, we're closing
+ }
+ }
+ }
+ }
+
+ private static String extractLastCsvLine(String s) {
+ if (s == null || s.isEmpty()) {
+ return "";
+ }
+ int i = s.length() - 1;
+ // Skip trailing newline characters
+ while (i >= 0 && (s.charAt(i) == '\n' || s.charAt(i) == '\r')) {
+ i--;
+ }
+ if (i < 0) {
+ return "";
+ }
+ int start = s.lastIndexOf('\n', i);
+ String line = s.substring(start + 1, i + 1);
+ // Strip a trailing CR if present
+ if (!line.isEmpty() && line.charAt(line.length() - 1) == '\r') {
+ line = line.substring(0, line.length() - 1);
+ }
+ return line;
+ }
+
/**
* Retrieves a TimeSeries from the database
* @param page an opaque token used for paging
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvRequiredColumn.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvRequiredColumn.java
new file mode 100644
index 0000000000..d5a3189b54
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvRequiredColumn.java
@@ -0,0 +1,14 @@
+package cwms.cda.data.dto.csv;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Indicates that a CSV column is required, i.e. not optional
+ */
+@Target({ElementType.FIELD, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface CsvRequiredColumn {
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvUnitHeader.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvUnitHeader.java
new file mode 100644
index 0000000000..7da7b8e89c
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CsvUnitHeader.java
@@ -0,0 +1,22 @@
+package cwms.cda.data.dto.csv;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Annotation to indicate that a field should have units included in its CSV header.
+ * For example, if a field "value" is annotated with @CsvUnitHeader, and the DTO
+ * has a "units" field with value "ft", the CSV header for this column will be
+ * "value (ft)".
+ */
+@Target({ElementType.FIELD, ElementType.METHOD})
+@Retention(RetentionPolicy.RUNTIME)
+public @interface CsvUnitHeader {
+ /**
+ * The name(s) of the field(s) that should have units included in their CSV header.
+ * The units will be retrieved from the field/method annotated with this annotation.
+ */
+ String field();
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CwmsCsvDTO.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CwmsCsvDTO.java
new file mode 100644
index 0000000000..53b07f5e05
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/CwmsCsvDTO.java
@@ -0,0 +1,7 @@
+package cwms.cda.data.dto.csv;
+
+import java.util.List;
+
+public interface CwmsCsvDTO {
+ List getRows();
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsv.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsv.java
new file mode 100644
index 0000000000..ac52706145
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsv.java
@@ -0,0 +1,76 @@
+package cwms.cda.data.dto.csv;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+import cwms.cda.data.dto.CwmsDTOBase;
+import cwms.cda.formatters.Formats;
+import cwms.cda.formatters.annotations.FormattableWith;
+import cwms.cda.formatters.csv.CsvRows;
+import cwms.cda.formatters.csv.CsvV1;
+
+import java.util.ArrayList;
+import java.util.List;
+
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class)
+@JsonDeserialize(builder = TimeSeriesCsv.Builder.class)
+@FormattableWith(contentType = Formats.CSV, formatter = CsvV1.class, aliases = {Formats.DEFAULT})
+public final class TimeSeriesCsv extends CwmsDTOBase implements CwmsCsvDTO {
+
+ private final String timeSeriesId;
+ private final String officeId;
+ private final String versionDate;
+
+ @CsvRows
+ private final List rows;
+
+ private TimeSeriesCsv(Builder builder) {
+ this.timeSeriesId = builder.timeSeriesId;
+ this.officeId = builder.officeId;
+ this.versionDate = builder.versionDate;
+ this.rows = builder.rows;
+ }
+
+ public String getTimeSeriesId() { return timeSeriesId; }
+ public String getOfficeId() { return officeId; }
+ public String getVersionDate() { return versionDate; }
+
+ @Override
+ public List getRows() {
+ return rows;
+ }
+
+ @JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class)
+ public static final class Builder {
+ private String timeSeriesId;
+ private String officeId;
+ private String versionDate;
+ private List rows;
+
+ public Builder withTimeSeriesId(String timeSeriesId) {
+ this.timeSeriesId = timeSeriesId;
+ return this;
+ }
+
+ public Builder withOfficeId(String officeId) {
+ this.officeId = officeId;
+ return this;
+ }
+
+ public Builder withVersionDate(String versionDate) {
+ this.versionDate = versionDate;
+ return this;
+ }
+
+ public Builder withRows(List rows) {
+ this.rows = rows;
+ return this;
+ }
+
+ public TimeSeriesCsv build() {
+ return new TimeSeriesCsv(this);
+ }
+ }
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsvRow.java b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsvRow.java
new file mode 100644
index 0000000000..f700d2fbb5
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/data/dto/csv/TimeSeriesCsvRow.java
@@ -0,0 +1,92 @@
+package cwms.cda.data.dto.csv;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+import cwms.cda.data.dto.CwmsDTOBase;
+
+import java.time.Instant;
+
+/**
+ * Single DTO for TimeSeries CSV rows. All potential columns exist on this class;
+ * only date-time and value are considered required and are annotated with @CsvRow
+ * so that when using the CsvV1 mapper they are the only columns included. All other
+ * fields are annotated with @CsvMetadata and will only be serialized when using a
+ * CsvMapper that does not apply CsvRow filtering (e.g., for metadata-as-columns=true).
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+@JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class)
+@JsonDeserialize(builder = TimeSeriesCsvRow.Builder.class)
+public final class TimeSeriesCsvRow extends CwmsDTOBase {
+
+ @CsvRequiredColumn
+ @JsonProperty(index = 0)
+ private final Instant dateTime;
+
+ @CsvRequiredColumn
+ @JsonProperty(index = 1)
+ private final Double value;
+
+ @JsonProperty(index = 2)
+ private final Instant dataEntryDate;
+
+ @JsonProperty(index = 3)
+ private final Integer qualityCode;
+
+ @CsvUnitHeader(field = "value")
+ private final String units;
+
+ private TimeSeriesCsvRow(Builder builder) {
+ this.dateTime = builder.dateTime;
+ this.value = builder.value;
+ this.qualityCode = builder.qualityCode;
+ this.dataEntryDate = builder.dataEntryDate;
+ this.units = builder.units;
+ }
+
+ public Instant getDateTime() { return dateTime; }
+ public Double getValue() { return value; }
+ public Integer getQualityCode() { return qualityCode; }
+ public Instant getDataEntryDate() { return dataEntryDate; }
+ public String getUnits() { return units; }
+
+ @JsonNaming(PropertyNamingStrategies.KebabCaseStrategy.class)
+ public static final class Builder {
+ private Instant dateTime;
+ private Double value;
+ private Instant dataEntryDate;
+ private Integer qualityCode;
+ private String units;
+
+ public Builder withDateTime(Instant dateTime) {
+ this.dateTime = dateTime;
+ return this;
+ }
+
+ public Builder withValue(Double value) {
+ this.value = value;
+ return this;
+ }
+
+ public Builder withDataEntryDate(Instant dataEntryDate) {
+ this.dataEntryDate = dataEntryDate;
+ return this;
+ }
+
+ public Builder withQualityCode(Integer qualityCode) {
+ this.qualityCode = qualityCode;
+ return this;
+ }
+
+ public Builder withUnits(String units) {
+ this.units = units;
+ return this;
+ }
+
+ public TimeSeriesCsvRow build() {
+ return new TimeSeriesCsvRow(this);
+ }
+ }
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java b/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java
index 2ab7d01325..7cc0d13378 100644
--- a/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java
+++ b/cwms-data-api/src/main/java/cwms/cda/formatters/Formats.java
@@ -247,40 +247,33 @@ public static List parseContentList(ContentType type,
}
/**
- * Parses the supplied header param or queryParam to determine the content type.
- * If both are supplied an exception is thrown. If neither are supplied an exception is thrown.
+ * Parses the supplied header param and/or queryParam to determine the content type.
+ * Query parameter takes priority over the header and is parsed the same way as the header
+ * (i.e., supports full content types, versions, and DTO-specific aliases). If neither is
+ * supplied an exception is thrown.
*
* @param header Accept header value
* @param queryParam format query parameter value
* @param klass DTO object class, used for identifying content type aliases from the DTO's
* FormattableWith annotations.
* @return an appropriate standard mimetype for lookup
- * @throws FormattingException if the header and queryParam are both supplied or neither are
+ * @throws FormattingException if neither header nor queryParam can be parsed into a supported content type
*/
public static ContentType parseHeaderAndQueryParm(String header, String queryParam,
Class extends CwmsDTOBase> klass) {
+ // If a query parameter is provided, it overrides the header.
if (queryParam != null && !queryParam.isEmpty()) {
- if (header != null && !header.isEmpty() && !DEFAULT.equals(header.trim())) {
- // If the user supplies an accept header and also a format= parameter, which
- // should we use?
- // The older format= query parameters don't give us the option to supply a
- // version the
- // way that the accept header does.
- throw new UnsupportedFormatException("Accept header and query parameter are both "
- + "present, this is not supported.");
- }
-
ContentType ct = parseQueryParam(queryParam, klass);
if (ct != null) {
return ct;
- } else {
- throw new UnsupportedFormatException("content-type " + queryParam + " is not implemented");
}
- } else if (header == null) {
+ }
+
+ // No query parameter provided; use the header (parseHeader handles null/empty by mapping to */*)
+ if (header == null) {
throw new UnsupportedFormatException("no content type or format specified");
- } else {
- return parseHeader(header, klass);
}
+ return parseHeader(header, klass);
}
/**
diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvFormatter.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvFormatter.java
new file mode 100644
index 0000000000..29e4b3015d
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvFormatter.java
@@ -0,0 +1,9 @@
+package cwms.cda.formatters.csv;
+
+import cwms.cda.data.dto.csv.CwmsCsvDTO;
+import cwms.cda.formatters.OutputFormatter;
+
+public interface CsvFormatter extends OutputFormatter {
+ String formatWithMetadata(CwmsCsvDTO> dto, boolean includeOptionalColumns);
+ String format(CwmsCsvDTO> dto, boolean includeOptionalColumns);
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvRows.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvRows.java
new file mode 100644
index 0000000000..de89fe81c9
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvRows.java
@@ -0,0 +1,14 @@
+package cwms.cda.formatters.csv;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks a field as containing the data rows for a CSV DTO.
+ */
+@Retention(RetentionPolicy.RUNTIME)
+@Target({ElementType.FIELD, ElementType.METHOD})
+public @interface CsvRows {
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java
index 5f270fa851..e52d4fa295 100644
--- a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java
+++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CsvV1.java
@@ -1,56 +1,87 @@
package cwms.cda.formatters.csv;
-import java.io.InputStream;
-import java.util.List;
-
+import com.fasterxml.jackson.core.JsonProcessingException;
import cwms.cda.data.dto.CwmsDTOBase;
import cwms.cda.data.dto.LocationGroup;
import cwms.cda.data.dto.Office;
+import cwms.cda.data.dto.csv.CwmsCsvDTO;
import cwms.cda.formatters.Formats;
-import cwms.cda.formatters.OutputFormatter;
+import cwms.cda.formatters.FormattingException;
+import java.io.InputStream;
+import java.util.List;
-public class CsvV1 implements OutputFormatter {
+public class CsvV1 implements CsvFormatter {
@Override
public String getContentType() {
return Formats.CSV;
}
+ /**
+ * Default formatting does not include metadata in either columns or comments.
+ **/
@Override
public String format(CwmsDTOBase dto) {
- String retVal = null;
- if (dto instanceof Office ) {
- retVal = new CsvV1Office().format(dto);
- } else if (dto instanceof LocationGroup ) {
- retVal = new CsvV1LocationGroup().format(dto);
+ try {
+ if (dto instanceof Office ) {
+ return new CsvV1Office().format(dto);
+ } else if (dto instanceof LocationGroup ) {
+ return new CsvV1LocationGroup().format(dto);
+ } else if (dto instanceof CwmsCsvDTO) {
+ return format((CwmsCsvDTO>) dto, false);
+ } else {
+ throw new FormattingException(dto.getClass().getName() + " is not currently supported for CSV formatting.");
+ }
+ } catch (Exception e) {
+ throw new FormattingException("Could not serialize:" + dto.getClass().getName(), e);
+ }
+ }
+
+ @Override
+ public String formatWithMetadata(CwmsCsvDTO> dto, boolean includeOptionalColumns) {
+ try {
+ return CwmsCsvProcessor.formatCwmsCsv(dto, includeOptionalColumns, true);
+ } catch (JsonProcessingException e) {
+ throw new FormattingException("Could not serialize:" + dto.getClass().getName(), e);
+ }
+ }
+
+ @Override
+ public String format(CwmsCsvDTO> dto, boolean includeOptionalColumns) {
+ try {
+ return CwmsCsvProcessor.formatCwmsCsv(dto, includeOptionalColumns, false);
+ } catch (JsonProcessingException e) {
+ throw new FormattingException("Could not serialize:" + dto.getClass().getName(), e);
}
- return retVal;
}
+
+
@Override
public String format(List extends CwmsDTOBase> dtoList) {
- String retVal = null;
if (dtoList != null && !dtoList.isEmpty()) {
CwmsDTOBase dto = dtoList.get(0);
if (dto instanceof Office) {
- retVal = new CsvV1Office().format(dtoList);
- } else if(dto instanceof LocationGroup) {
- retVal = new CsvV1LocationGroup().format(dtoList);
+ return new CsvV1Office().format(dtoList);
+ } else if (dto instanceof LocationGroup) {
+ return new CsvV1LocationGroup().format(dtoList);
+ } else {
+ throw new FormattingException(dto.getClass().getName() + " is not currently supported for CSV formatting.");
}
-
}
- return retVal;
+ return null;
}
@Override
public T parseContent(String content, Class type) {
- T retVal = null;
if (type.isAssignableFrom(Office.class)) {
- retVal = new CsvV1Office().parseContent(content, type);
+ return new CsvV1Office().parseContent(content, type);
} else if (type.isAssignableFrom(LocationGroup.class)) {
- retVal = new CsvV1LocationGroup().parseContent(content, type);
+ return new CsvV1LocationGroup().parseContent(content, type);
+ } else if (CwmsCsvDTO.class.isAssignableFrom(type)) {
+ return CwmsCsvProcessor.parseCwmsCsv(content, type);
}
- return retVal;
+ return null;
}
@Override
@@ -63,4 +94,5 @@ public T parseContent(InputStream content, Class type
}
return retVal;
}
+
}
diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CwmsCsvProcessor.java b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CwmsCsvProcessor.java
new file mode 100644
index 0000000000..33fc926566
--- /dev/null
+++ b/cwms-data-api/src/main/java/cwms/cda/formatters/csv/CwmsCsvProcessor.java
@@ -0,0 +1,473 @@
+package cwms.cda.formatters.csv;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MappingIterator;
+import com.fasterxml.jackson.databind.PropertyNamingStrategies;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.databind.annotation.JsonNaming;
+import com.fasterxml.jackson.databind.cfg.MapperConfig;
+import com.fasterxml.jackson.databind.introspect.Annotated;
+import com.fasterxml.jackson.databind.introspect.JacksonAnnotationIntrospector;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.fasterxml.jackson.dataformat.csv.CsvGenerator;
+import com.fasterxml.jackson.dataformat.csv.CsvMapper;
+import com.fasterxml.jackson.dataformat.csv.CsvParser;
+import com.fasterxml.jackson.dataformat.csv.CsvSchema;
+import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+import cwms.cda.data.dto.CwmsDTOBase;
+import cwms.cda.data.dto.csv.CsvRequiredColumn;
+import cwms.cda.data.dto.csv.CsvUnitHeader;
+import cwms.cda.data.dto.csv.CwmsCsvDTO;
+import cwms.cda.formatters.FormattingException;
+import cwms.cda.formatters.json.adapters.ZoneIdDeserializer;
+import org.jetbrains.annotations.Nullable;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.ParameterizedType;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Utility class to processing cwms CSV DTOs for both reading and writing.
+ * Handles building header lines with units, building metadata comment lines, and parsing metadata and units from CSV content.
+ */
+public class CwmsCsvProcessor {
+
+ private static String buildMetadataComments(Object dto) {
+ StringBuilder sb = new StringBuilder();
+ Class> cls = dto.getClass();
+ List fields = getAllFields(cls);
+ for (Field f : fields) {
+ if (f.isAnnotationPresent(CsvRows.class)) {
+ continue;
+ }
+ f.setAccessible(true);
+ try {
+ Object val = f.get(dto);
+ if (val != null) {
+ String key = resolveKeyName(f, cls);
+ sb.append("# ").append(key).append(": ").append(val).append("\n");
+ }
+ } catch (IllegalAccessException ex) {
+ throw new FormattingException("Error building metadata comments for " + cls.getName(), ex);
+ }
+ }
+ return sb.toString();
+ }
+
+ private static List getAllFields(Class> cls) {
+ List fields = new ArrayList<>();
+ Class> current = cls;
+ while (current != null && current != Object.class) {
+ fields.addAll(Arrays.asList(current.getDeclaredFields()));
+ current = current.getSuperclass();
+ }
+ return fields;
+ }
+
+ private static String buildHeader(Object dto, boolean includeOptionalColumns) {
+ StringBuilder sb = new StringBuilder();
+ Map fieldToUnits = new HashMap<>();
+
+ // Find row type and examples to get units from
+ Class> rowType = null;
+ Object firstRow = null;
+ if (dto instanceof CwmsCsvDTO) {
+ List> rows = ((CwmsCsvDTO>) dto).getRows();
+ if (rows != null && !rows.isEmpty()) {
+ firstRow = rows.get(0);
+ rowType = firstRow.getClass();
+ }
+ } else if (dto instanceof List && !((List>) dto).isEmpty()) {
+ firstRow = ((List>) dto).get(0);
+ rowType = firstRow.getClass();
+ } else {
+ rowType = dto.getClass();
+ }
+
+ try {
+ // Check top-level DTO first
+ extractFieldToUnits(dto, fieldToUnits);
+
+ // If it's a list or CwmsCsv, also check the row object for @CsvUnitHeader
+ if (firstRow != null) {
+ extractFieldToUnits(firstRow, fieldToUnits);
+ }
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new FormattingException("Error extracting units for CSV header", e);
+ }
+
+ if (rowType != null) {
+ List columns = new ArrayList<>();
+ for (Field f : getAllFields(rowType)) {
+ JsonProperty jp = f.getAnnotation(JsonProperty.class);
+ if (jp != null && jp.index() != JsonProperty.INDEX_UNKNOWN) {
+ if (!includeOptionalColumns && !f.isAnnotationPresent(CsvRequiredColumn.class)) {
+ continue;
+ }
+ String name = resolvePropertyName(f);
+ String units = fieldToUnits.get(name);
+ if (units != null) {
+ name = name + " (" + units + ")";
+ }
+ columns.add(new ColumnInfo(name, jp.index()));
+ }
+ }
+ columns.sort(Comparator.comparingInt(c -> c.order));
+ for (int i = 0; i < columns.size(); i++) {
+ sb.append(columns.get(i).name);
+ if (i < columns.size() - 1) {
+ sb.append(",");
+ }
+ }
+ sb.append("\n");
+ }
+
+ return sb.toString();
+ }
+
+ public static String stripHeader(String csv) {
+ StringBuilder sb = new StringBuilder();
+ String[] lines = csv.split("\\r?\\n");
+ boolean headerFound = false;
+ for (String line : lines) {
+ if (!headerFound) {
+ if (!line.startsWith("#")) {
+ headerFound = true; // first non-comment line is the header
+ }
+ continue; //continue until we find the header, skipping all comment lines, and also skipping the header line itself
+ }
+ sb.append(line).append(System.lineSeparator());
+ }
+ return sb.toString();
+ }
+
+ private static void extractFieldToUnits(Object dto, Map fieldToUnits) throws IllegalAccessException, InvocationTargetException {
+ if (dto == null) return;
+ Class> cls = dto.getClass();
+ List fields = getAllFields(cls);
+ for (Field f : fields) {
+ CsvUnitHeader ann = f.getAnnotation(CsvUnitHeader.class);
+ if (ann != null) {
+ f.setAccessible(true);
+ Object val = f.get(dto);
+ if (val != null) {
+ String unitVal = val.toString();
+ if (!unitVal.isEmpty()) {
+ fieldToUnits.put(ann.field(), unitVal);
+ }
+ }
+ }
+ }
+ List methods = getAllMethods(cls);
+ for (Method m : methods) {
+ CsvUnitHeader ann = m.getAnnotation(CsvUnitHeader.class);
+ if (ann != null && m.getParameterCount() == 0) {
+ m.setAccessible(true);
+ Object val = m.invoke(dto);
+ if (val != null) {
+ String unitVal = val.toString();
+ if (!unitVal.isEmpty()) {
+ fieldToUnits.put(ann.field(), unitVal);
+ }
+ }
+ }
+ }
+ }
+
+ static T parseCwmsCsv(String content, Class type) {
+ try {
+ T dto;
+ try {
+ dto = type.getDeclaredConstructor().newInstance();
+ } catch (NoSuchMethodException e) {
+ // No default constructor, try to find a Builder
+ Class> builderClass = null;
+ for (Class> inner : type.getDeclaredClasses()) {
+ if (inner.getSimpleName().equals("Builder")) {
+ builderClass = inner;
+ break;
+ }
+ }
+ if (builderClass != null) {
+ Object builder = builderClass.getDeclaredConstructor().newInstance();
+ Method buildMethod = builderClass.getDeclaredMethod("build");
+ dto = type.cast(buildMethod.invoke(builder));
+ } else {
+ throw e;
+ }
+ }
+
+ if (dto instanceof CwmsCsvDTO) {
+ Map metadata = CwmsCsvProcessor.parseMetadata(content);
+ String units = CwmsCsvProcessor.parseUnits(content);
+ CwmsCsvProcessor.injectMetadataAndUnits(content, type, dto, metadata, units);
+ return dto;
+ }
+ } catch (Exception e) {
+ throw new FormattingException("Could not parse " + type.getName(), e);
+ }
+ throw new FormattingException("Could not parse " + type.getName() + ". Must be a " + CwmsCsvDTO.class.getName());
+ }
+
+ private static void injectMetadataAndUnits(String content, Class type, T dto, Map metadata, String units) throws IOException, IllegalAccessException {
+ // Inject metadata into DTO
+ CwmsCsvProcessor.applyMetadataAndUnits(dto, metadata, units);
+
+ Field rowsField = getRowsField(type);
+
+ if (rowsField != null) {
+ rowsField.setAccessible(true);
+ Class> rowType = (Class>) ((ParameterizedType) rowsField.getGenericType()).getActualTypeArguments()[0];
+ List> rows = parseRows(content, rowType);
+
+ if (units != null) {
+ for (Object row : rows) {
+ CwmsCsvProcessor.applyMetadataAndUnits(row, metadata, units);
+ }
+ }
+
+ rowsField.set(dto, rows);
+ }
+ }
+
+ private static List> parseRows(String content, Class> csvRowDtoType) throws IOException {
+ CsvMapper csvMapper = buildObjectMapper(true, csvRowDtoType);
+ csvMapper.enable(CsvParser.Feature.ALLOW_COMMENTS);
+ csvMapper.enable(CsvParser.Feature.SKIP_EMPTY_LINES);
+ csvMapper.enable(CsvParser.Feature.TRIM_SPACES);
+ CsvSchema schema = csvMapper.schemaFor(csvRowDtoType).withHeader();
+ try (MappingIterator> it = csvMapper.readerFor(csvRowDtoType).with(schema).readValues(content)) {
+ return it.readAll();
+ }
+ }
+
+ static String formatCwmsCsv(CwmsCsvDTO> dto, boolean includeOptionalColumns, boolean includeMetadata) throws JsonProcessingException {
+ StringBuilder sb = new StringBuilder();
+ if (includeMetadata) {
+ sb.append(CwmsCsvProcessor.buildMetadataComments(dto));
+ }
+ List> rows = dto.getRows();
+ if (rows != null && !rows.isEmpty()) {
+ Object firstRow = rows.get(0);
+ CsvMapper csvMapper = CwmsCsvProcessor.buildObjectMapper(includeOptionalColumns, firstRow.getClass());
+ CsvSchema schema = csvMapper.schemaFor(firstRow.getClass()).withoutHeader();
+ String header = CwmsCsvProcessor.buildHeader(dto, includeOptionalColumns);
+ sb.append(header);
+ sb.append(csvMapper.writer(schema).writeValueAsString(rows));
+ }
+ return sb.toString();
+ }
+
+ private static CsvMapper buildObjectMapper(boolean includeOptionalColumns, Class> rowType) {
+ CsvMapper mapper = new CsvMapper();
+
+ mapper.findAndRegisterModules();
+ // Without these two disables an Instant gets written as 3333333.335000000
+ mapper.disable(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS);
+ mapper.disable(DeserializationFeature.READ_DATE_TIMESTAMPS_AS_NANOSECONDS);
+ mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
+ mapper.disable(CsvGenerator.Feature.ALWAYS_QUOTE_STRINGS);
+ mapper.enable(CsvGenerator.Feature.STRICT_CHECK_FOR_QUOTING);
+
+ mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
+ mapper.registerModule(new JavaTimeModule());
+ mapper.registerModule(new Jdk8Module());
+ mapper.enable(CsvParser.Feature.ALLOW_COMMENTS);
+ SimpleModule module = new SimpleModule();
+ module.addDeserializer(ZoneId.class, new ZoneIdDeserializer());
+ mapper.registerModule(module);
+ if(!includeOptionalColumns && rowType != null) {
+ registerIgnoreOptionalColumns(rowType, mapper);
+ }
+ return mapper;
+ }
+
+ private static void registerIgnoreOptionalColumns(Class> rowType, CsvMapper mapper) {
+ Set ignoredFields = new HashSet<>();
+ List fields = CwmsCsvProcessor.getAllFields(rowType);
+ for (Field f : fields) {
+ JsonProperty jp = f.getAnnotation(JsonProperty.class);
+ if (jp != null && jp.index() != JsonProperty.INDEX_UNKNOWN) {
+ if (!f.isAnnotationPresent(CsvRequiredColumn.class)) {
+ ignoredFields.add(f.getName());
+ }
+ }
+ }
+ if (!ignoredFields.isEmpty()) {
+ mapper.addMixIn(rowType, PropertyFilterMixIn.class);
+ mapper.setAnnotationIntrospector(new JacksonAnnotationIntrospector() {
+ @Override
+ public JsonIgnoreProperties.Value findPropertyIgnoralByName(MapperConfig> config, Annotated ac) {
+ if (ac.getRawType().equals(rowType)) {
+ return JsonIgnoreProperties.Value.forIgnoreUnknown(true).withIgnored(ignoredFields);
+ }
+ return super.findPropertyIgnoralByName(null, ac);
+ }
+ });
+ }
+ }
+
+ @JsonIgnoreProperties(ignoreUnknown = true)
+ abstract static class PropertyFilterMixIn {}
+
+ private static @Nullable Field getRowsField(Class type) {
+ // Find rows field
+ Field rowsField = null;
+ for (Field f : CwmsCsvProcessor.getAllFields(type)) {
+ if (f.isAnnotationPresent(CsvRows.class)) {
+ rowsField = f;
+ break;
+ }
+ }
+ return rowsField;
+ }
+
+ private static List getAllMethods(Class> cls) {
+ List methods = new ArrayList<>();
+ Class> current = cls;
+ while (current != null && current != Object.class) {
+ methods.addAll(Arrays.asList(current.getDeclaredMethods()));
+ current = current.getSuperclass();
+ }
+ return methods;
+ }
+
+ private static String resolvePropertyName(Field f) {
+ JsonProperty jp = f.getAnnotation(JsonProperty.class);
+ if (jp != null && !jp.value().isEmpty()) {
+ return jp.value();
+ }
+ JsonNaming naming = f.getDeclaringClass().getAnnotation(JsonNaming.class);
+ return getName(f, naming);
+ }
+
+ private static String getName(Field f, JsonNaming naming) {
+ if (naming != null) {
+ try {
+ Object strat = naming.value().getDeclaredConstructor().newInstance();
+ if (strat instanceof PropertyNamingStrategies.NamingBase) {
+ return ((PropertyNamingStrategies.NamingBase) strat).translate(f.getName());
+ }
+ } catch (InvocationTargetException | InstantiationException | IllegalAccessException | NoSuchMethodException e) {
+ throw new FormattingException("Error resolving property name for " + f.getName(), e);
+ }
+ }
+ return f.getName();
+ }
+
+ private static String resolveKeyName(Field f, Class> owner) {
+ JsonProperty jp = f.getAnnotation(JsonProperty.class);
+ if (jp != null && !jp.value().isEmpty()) {
+ return jp.value();
+ }
+ JsonNaming naming = owner.getAnnotation(JsonNaming.class);
+ return getName(f, naming);
+ }
+
+ private static Object convertToType(String val, Class> type) {
+ if (type == String.class) return val;
+ if (type == Instant.class) return Instant.parse(val);
+ if (type == Integer.class || type == int.class) return Integer.parseInt(val);
+ if (type == Double.class || type == double.class) return Double.parseDouble(val);
+ if (type == Long.class || type == long.class) return Long.parseLong(val);
+ if (type == Boolean.class || type == boolean.class) return Boolean.parseBoolean(val);
+ return null;
+ }
+
+ private static void applyMetadataAndUnits(Object dto, Map metadata, String units) {
+ if (dto == null) return;
+ Class> cls = dto.getClass();
+ // Use a list of all fields including superclasses
+ List allFields = CwmsCsvProcessor.getAllFields(cls);
+
+ for (Field f : allFields) {
+ f.setAccessible(true);
+ try {
+ // Handle Units via @CsvUnitHeader
+ if (units != null) {
+ CsvUnitHeader unitAnn = f.getAnnotation(CsvUnitHeader.class);
+ if (unitAnn != null) {
+ f.set(dto, units);
+ }
+ }
+
+ // Handle Metadata
+ String key = CwmsCsvProcessor.resolveKeyName(f, cls);
+ String val = metadata.get(key);
+ if (val != null) {
+ Object converted = CwmsCsvProcessor.convertToType(val, f.getType());
+ if (converted != null) {
+ f.set(dto, converted);
+ }
+ }
+ } catch (IllegalAccessException e) {
+ throw new FormattingException("Error applying metadata to field " + f.getName(), e);
+ }
+ }
+ }
+
+ private static Map parseMetadata(String content) {
+ Map metadata = new HashMap<>();
+ String[] lines = content.split("\\r?\\n");
+ for (String line : lines) {
+ String trimmed = line.trim();
+ if (trimmed.startsWith("#")) {
+ String comment = trimmed.substring(1).trim();
+ int colon = comment.indexOf(':');
+ if (colon != -1) {
+ String key = comment.substring(0, colon).trim();
+ String val = comment.substring(colon + 1).trim();
+ metadata.put(key, val);
+ }
+ } else if (!trimmed.isEmpty()) {
+ break; // Header or data starts
+ }
+ }
+ return metadata;
+ }
+
+ private static String parseUnits(String content) {
+ String[] lines = content.split("\\r?\\n");
+ for (String line : lines) {
+ String trimmed = line.trim();
+ if (trimmed.startsWith("#")) continue;
+ if (trimmed.isEmpty()) continue;
+ // First non-comment line is header
+ int start = trimmed.indexOf('(');
+ int end = trimmed.indexOf(')');
+ if (start != -1 && end != -1 && end > start) {
+ return trimmed.substring(start + 1, end);
+ }
+ break;
+ }
+ return null;
+ }
+
+ private static class ColumnInfo {
+ final String name;
+ final int order;
+ ColumnInfo(String name, int order) {
+ this.name = name;
+ this.order = order;
+ }
+ }
+
+}
diff --git a/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java b/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java
index 2091c3cf78..1726febd4f 100644
--- a/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java
+++ b/cwms-data-api/src/main/java/cwms/cda/formatters/json/adapters/TimeSeriesRecordSerializer.java
@@ -30,6 +30,7 @@
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.SerializerProvider;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+import com.fasterxml.jackson.dataformat.csv.CsvGenerator;
import com.fasterxml.jackson.dataformat.xml.ser.XmlSerializerProvider;
import cwms.cda.data.dto.TimeSeries;
import java.io.IOException;
@@ -44,6 +45,7 @@ public TimeSeriesRecordSerializer() {
public void serialize(TimeSeries.Record recordValue, JsonGenerator gen, SerializerProvider provider)
throws IOException {
+
if (provider instanceof XmlSerializerProvider) {
// Handle XML serialization
@@ -54,7 +56,9 @@ public void serialize(TimeSeries.Record recordValue, JsonGenerator gen, Serializ
} else {
gen.writeNumberField("value", recordValue.getValue());
}
- gen.writeNumberField("quality-code", recordValue.getQualityCode());
+ if(!(gen instanceof CsvGenerator)) {
+ gen.writeNumberField("quality-code", recordValue.getQualityCode());
+ }
gen.writeEndObject();
} else {
// Handle JSON serialization
@@ -66,12 +70,14 @@ public void serialize(TimeSeries.Record recordValue, JsonGenerator gen, Serializ
} else {
gen.writeNumber(recordValue.getValue());
}
- gen.writeNumber(recordValue.getQualityCode());
- // Used to include the dataEntryDate in the serialized output if requested. Modifies length of the output array.
- // If the dataEntryDate is requested, it will always be non-null
- // Without the dataEntryDate, the array will have 3 elements: [dateTime, value, qualityCode]
- if (recordValue.getDataEntryDate() != null) {
- gen.writeNumber(recordValue.getDataEntryDate().getTime());
+ if(!(gen instanceof CsvGenerator)) {
+ gen.writeNumber(recordValue.getQualityCode());
+ // Used to include the dataEntryDate in the serialized output if requested. Modifies length of the output array.
+ // If the dataEntryDate is requested, it will always be non-null
+ // Without the dataEntryDate, the array will have 3 elements: [dateTime, value, qualityCode]
+ if (recordValue.getDataEntryDate() != null) {
+ gen.writeNumber(recordValue.getDataEntryDate().getTime());
+ }
}
gen.writeEndArray();
}
diff --git a/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java b/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java
index a4bf226dc0..423755586b 100644
--- a/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java
+++ b/cwms-data-api/src/test/java/cwms/cda/api/TimeseriesControllerTestIT.java
@@ -26,6 +26,8 @@
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -62,7 +64,6 @@
import javax.servlet.http.HttpServletResponse;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
-import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.time.ZonedDateTime;
import java.util.ArrayList;
@@ -1826,6 +1827,323 @@ void test_wrong_units() throws Exception {
;
}
+ @Test
+ void test_csv_default_columns_with_metadata() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+
+ InputStream resource = this.getClass().getResourceAsStream(
+ "/cwms/cda/api/lrl/1day_offset.json");
+ assertNotNull(resource);
+ String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8);
+
+ JsonNode ts = mapper.readTree(tsData);
+ String location = ts.get(NAME).asText().split("\\.")[0];
+ String officeId = ts.get("office-id").asText();
+
+ createLocation(location, true, officeId);
+
+ TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL;
+ String firstPoint = "2023-02-02T06:00:00-05:00";
+ // insert
+ given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.JSONV2)
+ .contentType(Formats.JSONV2)
+ .body(tsData)
+ .header("Authorization",user.toHeaderValue())
+ .queryParam(OFFICE,officeId)
+ .when()
+ .post("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL,true)
+ .statusCode(is(HttpServletResponse.SC_OK));
+
+ // retrieve CSV v2 with metadata comments explicitly requested
+ String body = given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.CSV)
+ .queryParam(Controllers.OFFICE, officeId)
+ .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText())
+ .queryParam(Controllers.BEGIN, firstPoint)
+ .queryParam(Controllers.END, firstPoint)
+ .queryParam(Controllers.INCLUDE_METADATA_AS_CSV_COMMENTS, true)
+ .when()
+ .get("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL,true)
+ .statusCode(is(HttpServletResponse.SC_OK))
+ .extract().asString();
+
+ assertNotNull(body);
+ String normalized = body.replace("\r", "");
+ assertTrue(normalized.startsWith("# "), "Expected metadata comments");
+ assertTrue(normalized.contains("date-time,value (F)\n"),
+ "Expected header 'date-time,value (F)' but was: " + normalized.split("\n")[0]);
+ }
+
+
+ @Test
+ void test_csv_optional_columns_no_metadata() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ InputStream resource = this.getClass().getResourceAsStream(
+ "/cwms/cda/api/lrl/1day_offset.json");
+ assertNotNull(resource);
+ String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8);
+
+ JsonNode ts = mapper.readTree(tsData);
+ String location = ts.get(NAME).asText().split("\\.")[0];
+ String officeId = ts.get("office-id").asText();
+
+ createLocation(location, true, officeId);
+ TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL;
+ String firstPoint = "2023-02-02T06:00:00-05:00";
+ given()
+ .accept(Formats.JSONV2)
+ .contentType(Formats.JSONV2)
+ .body(tsData)
+ .header("Authorization",user.toHeaderValue())
+ .queryParam(OFFICE,officeId)
+ .when()
+ .post("/timeseries/")
+ .then()
+ .statusCode(is(HttpServletResponse.SC_OK));
+
+ String body = given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.CSV)
+ .queryParam(Controllers.OFFICE, officeId)
+ .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText())
+ .queryParam(Controllers.BEGIN, firstPoint)
+ .queryParam(Controllers.END, firstPoint)
+ .queryParam("include-optional-csv-columns", true)
+ .when()
+ .get("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL,true)
+ .statusCode(is(HttpServletResponse.SC_OK))
+ .extract().asString();
+
+ assertNotNull(body);
+ String normalized = body.replace("\r", "");
+ assertTrue(normalized.contains("date-time,value (F),data-entry-date,quality-code"),
+ "Expected header with optional columns but was: " + normalized.split("\n")[0]);
+ }
+
+ @Test
+ void test_csv_optional_columns_with_metadata() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ InputStream resource = this.getClass().getResourceAsStream(
+ "/cwms/cda/api/lrl/1day_offset.json");
+ assertNotNull(resource);
+ String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8);
+
+ JsonNode ts = mapper.readTree(tsData);
+ String location = ts.get(NAME).asText().split("\\.")[0];
+ String officeId = ts.get("office-id").asText();
+
+ createLocation(location, true, officeId);
+ TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL;
+ String firstPoint = "2023-02-02T06:00:00-05:00";
+ given()
+ .accept(Formats.JSONV2)
+ .contentType(Formats.JSONV2)
+ .body(tsData)
+ .header("Authorization",user.toHeaderValue())
+ .queryParam(OFFICE,officeId)
+ .when()
+ .post("/timeseries/")
+ .then()
+ .statusCode(is(HttpServletResponse.SC_OK));
+
+ String body = given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.CSV)
+ .queryParam(Controllers.OFFICE, officeId)
+ .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText())
+ .queryParam(Controllers.BEGIN, firstPoint)
+ .queryParam(Controllers.END, firstPoint)
+ .queryParam("include-metadata-as-comments", true)
+ .queryParam("include-optional-csv-columns", true)
+ .when()
+ .get("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL,true)
+ .statusCode(is(HttpServletResponse.SC_OK))
+ .extract().asString();
+
+ assertNotNull(body);
+ String normalized = body.replace("\r", "");
+ assertTrue(normalized.startsWith("# time-series-id: "), "Expected metadata comments");
+ assertTrue(normalized.contains("date-time,value (F),data-entry-date,quality-code"),
+ "Expected header with optional columns");
+ }
+
+ @Test
+ void test_csv_streaming() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ InputStream resource = this.getClass().getResourceAsStream(
+ "/cwms/cda/api/lrl/large_timeseries.json");
+ assertNotNull(resource);
+ String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8);
+
+ JsonNode ts = mapper.readTree(tsData);
+ String location = ts.get(NAME).asText().split("\\.")[0];
+ String officeId = ts.get("office-id").asText();
+
+ createLocation(location, true, officeId);
+ TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL;
+
+ // 1. Store the large dataset
+ given()
+ .accept(Formats.JSONV2)
+ .contentType(Formats.JSONV2)
+ .body(tsData)
+ .header("Authorization", user.toHeaderValue())
+ .queryParam(OFFICE, officeId)
+ .when()
+ .post("/timeseries/")
+ .then()
+ .statusCode(is(HttpServletResponse.SC_OK));
+
+ // 2. Client requests CSV and receives a single streamed response.
+ // The small page-size (10) forces the server to perform 5 internal fetch cycles
+ // for the 50 points in large_timeseries.json.
+ String body = given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.CSV)
+ .queryParam(Controllers.OFFICE, officeId)
+ .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText())
+ .queryParam(Controllers.PAGE_SIZE, 10)
+ .queryParam(Controllers.BEGIN, "2023-01-11T12:00:00Z")
+ .queryParam(Controllers.END, "2023-01-13T13:00:00Z")
+ .when()
+ .get("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .statusCode(is(HttpServletResponse.SC_OK))
+ .header("X-Stream-Batch-Size", "10")
+ .extract().asString();
+
+ // 3. Verify total data integrity
+ assertNotNull(body);
+ String[] lines = body.replace("\r", "").split("\n");
+ // 50 points + 1 header = 51 lines
+ assertEquals(51, lines.length, "Expected 51 lines (1 header + 50 points)");
+ assertEquals("date-time,value (cfs)", lines[0]);
+
+ // Verify first and last data points
+ assertTrue(lines[1].startsWith("2023-01-11T12:00:00Z,10.0"));
+ assertTrue(lines[50].startsWith("2023-01-13T13:00:00Z,500.0"));
+
+ // 4. Client requests CSV and receives a single streamed response.
+ // The small page-size (6) forces the server to perform 9 internal fetch cycles
+ // for the 50 points in large_timeseries.json.
+ body = given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.CSV)
+ .queryParam(Controllers.OFFICE, officeId)
+ .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText())
+ .queryParam(Controllers.PAGE_SIZE, 6)
+ .queryParam(Controllers.BEGIN, "2023-01-11T12:00:00Z")
+ .queryParam(Controllers.END, "2023-01-13T13:00:00Z")
+ .when()
+ .get("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .statusCode(is(HttpServletResponse.SC_OK))
+ .header("X-Stream-Batch-Size", "6")
+ .extract().asString();
+
+ // 5. Verify total data integrity
+ assertNotNull(body);
+ lines = body.replace("\r", "").split("\n");
+ // 50 points + 1 header = 51 lines
+ assertEquals(51, lines.length, "Expected 51 lines (1 header + 50 points)");
+ assertEquals("date-time,value (cfs)", lines[0]);
+
+ // Verify first and last data points
+ assertTrue(lines[1].startsWith("2023-01-11T12:00:00Z,10.0"));
+ assertTrue(lines[50].startsWith("2023-01-13T13:00:00Z,500.0"));
+
+ // 6. Client requests CSV and receives a single streamed response.
+ // The large page-size (60) forces the server to perform 1 internal fetch cycles
+ // for the 50 points in large_timeseries.json.
+ body = given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.CSV)
+ .queryParam(Controllers.OFFICE, officeId)
+ .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText())
+ .queryParam(Controllers.PAGE_SIZE, 60)
+ .queryParam(Controllers.BEGIN, "2023-01-11T12:00:00Z")
+ .queryParam(Controllers.END, "2023-01-13T13:00:00Z")
+ .when()
+ .get("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .statusCode(is(HttpServletResponse.SC_OK))
+ .header("X-Stream-Batch-Size", "60")
+ .extract().asString();
+
+ // 7. Verify total data integrity
+ assertNotNull(body);
+ lines = body.replace("\r", "").split("\n");
+ // 50 points + 1 header = 51 lines
+ assertEquals(51, lines.length, "Expected 51 lines (1 header + 50 points)");
+ assertEquals("date-time,value (cfs)", lines[0]);
+
+ // Verify first and last data points
+ assertTrue(lines[1].startsWith("2023-01-11T12:00:00Z,10.0"));
+ assertTrue(lines[50].startsWith("2023-01-13T13:00:00Z,500.0"));
+ }
+
+ @Test
+ void test_csv_default_columns_no_metadata() throws Exception {
+ ObjectMapper mapper = new ObjectMapper();
+ InputStream resource = this.getClass().getResourceAsStream(
+ "/cwms/cda/api/lrl/1day_offset.json");
+ assertNotNull(resource);
+ String tsData = IOUtils.toString(resource, StandardCharsets.UTF_8);
+
+ JsonNode ts = mapper.readTree(tsData);
+ String location = ts.get(NAME).asText().split("\\.")[0];
+ String officeId = ts.get("office-id").asText();
+
+ createLocation(location, true, officeId);
+ TestAccounts.KeyUser user = TestAccounts.KeyUser.SPK_NORMAL;
+ String firstPoint = "2023-02-02T06:00:00-05:00";
+
+ given()
+ .accept(Formats.JSONV2)
+ .contentType(Formats.JSONV2)
+ .body(tsData)
+ .header("Authorization",user.toHeaderValue())
+ .queryParam(OFFICE,officeId)
+ .when()
+ .post("/timeseries/")
+ .then()
+ .statusCode(is(HttpServletResponse.SC_OK));
+
+ String body = given()
+ .log().ifValidationFails(LogDetail.ALL, true)
+ .accept(Formats.CSV)
+ .queryParam(Controllers.OFFICE, officeId)
+ .queryParam(Controllers.NAME, ts.get(Controllers.NAME).asText())
+ .queryParam(Controllers.BEGIN, firstPoint)
+ .queryParam(Controllers.END, firstPoint)
+ .when()
+ .get("/timeseries/")
+ .then()
+ .log().ifValidationFails(LogDetail.ALL,true)
+ .statusCode(is(HttpServletResponse.SC_OK))
+ .extract().asString();
+
+ assertNotNull(body);
+ String normalized = body.replace("\r", "");
+ assertFalse(normalized.startsWith("# "), "Expected no metadata comments");
+ assertTrue(normalized.startsWith("date-time,value (F)\n"),
+ "Expected default columns header but was: " + normalized.split("\n")[0]);
+ }
+
enum GetAllTest
{
DEFAULT(Formats.DEFAULT, Formats.JSONV2),
diff --git a/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java b/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java
index 51e866aeff..62c337ef01 100644
--- a/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java
+++ b/cwms-data-api/src/test/java/cwms/cda/data/dto/TimeSeriesTest.java
@@ -10,6 +10,7 @@
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
+import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.temporal.ChronoUnit;
@@ -21,6 +22,7 @@
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import cwms.cda.formatters.xml.XMLv2;
+import cwms.cda.formatters.csv.CsvV1;
import java.util.List;
diff --git a/cwms-data-api/src/test/java/cwms/cda/data/dto/csv/TestTimeSeriesCsvRow.java b/cwms-data-api/src/test/java/cwms/cda/data/dto/csv/TestTimeSeriesCsvRow.java
new file mode 100644
index 0000000000..60d735ba9a
--- /dev/null
+++ b/cwms-data-api/src/test/java/cwms/cda/data/dto/csv/TestTimeSeriesCsvRow.java
@@ -0,0 +1,197 @@
+package cwms.cda.data.dto.csv;
+
+import cwms.cda.formatters.csv.CsvV1;
+import org.apache.commons.io.IOUtils;
+import org.junit.jupiter.api.Test;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.Collections;
+import java.util.List;
+
+import static cwms.cda.helpers.DTOMatch.assertMatch;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestTimeSeriesCsvRow {
+
+ @Test
+ void testSingleRow_Default_Columns_NoMetadata() throws Exception {
+ TimeSeriesCsvRow row = buildRow(Instant.parse("2021-06-21T21:06:00Z"), 1.0, "ft");
+ TimeSeriesCsv container = new TimeSeriesCsv.Builder()
+ .withRows(Collections.singletonList(row))
+ .build();
+
+ CsvV1 csv = new CsvV1();
+ String actual = csv.format(container);
+ assertNotNull(actual);
+ String normalized = normalize(actual);
+ assertTrue(normalized.contains("date-time,value (ft)"), "Header mismatch");
+ assertTrue(normalized.contains("2021-06-21T21:06:00Z,1.0"), "Row mismatch");
+ }
+
+ @Test
+ void testSingleRow_Default_Columns_WithMetadataComments() throws Exception {
+ TimeSeriesCsvRow row = buildRow(Instant.parse("2021-06-21T21:06:00Z"), 1.0, "ft");
+ TimeSeriesCsv container = new TimeSeriesCsv.Builder()
+ .withTimeSeriesId("RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST")
+ .withOfficeId("SPK")
+ .withVersionDate("2025-07-22T14:00:00Z")
+ .withRows(Collections.singletonList(row))
+ .build();
+
+ CsvV1 csv = new CsvV1();
+ String actual = csv.formatWithMetadata(container, false);
+ assertNotNull(actual);
+ String normalized = normalize(actual);
+ assertTrue(normalized.contains("# time-series-id: RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST"), "Metadata mismatch");
+ assertTrue(normalized.contains("# office-id: SPK"), "Metadata mismatch");
+ assertTrue(normalized.contains("date-time,value (ft)"), "Header mismatch");
+ assertTrue(normalized.contains("2021-06-21T21:06:00Z,1.0"), "Row mismatch");
+ }
+
+ @Test
+ void testSingleRow_WithOptionalColumns() throws Exception {
+ TimeSeriesCsvRow row = new TimeSeriesCsvRow.Builder()
+ .withDateTime(Instant.parse("2021-06-21T21:06:00Z"))
+ .withValue(1.0)
+ .withQualityCode(0)
+ .withDataEntryDate(Instant.parse("2021-06-21T21:00:00Z"))
+ .withUnits("ft")
+ .build();
+ TimeSeriesCsv container = new TimeSeriesCsv.Builder()
+ .withRows(Collections.singletonList(row))
+ .build();
+
+ CsvV1 csv = new CsvV1();
+ String actual = csv.format(container, true);
+ assertNotNull(actual);
+ String normalized = normalize(actual);
+ assertTrue(normalized.contains("date-time,value (ft),data-entry-date,quality-code"), "Header mismatch: " + normalized);
+ assertTrue(normalized.contains("2021-06-21T21:06:00Z,1.0,2021-06-21T21:00:00Z,0"), "Row mismatch: " + normalized);
+ }
+
+ @Test
+ void testDefaultSerialization() throws Exception {
+ String csv = readResource("cwms/cda/data/dto/time-series-default.csv");
+ CsvV1 formatter = new CsvV1();
+ TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class);
+ assertNotNull(container);
+ List rows = container.getRows();
+ assertEquals(4, rows.size());
+ assertEquals("cfs", rows.get(0).getUnits());
+ assertEquals(0.0, rows.get(0).getValue());
+ assertEquals(Instant.parse("2021-06-21T00:00:00Z"), rows.get(0).getDateTime());
+
+ //serialize back
+ String serialized = formatter.format(container);
+ assertNotNull(serialized);
+
+ //parse back
+ TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class);
+ assertNotNull(parsedContainer);
+ assertEquals(rows.get(0).getUnits(), parsedContainer.getRows().get(0).getUnits());
+ assertMatch(rows, parsedContainer.getRows());
+ }
+
+ @Test
+ void testMetadataAsCommentsSerialization() throws Exception {
+ String csv = readResource("cwms/cda/data/dto/time-series-metadata-comments.csv");
+ CsvV1 formatter = new CsvV1();
+ TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class);
+ assertNotNull(container);
+ List rows = container.getRows();
+ assertEquals(4, rows.size());
+ assertEquals("cfs", rows.get(0).getUnits());
+ assertEquals(0.0, rows.get(0).getValue());
+ assertEquals("ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI", container.getTimeSeriesId());
+ assertEquals("SWT", container.getOfficeId());
+ assertEquals("aggregate", container.getVersionDate());
+
+ //serialize back
+ String serialized = formatter.formatWithMetadata(container, false);
+ assertNotNull(serialized);
+
+ //parse back
+ TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class);
+ assertNotNull(parsedContainer);
+ assertEquals(container.getTimeSeriesId(), parsedContainer.getTimeSeriesId());
+ assertEquals(container.getOfficeId(), parsedContainer.getOfficeId());
+ assertEquals(container.getVersionDate(), parsedContainer.getVersionDate());
+ assertMatch(rows, parsedContainer.getRows());
+ }
+
+ @Test
+ void testOptionalsNoMetadataSerialization() throws Exception {
+ String csv = readResource("cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv");
+ CsvV1 formatter = new CsvV1();
+ TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class);
+ assertNotNull(container);
+ List rows = container.getRows();
+ assertEquals(4, rows.size());
+ assertEquals("cfs", rows.get(0).getUnits());
+ assertEquals(0.0, rows.get(0).getValue());
+ assertNotNull(rows.get(0).getDataEntryDate());
+ assertEquals(5, rows.get(0).getQualityCode());
+
+ //serialize back
+ String serialized = formatter.format(container, true);
+ assertNotNull(serialized);
+
+ //parse back
+ TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class);
+ assertNotNull(parsedContainer);
+ assertMatch(rows, parsedContainer.getRows());
+ }
+
+ @Test
+ void testOptionalsWithMetadataCommentsSerialization() throws Exception {
+ String csv = readResource("cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv");
+ CsvV1 formatter = new CsvV1();
+ TimeSeriesCsv container = formatter.parseContent(csv, TimeSeriesCsv.class);
+ assertNotNull(container);
+ List rows = container.getRows();
+ assertEquals(4, rows.size());
+ assertEquals("cfs", rows.get(0).getUnits());
+ assertEquals(0.0, rows.get(0).getValue());
+ assertEquals("ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI", container.getTimeSeriesId());
+ assertEquals("SWT", container.getOfficeId());
+ assertEquals("aggregate", container.getVersionDate());
+ assertNotNull(rows.get(0).getDataEntryDate());
+ assertEquals(5, rows.get(0).getQualityCode());
+
+ //serialize back
+ String serialized = formatter.formatWithMetadata(container, true);
+ assertNotNull(serialized);
+
+ //parse back
+ TimeSeriesCsv parsedContainer = formatter.parseContent(serialized, TimeSeriesCsv.class);
+ assertNotNull(parsedContainer);
+ assertEquals(container.getTimeSeriesId(), parsedContainer.getTimeSeriesId());
+ assertEquals(container.getOfficeId(), parsedContainer.getOfficeId());
+ assertEquals(container.getVersionDate(), parsedContainer.getVersionDate());
+ assertMatch(rows, parsedContainer.getRows());
+ }
+
+ private static String readResource(String path) throws Exception {
+ InputStream stream = TestTimeSeriesCsvRow.class.getClassLoader().getResourceAsStream(path);
+ assertNotNull(stream, "Missing test resource: " + path);
+ String expected = IOUtils.toString(stream, StandardCharsets.UTF_8);
+ return normalize(expected);
+ }
+
+ private static String normalize(String s) {
+ return s.replaceAll("\r", "");
+ }
+
+ private static TimeSeriesCsvRow buildRow(Instant dateTime, Double value, String units) {
+ return new TimeSeriesCsvRow.Builder()
+ .withDateTime(dateTime)
+ .withValue(value)
+ .withQualityCode(0)
+ .withUnits(units)
+ .build();
+ }
+}
diff --git a/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java b/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java
index 60067d5afb..1fa34a67c4 100644
--- a/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java
+++ b/cwms-data-api/src/test/java/cwms/cda/helpers/DTOMatch.java
@@ -30,6 +30,8 @@
import cwms.cda.data.dto.TimeExtents;
import cwms.cda.data.dto.TimeSeriesExtents;
import cwms.cda.data.dto.catalog.LocationAlias;
+import cwms.cda.data.dto.csv.TimeSeriesCsv;
+import cwms.cda.data.dto.csv.TimeSeriesCsvRow;
import cwms.cda.data.dto.location.kind.Lock;
import cwms.cda.data.dto.CwmsDTOBase;
import cwms.cda.data.dto.location.kind.GateChange;
@@ -403,6 +405,29 @@ public static void assertMatch(WaterSupplyAccounting first, WaterSupplyAccountin
);
}
+ public static void assertMatch(TimeSeriesCsv first, TimeSeriesCsv second) {
+ assertAll(
+ () -> assertEquals(first.getTimeSeriesId(), second.getTimeSeriesId(), "Time series IDs do not match"),
+ () -> assertEquals(first.getOfficeId(), second.getOfficeId(), "Office IDs do not match"),
+ () -> assertEquals(first.getVersionDate(), second.getVersionDate(), "Version dates do not match"),
+ () -> assertMatch(first.getRows(), second.getRows())
+ );
+ }
+
+ public static void assertMatch(List first, List second) {
+ assertMatch(first, second, DTOMatch::assertMatch);
+ }
+
+ public static void assertMatch(TimeSeriesCsvRow first, TimeSeriesCsvRow second) {
+ assertAll(
+ () -> assertEquals(first.getDateTime(), second.getDateTime(), "Date times do not match"),
+ () -> assertEquals(first.getDataEntryDate(), second.getDataEntryDate(), "Data entry dates do not match"),
+ () -> assertEquals(first.getUnits(), second.getUnits(), "Units do not match"),
+ () -> assertEquals(first.getValue(), second.getValue(), "Values do not match"),
+ () -> assertEquals(first.getQualityCode(), second.getQualityCode(), "Quality codes do not match")
+ );
+ }
+
public static void assertMatch(Map> first, Map> second) {
assertAll(
() -> assertEquals(first.size(), second.size(), "Pump accounting sizes do not match"),
diff --git a/cwms-data-api/src/test/resources/cwms/cda/api/lrl/large_timeseries.json b/cwms-data-api/src/test/resources/cwms/cda/api/lrl/large_timeseries.json
new file mode 100644
index 0000000000..cc3810420b
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/api/lrl/large_timeseries.json
@@ -0,0 +1,57 @@
+{
+ "name": "Calhoun.Flow.Inst.1Hour.0.cda-test",
+ "office-id": "SPK",
+ "units": "cfs",
+ "values": [
+ [1673438400000, 10, 0],
+ [1673442000000, 20, 0],
+ [1673445600000, 30, 0],
+ [1673449200000, 40, 0],
+ [1673452800000, 50, 0],
+ [1673456400000, 60, 0],
+ [1673460000000, 70, 0],
+ [1673463600000, 80, 0],
+ [1673467200000, 90, 0],
+ [1673470800000, 100, 0],
+ [1673474400000, 110, 0],
+ [1673478000000, 120, 0],
+ [1673481600000, 130, 0],
+ [1673485200000, 140, 0],
+ [1673488800000, 150, 0],
+ [1673492400000, 160, 0],
+ [1673496000000, 170, 0],
+ [1673499600000, 180, 0],
+ [1673503200000, 190, 0],
+ [1673506800000, 200, 0],
+ [1673510400000, 210, 0],
+ [1673514000000, 220, 0],
+ [1673517600000, 230, 0],
+ [1673521200000, 240, 0],
+ [1673524800000, 250, 0],
+ [1673528400000, 260, 0],
+ [1673532000000, 270, 0],
+ [1673535600000, 280, 0],
+ [1673539200000, 290, 0],
+ [1673542800000, 300, 0],
+ [1673546400000, 310, 0],
+ [1673550000000, 320, 0],
+ [1673553600000, 330, 0],
+ [1673557200000, 340, 0],
+ [1673560800000, 350, 0],
+ [1673564400000, 360, 0],
+ [1673568000000, 370, 0],
+ [1673571600000, 380, 0],
+ [1673575200000, 390, 0],
+ [1673578800000, 400, 0],
+ [1673582400000, 410, 0],
+ [1673586000000, 420, 0],
+ [1673589600000, 430, 0],
+ [1673593200000, 440, 0],
+ [1673596800000, 450, 0],
+ [1673600400000, 460, 0],
+ [1673604000000, 470, 0],
+ [1673607600000, 480, 0],
+ [1673611200000, 490, 0],
+ [1673614800000, 500, 0]
+ ]
+}
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-default.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-default.csv
new file mode 100644
index 0000000000..be1c3e6834
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-default.csv
@@ -0,0 +1,2 @@
+date-time,value
+2021-06-21T21:06:00Z,1.0
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-columns.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-columns.csv
new file mode 100644
index 0000000000..a18823480e
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-columns.csv
@@ -0,0 +1,2 @@
+time-series-id,office-id,date-time,value,units,version-date,quality-code
+RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST,SPK,2021-06-21T21:06:00Z,1.0,ft,2025-07-22T14:00:00Z,0
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-comment.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-comment.csv
new file mode 100644
index 0000000000..507c45924d
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-row-with-metadata-comment.csv
@@ -0,0 +1,8 @@
+# metadata-count: 5
+# time-series-id: RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST
+# office-id: SPK
+# units: ft
+# version-date: 2025-07-22T14:00:00Z
+# quality-code: 0
+date-time,value
+2021-06-21T21:06:00Z,1.0
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-columns.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-columns.csv
new file mode 100644
index 0000000000..eeaa5005da
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-columns.csv
@@ -0,0 +1,3 @@
+time-series-id,office-id,date-time,value,units,version-date,quality-code
+RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST,SPK,2021-06-21T21:06:00Z,1.0,ft,2025-07-22T14:00:00Z,0
+RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST,SPK,2021-06-22T21:06:00Z,2.0,ft,2025-07-22T14:00:00Z,0
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-comment.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-comment.csv
new file mode 100644
index 0000000000..b1112566b8
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/csv/time-series-rows-with-metadata-comment.csv
@@ -0,0 +1,9 @@
+# metadata-count: 5
+# time-series-id: RYAN3.Stage.Inst.5Minutes.0.ZSTORE_TS_TEST
+# office-id: SPK
+# units: ft
+# version-date: 2025-07-22T14:00:00Z
+# quality-code: 0
+date-time,value
+2021-06-21T21:06:00Z,1.0
+2021-06-22T21:06:00Z,2.0
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-default.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-default.csv
new file mode 100644
index 0000000000..75bdb32b51
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-default.csv
@@ -0,0 +1,5 @@
+date-time, value (cfs)
+2021-06-21T00:00:00Z, 0.0
+2021-06-22T00:00:00Z, 1.0
+2021-06-23T00:00:00Z, 2.0
+2021-06-24T00:00:00Z, 3.0
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-metadata-comments.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-metadata-comments.csv
new file mode 100644
index 0000000000..50e2a69716
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-metadata-comments.csv
@@ -0,0 +1,8 @@
+# time-series-id: ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI
+# office-id: SWT
+# version-date: aggregate
+date-time, value (cfs)
+2021-06-21T00:00:00Z, 0.0
+2021-06-22T00:00:00Z, 1.0
+2021-06-23T00:00:00Z, 2.0
+2021-06-24T00:00:00Z, 3.0
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv
new file mode 100644
index 0000000000..c509376e00
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-no-metadata-comments.csv
@@ -0,0 +1,5 @@
+date-time, value (cfs), data-entry-date, quality-code
+2021-06-21T00:00:00Z, 0.0, 2021-06-21T00:05:00Z, 5
+2021-06-22T00:00:00Z, 1.0, 2021-06-22T00:05:00Z, 5
+2021-06-23T00:00:00Z, 2.0, 2021-06-23T00:05:00Z, 5
+2021-06-24T00:00:00Z, 3.0, 2021-06-24T00:05:00Z, 5
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv
new file mode 100644
index 0000000000..13ec27f224
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series-optionals-with-metadata-comments.csv
@@ -0,0 +1,9 @@
+# time-series-id: ALAT2.Flow-Out.Inst.1Hour.0.Rev-SWF-REGI
+# office-id: SWT
+# units: cfs
+# version-date: aggregate
+date-time,value (cfs),data-entry-date,quality-code
+2021-06-21T00:00:00Z,0.0,2021-06-21T00:05:00Z,5
+2021-06-21T01:00:00Z,10.0,2021-06-21T01:05:00Z,5
+2021-06-21T02:00:00Z,20.0,2021-06-21T02:05:00Z,5
+2021-06-21T03:00:00Z,30.0,2021-06-21T03:05:00Z,5
diff --git a/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series.csv b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series.csv
new file mode 100644
index 0000000000..4819c27820
--- /dev/null
+++ b/cwms-data-api/src/test/resources/cwms/cda/data/dto/time-series.csv
@@ -0,0 +1,3 @@
+date-time,value
+1772826360000,12.34567
+1772826420000,12.34567
diff --git a/docs/source/decisions/csv-format.rst b/docs/source/decisions/csv-format.rst
new file mode 100644
index 0000000000..22b2d660bd
--- /dev/null
+++ b/docs/source/decisions/csv-format.rst
@@ -0,0 +1,59 @@
+#####
+CSV Format (With emphasis on TimeSeries)
+#####
+
+
+Summary
+=======
+
+This ADR defines a standardized CSV representation for the TimeSeries DTO with general considerations for future DTOs. It specifies a row-per-record CSV format that preserves essential metadata, flattens nested Record entries, and ensures consistent ingestion by analytics, automation, and warehousing systems.
+
+
+Opinions
+========
+
+Opinion 1
+---------
+
+@brysonspilman
+
+Summary:
+Define a CSV format for the TimeSeries DTO that flattens each Record into one row, repeats metadata per row, uses kebab-case headers, and applies consistent data formatting rules.
+
+The TimeSeries DTO currently supports JSON and XML, but various downstream users require CSV. Due to nested structures—a clear flattening strategy is necessary.
+
+Key points:
+
+- CSV will not be supported for DTOs with multiple independent data collections.
+- Timeseries CSV will include only the date-time and value Record fields (2 columns)
+- Metadata fields are repeated on each row to eliminate the need for joins.
+- Column names use kebab-case for consistency with JSON.
+- Record date-time values are serialized as Unix epoch milliseconds (UTC).
+- Null values are empty fields. Missing values use quality-code = 5.
+- data-entry-date is empty when not present.
+- UTF-8 encoding, comma delimiter, LF line endings, header included.
+- One row is produced per Record.
+- Multi-retrieve never includes multiple time-series IDs.
+
+Example CSV:
+
+date-time, value
+1624287600000, 0.0
+1624288500000, 1.0
+1624289400000, 2.0
+1624290300000, 3.0
+
+
+Decision Status
+===============
+
+(Status: proposed)
+
+
+References
+==========
+
+Related Types: cwms.cda.data.dto.TimeSeries, TimeSeries.Record
+Discussion: https://github.com/USACE/cwms-data-api/issues/1525#issuecomment-3974845633
+Date: 2026-02-26
+Owner: GEI – Bryson Spilman
\ No newline at end of file