Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cwms-data-api/src/main/java/cwms/cda/api/Controllers.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public final class Controllers {
public static final String CURSOR = "cursor";
public static final String PAGE = "page";
public static final String PAGE_SIZE = "page-size";
// CSV-only query parameter: optional opaque cursor telling the server where to begin a
// streamed CSV response (intended for resuming a previously interrupted download).
public static final String CSV_RESUME_FROM_PAGE = "csv-resume-from-page";

// If a constant has a number at the end, it's a deprecated variant.

Expand Down Expand Up @@ -127,6 +128,8 @@ public final class Controllers {
public static final String MATCH_NULL_PARENTS = "match-null-parents";
public static final String ENTITY_ID = "entity-id";
public static final String PARENT_ENTITY_ID = "parent-entity-id";
// CSV-only query parameter: when true, dataset metadata is emitted as '#'-prefixed header comments.
// NOTE(review): unlike the sibling constant below, the wire name has no "csv" in it — confirm intended.
public static final String INCLUDE_METADATA_AS_CSV_COMMENTS = "include-metadata-as-comments";
// CSV-only query parameter: when true, the optional columns (quality-code, data-entry-date) are included.
public static final String INCLUDE_OPTIONAL_CSV_COLUMNS = "include-optional-csv-columns";

public static final String CREATE_AS_LRTS = "create-as-lrts";
public static final String STORE_RULE = "store-rule";
Expand Down
56 changes: 56 additions & 0 deletions cwms-data-api/src/main/java/cwms/cda/api/CursorIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package cwms.cda.api;

import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

/**
 * Iterates over the pages of a cursor-paged result, fetching each page only when it is asked for.
 *
 * <p>A page is obtained by passing a cursor string to {@code fetcher}. Iteration ends when the
 * fetcher returns {@code null}, when a page has no items (a {@code null} or empty list from
 * {@code itemsExtractor}), or when the last returned page carries no next cursor
 * ({@code null} or empty string from {@code nextCursorExtractor}).
 *
 * <p>Pages are fetched lazily from {@link #hasNext()}: {@link #next()} never triggers the fetch of
 * the page <em>after</em> the one it returns. This lets a streaming caller emit page N before
 * page N+1 is loaded and keeps at most one page buffered at a time.
 *
 * <p>Instances are not synchronized.
 *
 * @param <P> page type
 * @param <T> item type contained in a page
 */
public final class CursorIterator<P, T> implements Iterator<P> {

    private final Function<String, P> fetcher;
    private final Function<P, List<T>> itemsExtractor;
    private final Function<P, String> nextCursorExtractor;

    /** Cursor of the page that still has to be fetched; {@code null} when nothing is left to fetch. */
    private String pendingCursor;

    /** Page that was fetched but not yet handed out by {@link #next()}; {@code null} if none is buffered. */
    private P nextPage;

    /**
     * Creates an iterator that starts at the given cursor.
     *
     * @param initialCursor cursor of the first page; {@code null} is treated as the empty string
     * @param fetcher loads the page for a cursor; may return {@code null} to signal "no page"
     * @param itemsExtractor returns the items of a page; {@code null} or empty ends the iteration
     * @param nextCursorExtractor returns the cursor of the following page; {@code null} or empty
     *                            means the page is the last one
     */
    public CursorIterator(String initialCursor, Function<String, P> fetcher, Function<P, List<T>> itemsExtractor, Function<P, String> nextCursorExtractor) {
        this.pendingCursor = initialCursor == null ? "" : initialCursor;
        this.fetcher = fetcher;
        this.itemsExtractor = itemsExtractor;
        this.nextCursorExtractor = nextCursorExtractor;
    }

    /**
     * Reports whether another non-empty page is available, fetching it first if necessary.
     *
     * @return {@code true} if {@link #next()} will return a page
     */
    @Override
    public boolean hasNext() {
        if (nextPage == null && pendingCursor != null) {
            nextPage = fetcher.apply(pendingCursor);
            // Cleared only after a successful fetch so a failed fetch can be retried.
            pendingCursor = null;
        }

        if (nextPage == null) {
            return false;
        }

        List<T> items = itemsExtractor.apply(nextPage);
        return items != null && !items.isEmpty();
    }

    /**
     * Returns the buffered page and records the cursor of the following page without fetching it.
     *
     * @return the next page
     * @throws NoSuchElementException if no further non-empty page exists
     */
    @Override
    public P next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }

        P current = nextPage;

        // Read the cursor before dropping the buffered page so a failing extractor loses nothing.
        String nextCursor = nextCursorExtractor.apply(current);
        pendingCursor = (nextCursor == null || nextCursor.isEmpty()) ? null : nextCursor;
        nextPage = null;

        return current;
    }
}
160 changes: 155 additions & 5 deletions cwms-data-api/src/main/java/cwms/cda/api/TimeSeriesController.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import cwms.cda.data.dao.VerticalDatum;
import cwms.cda.data.dto.TimeSeries;
import cwms.cda.formatters.ContentType;
import cwms.cda.formatters.csv.CsvV1;
import cwms.cda.data.dto.csv.TimeSeriesCsv;
import cwms.cda.data.dto.csv.TimeSeriesCsvRow;
import cwms.cda.formatters.Formats;
import cwms.cda.formatters.csv.CwmsCsvProcessor;
import cwms.cda.helpers.DateUtils;
import io.javalin.apibuilder.CrudHandler;
import io.javalin.core.util.Header;
Expand All @@ -39,10 +43,17 @@
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.utils.URIBuilder;
Expand Down Expand Up @@ -382,12 +393,25 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) {
+ "identifies where in the request you are. This is an opaque"
+ " value, and can be obtained from the 'next-page' value in "
+ "the response. Deprecated, use " + PAGE + " instead."),
@OpenApiParam(name = PAGE_SIZE,
type = Integer.class,
@OpenApiParam(name = PAGE_SIZE, type = Integer.class,
description = "How many entries per page returned. "
+ "Default " + DEFAULT_PAGE_SIZE + ". Use 0 to return an empty values array, "
+ "For JSON/XML paging, this controls page size. "
+ "For CSV, this controls the internal fetch batch size used while streaming a single response. "
+ "CSV clients do not request subsequent pages. "
+ "Default " + DEFAULT_PAGE_SIZE +". Use 0 to return an empty values array, "
+ "or -1 to return the entire window in one response without a next-page cursor. "
+ "Values less than -1 are invalid.")
+ "Values less than -1 are invalid."),
@OpenApiParam(name = CSV_RESUME_FROM_PAGE,
description = "CSV only. Optional opaque cursor that tells the server where to begin the "
+ "streamed CSV response. This is intended for resuming a previously interrupted "
+ "CSV download. The response is still a single streamed CSV response, not a paged "
+ "CSV response."),
@OpenApiParam(name = INCLUDE_METADATA_AS_CSV_COMMENTS, type = Boolean.class,
description = "When true, include dataset metadata as csv header comments "
+ "prepended with # (default is false)."),
@OpenApiParam(name = INCLUDE_OPTIONAL_CSV_COLUMNS, type = Boolean.class,
description = "When true, include optional columns (quality-code, data-entry-date) "
+ "in the CSV response (default is false).")
},
responses = {
@OpenApiResponse(status = STATUS_200,
Expand All @@ -397,6 +421,7 @@ public void delete(@NotNull Context ctx, @NotNull String timeseries) {
@OpenApiContent(from = TimeSeries.class, type = Formats.XMLV2),
@OpenApiContent(from = TimeSeries.class, type = Formats.XML),
@OpenApiContent(from = TimeSeries.class, type = Formats.JSON),
@OpenApiContent(from = TimeSeriesCsv.class, type= Formats.CSV),
@OpenApiContent(from = TimeSeries.class, type = ""),}),
@OpenApiResponse(status = STATUS_400, description = "Invalid parameter combination"),
@OpenApiResponse(status = STATUS_404, description = "The provided combination of "
Expand Down Expand Up @@ -471,6 +496,18 @@ public void getAll(@NotNull Context ctx) {
.withShouldTrim(trim.getOrDefault(true))
.withIncludeEntryDate(includeEntryDate)
.build();

// CSV: stream a single response; page-size is only internal batch size
if (Formats.CSV.equals(contentType.getType())) {
String csvResumeFromPage = ctx.queryParamAsClass(CSV_RESUME_FROM_PAGE, String.class)
.getOrDefault("");
boolean includeOptionalColumns = ctx.queryParamAsClass(INCLUDE_OPTIONAL_CSV_COLUMNS, Boolean.class)
.getOrDefault(false);
int csvBatchSize = validateCsvBatchSize(pageSize);
streamCsv(ctx, csvBatchSize, csvResumeFromPage, includeOptionalColumns, dao, requestParameters, contentType);
return;
}

// Execute DAO call with a timeout so we can return a clearer message instead of a generic 500
int apiTimeoutMs = Integer.getInteger("cwms.cda.api.apiTimeoutMs", 45000);
CompletableFuture<TimeSeries> daoFuture = CompletableFuture.supplyAsync(
Expand All @@ -492,7 +529,7 @@ public void getAll(@NotNull Context ctx) {
throw unwrapExecutionException(ex);
}

if(datum != null) { //this will be null for non-elevation ts
if (datum != null) { //this will be null for non-elevation ts
// user has requested a specific vertical datum
VerticalDatum vd = VerticalDatum.valueOf(datum); // the users request
ts = TimeSeriesVerticalDatumConverter.convertToVerticalDatum(ts, vd);
Expand Down Expand Up @@ -521,10 +558,33 @@ public void getAll(@NotNull Context ctx) {
}

String office = ctx.queryParam(OFFICE);

// CSV: stream a single response; page-size is only internal batch size
if (Formats.CSV.equals(contentType.getType())) {
TimeSeriesRequestParameters requestParameters = new TimeSeriesRequestParameters.Builder()
.withNames(names)
.withOffice(office)
.withUnits(units)
.withBeginTime(beginZdt)
.withEndTime(endZdt)
.withShouldTrim(trim.getOrDefault(true))
.withIncludeEntryDate(includeEntryDate)
.build();

String csvResumeFromPage = ctx.queryParamAsClass(CSV_RESUME_FROM_PAGE, String.class)
.getOrDefault("");
boolean includeOptionalColumns = ctx.queryParamAsClass(INCLUDE_OPTIONAL_CSV_COLUMNS, Boolean.class)
.getOrDefault(false);
int csvBatchSize = validateCsvBatchSize(pageSize);
streamCsv(ctx, csvBatchSize, csvResumeFromPage, includeOptionalColumns, dao, requestParameters, contentType);
return;
}

results = dao.getTimeseries(format, names, office, units, datum, beginZdt, endZdt, tz);
ctx.status(HttpServletResponse.SC_OK);
ctx.result(results);
}

addDeprecatedContentTypeWarning(ctx, contentType);
requestResultSize.update(results.length());
} catch (NotFoundException e) {
Expand Down Expand Up @@ -552,6 +612,96 @@ static RuntimeException unwrapExecutionException(java.util.concurrent.ExecutionE
return new RuntimeException(cause);
}

/**
 * Streams the requested time series to the client as one CSV response.
 *
 * <p>Pages are pulled from the DAO through a {@link CursorIterator}, starting at
 * {@code resumeFromPage}, and each page is written and flushed by {@code streamBatches}.
 * All response headers are set before the first batch is written, because the first flush
 * commits the response.
 *
 * @param ctx request context; the CSV is written to its servlet response
 * @param batchSize number of values requested from the DAO per fetch
 * @param resumeFromPage cursor to start from; {@code null} or empty starts at the beginning
 * @param includeOptionalColumns passed through to the CSV formatter
 * @param dao source of the time series pages
 * @param requestParameters describes the time series window to retrieve
 * @param contentType negotiated content type, used for the deprecation warning
 * @throws DataAccessException if the response output stream cannot be obtained
 */
private void streamCsv(@NotNull Context ctx, int batchSize, String resumeFromPage, boolean includeOptionalColumns,
        TimeSeriesDao dao, TimeSeriesRequestParameters requestParameters, ContentType contentType) {
    ctx.status(HttpServletResponse.SC_OK);
    ctx.contentType(Formats.CSV);
    ctx.header(Header.CONTENT_TYPE, Formats.CSV + "; charset=UTF-8");

    boolean includeMetadataAsComments = ctx.queryParamAsClass(INCLUDE_METADATA_AS_CSV_COMMENTS, Boolean.class)
            .getOrDefault(false);

    ctx.header(Header.CACHE_CONTROL, "no-store");
    ctx.header("X-Content-Type-Options", "nosniff");
    ctx.header("X-Stream-Batch-Size", String.valueOf(batchSize));

    // Must run before any batch is written: streamBatches flushes the response after every
    // batch, which commits it, and the servlet container ignores headers set after commit.
    // NOTE(review): assumes addDeprecatedContentTypeWarning only sets response headers — confirm.
    addDeprecatedContentTypeWarning(ctx, contentType);

    CsvV1 csv = new CsvV1();

    try {
        ServletOutputStream out = ctx.res.getOutputStream();
        Iterator<TimeSeries> iterator = new CursorIterator<>(
                resumeFromPage,
                cursor -> dao.getTimeseries(cursor, batchSize, requestParameters),
                TimeSeries::getValues,
                TimeSeries::getNextPage);
        // Only the first batch carries the CSV header (and optional metadata comments).
        AtomicBoolean firstBatch = new AtomicBoolean(true);
        iterator.forEachRemaining(ts -> streamBatches(ctx, includeOptionalColumns, ts, firstBatch, includeMetadataAsComments, csv, out));
    } catch (IOException e) {
        throw new DataAccessException("Failed streaming CSV response", e);
    }
}

/**
 * Converts one page of time series values to CSV text and writes it to the response.
 *
 * <p>The first batch is formatted with its header (and with metadata comments when requested);
 * every later batch is formatted and then has its header stripped. The output is flushed after
 * each batch and {@code firstBatch} is cleared once a batch has been written.
 *
 * @param ctx request context whose response buffer is flushed
 * @param includeOptionalColumns passed through to the CSV formatter
 * @param ts page of values to write
 * @param firstBatch true until the first batch has been written
 * @param includeMetadataAsComments whether the first batch is formatted with metadata
 * @param csv formatter producing the CSV text
 * @param out destination stream
 * @throws RuntimeException wrapping any {@link IOException} raised while writing or flushing
 */
private static void streamBatches(@NotNull Context ctx, boolean includeOptionalColumns, TimeSeries ts, AtomicBoolean firstBatch, boolean includeMetadataAsComments, CsvV1 csv, ServletOutputStream out) {
    try {
        List<TimeSeries.Record> records = ts.getValues();
        List<TimeSeriesCsvRow> csvRows = new ArrayList<>(records.size());

        String seriesName = ts.getName();
        String office = ts.getOfficeId();
        String unitLabel = ts.getUnits();

        String versionDate = null;
        if (ts.getVersionDate() != null) {
            versionDate = ts.getVersionDate().toInstant().toString();
        }

        for (TimeSeries.Record entry : records) {
            Instant sampleTime = null;
            if (entry.getDateTime() != null) {
                sampleTime = entry.getDateTime().toInstant();
            }

            Instant enteredAt = null;
            if (entry.getDataEntryDate() != null) {
                enteredAt = entry.getDataEntryDate().toInstant();
            }

            TimeSeriesCsvRow row = new TimeSeriesCsvRow.Builder()
                    .withDateTime(sampleTime)
                    .withValue(entry.getValue())
                    .withQualityCode(entry.getQualityCode())
                    .withDataEntryDate(enteredAt)
                    .withUnits(unitLabel)
                    .build();
            csvRows.add(row);
        }

        TimeSeriesCsv batch = new TimeSeriesCsv.Builder()
                .withTimeSeriesId(seriesName)
                .withOfficeId(office)
                .withVersionDate(versionDate)
                .withRows(csvRows)
                .build();

        boolean isFirst = firstBatch.get();
        String payload;
        if (isFirst && includeMetadataAsComments) {
            payload = csv.formatWithMetadata(batch, includeOptionalColumns);
        } else {
            payload = csv.format(batch, includeOptionalColumns);
            if (!isFirst) {
                // Later batches continue the same CSV document, so their header row is dropped.
                payload = CwmsCsvProcessor.stripHeader(payload);
            }
        }

        out.write(payload.getBytes(StandardCharsets.UTF_8));
        out.flush();
        ctx.res.flushBuffer();

        firstBatch.set(false);
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}

/**
 * Checks that the requested page size can be used as a CSV fetch batch size.
 *
 * @param requestedPageSize page size taken from the request
 * @return {@code requestedPageSize} unchanged when it is positive
 * @throws IllegalArgumentException if {@code requestedPageSize} is zero or negative
 */
private int validateCsvBatchSize(int requestedPageSize) {
    boolean usable = requestedPageSize > 0;
    if (usable) {
        return requestedPageSize;
    }
    throw new IllegalArgumentException("For CSV streaming, page-size must be greater than 0.");
}

private void addLinkHeader(@NotNull Context ctx, TimeSeries ts, ContentType contentType) {
try {
// Send back the link to the next page in the response header
Expand Down
13 changes: 13 additions & 0 deletions cwms-data-api/src/main/java/cwms/cda/data/dao/StreamConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,20 @@
import java.io.InputStream;
import java.sql.SQLException;

/**
 * A consumer for streaming binary data out to callers (e.g., HTTP layer).
 *
 * <p>The single abstract method accepts an InputStream together with a position and a total length.
 * For generated content where the total length is not known up front (e.g., CSV rendered on-demand),
 * use the default two-argument overload. It delegates with a position of {@code 0} and a total
 * length of {@code -1}, so implementations should treat a negative total length as "unknown"
 * (for example by using chunked transfer instead of setting Content-Length).
 */
@FunctionalInterface
public interface StreamConsumer {

    /**
     * Consumes the given stream.
     *
     * @param stream the data to consume
     * @param inputStreamPosition position associated with the stream; {@code 0} when called
     *                            through {@link #accept(InputStream, String)}
     * @param mediaType media type of the data
     * @param totalLength total length of the data; {@code -1} when called through
     *                    {@link #accept(InputStream, String)}
     * @throws SQLException declared for implementations backed by database access
     * @throws IOException if reading or writing the data fails
     */
    void accept(InputStream stream, long inputStreamPosition, String mediaType, long totalLength) throws SQLException, IOException;

    /**
     * Consumes a stream whose total length is not known, by delegating to
     * {@link #accept(InputStream, long, String, long)} with position {@code 0} and length {@code -1}.
     *
     * @param stream the data to consume
     * @param mediaType media type of the data
     * @throws SQLException if the delegate throws it
     * @throws IOException if the delegate throws it
     */
    default void accept(InputStream stream, String mediaType) throws SQLException, IOException {
        accept(stream, 0L, mediaType, -1L);
    }
}
26 changes: 26 additions & 0 deletions cwms-data-api/src/main/java/cwms/cda/data/dao/TimeSeriesDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,31 @@ List<RecentValue> findRecentsInRange(String office, String categoryId, String gr

List<RecentValue> findMostRecentsInRange(String office, List<String> tsIds, Timestamp pastLimit,
Timestamp futureLimit, UnitSystem unitSystem);
/**
 * Streams the requested TimeSeries data in CSV format (CsvV1 formatter) using a
 * {@link StreamConsumer}, similar to the BlobDao streaming pattern. The DAO will invoke the
 * provided consumer with an InputStream that produces the CSV bytes on-the-fly, avoiding
 * buffering the entire dataset in memory.
 *
 * Notes:
 * - The consumer is responsible for copying the InputStream to the HTTP response and
 * setting any additional headers as needed.
 * - The mediaType passed to the consumer will be {@code Formats.CSV}.
 * - The totalLength may be unknown and can be negative; callers should not rely on it.
 *
 * @param requestParameters request parameters describing the time-series retrieval
 * @param consumer a sink that will receive a streaming InputStream of CSV data
 * @param includeMetadataAsComments when true, include metadata as header comments
 */
void streamRequestedTimeSeriesCsv(TimeSeriesRequestParameters requestParameters,
StreamConsumer consumer,
boolean includeMetadataAsComments);

}
Loading
Loading