From fc17407b11ce967c2241951cb0eac97252e14f77 Mon Sep 17 00:00:00 2001 From: Ricardo Zanini Date: Fri, 8 May 2026 14:53:41 -0400 Subject: [PATCH] fix: Add until predicates to any listeners Signed-off-by: Ricardo Zanini --- .../fluent/func/FuncListenToBuilder.java | 12 + .../fluent/func/dsl/BaseFuncListenSpec.java | 17 ++ .../func/ListenUntilCurrentTest.java | 219 ++++++++++++++++++ .../func/ListenUntilValidationTest.java | 81 +++++++ .../fluent/spec/dsl/BaseListenSpec.java | 16 ++ 5 files changed, 345 insertions(+) create mode 100644 experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilCurrentTest.java create mode 100644 experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilValidationTest.java diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java index 8162d078e..54dc2394e 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/FuncListenToBuilder.java @@ -20,6 +20,8 @@ import io.serverlessworkflow.api.types.ListenTo; import io.serverlessworkflow.api.types.OneEventConsumptionStrategy; import io.serverlessworkflow.api.types.Until; +import io.serverlessworkflow.api.types.func.ContextPredicate; +import io.serverlessworkflow.api.types.func.FilterPredicate; import io.serverlessworkflow.api.types.func.UntilPredicate; import io.serverlessworkflow.fluent.spec.AbstractEventConsumptionStrategyBuilder; import java.util.function.Predicate; @@ -66,4 +68,14 @@ public FuncListenToBuilder until(Predicate predicate, Class predClass) this.setUntil(new UntilPredicate().withPredicate(predicate, predClass)); return this; } + + public FuncListenToBuilder until(ContextPredicate predicate, Class predClass) { + this.setUntil(new UntilPredicate().withPredicate(predicate, predClass)); + return this; + } + + public FuncListenToBuilder until(FilterPredicate predicate, Class predClass) { + this.setUntil(new UntilPredicate().withPredicate(predicate, predClass)); + return this; + } } diff --git a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java index 5715ddfe0..9a1814674 100644 --- a/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java +++ b/experimental/fluent/func/src/main/java/io/serverlessworkflow/fluent/func/dsl/BaseFuncListenSpec.java @@ -15,6 +15,8 @@ */ package io.serverlessworkflow.fluent.func.dsl; +import io.serverlessworkflow.api.types.func.ContextPredicate; +import io.serverlessworkflow.api.types.func.FilterPredicate; import io.serverlessworkflow.fluent.func.FuncEventFilterBuilder; import io.serverlessworkflow.fluent.func.FuncListenToBuilder; import io.serverlessworkflow.fluent.spec.dsl.BaseListenSpec; @@ -43,6 +45,21 @@ private static Consumer[] castFilters(Consumer[] arr) public SELF until(Predicate predicate, Class predClass) { Objects.requireNonNull(predicate, "predicate"); + Objects.requireNonNull(predClass, "predClass"); + this.setUntilStep(u -> u.until(predicate, predClass)); + return self(); + } + + public SELF until(ContextPredicate predicate, Class predClass) { + Objects.requireNonNull(predicate, "predicate"); + Objects.requireNonNull(predClass, "predClass"); + this.setUntilStep(u -> u.until(predicate, predClass)); + return self(); + } + + public SELF until(FilterPredicate predicate, Class predClass) { + Objects.requireNonNull(predicate, "predicate"); + Objects.requireNonNull(predClass, "predClass"); this.setUntilStep(u -> u.until(predicate, predClass)); return self(); } diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilCurrentTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilCurrentTest.java new file mode 100644 index 000000000..32cc57afe --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilCurrentTest.java @@ -0,0 +1,219 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.listen; +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.toAny; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.cloudevents.CloudEvent; +import io.cloudevents.core.builder.CloudEventBuilder; +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import io.serverlessworkflow.impl.WorkflowDefinition; +import io.serverlessworkflow.impl.WorkflowInstance; +import io.serverlessworkflow.impl.WorkflowModel; +import io.serverlessworkflow.impl.WorkflowModelCollection; +import io.serverlessworkflow.impl.WorkflowStatus; +import io.serverlessworkflow.impl.events.EventPublisher; +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ListenUntilCurrentTest { + + private static final Logger log = LoggerFactory.getLogger(ListenUntilCurrentTest.class); + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private CloudEvent createOrderEvent(String instanceId, int orderNum) { + Order order = new Order("order-" + orderNum, "PENDING", 100.0 * orderNum); + try { + return CloudEventBuilder.v1() + .withId("event-" + orderNum) + .withSource(URI.create("test:/orders")) + .withType("order.created") + .withExtension("instanceid", instanceId) + .withData("application/json", MAPPER.writeValueAsBytes(order)) + .build(); + } catch (Exception e) { + throw new RuntimeException("Failed to create order event", e); + } + } + + @Test + public void testCurrentToAnyWithUntilExpression() throws Exception { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + FuncWorkflowBuilder.workflow("test-toany-until") + .tasks( + listen( + "waitOrders", + toAny("order.created") + .until( + (WorkflowModelCollection events) -> { + log.info("Predicate called!"); + log.info(" Param type: {}", events.getClass().getName()); + log.info(" Param value: {}", events); + log.info(" Event count: {}", (long) events.size()); + boolean result = (long) events.size() >= 3; + log.info(" Returning: {}", result); + return result; + }, + WorkflowModelCollection.class))) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new Object()); + CompletableFuture future = instance.start(); + + // Wait for WAITING status + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + + // Emit 3 order events + log.info("Publishing event 1..."); + publisher.publish(createOrderEvent(instance.id(), 1)).toCompletableFuture().join(); + + log.info("Publishing event 2..."); + publisher.publish(createOrderEvent(instance.id(), 2)).toCompletableFuture().join(); + + log.info("Publishing event 3..."); + publisher.publish(createOrderEvent(instance.id(), 3)).toCompletableFuture().join(); + + // Workflow should complete after 3 events + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.COMPLETED); + + WorkflowModel result = future.join(); + long count = ((WorkflowModelCollection) result).size(); + log.info("Workflow completed with {} items", count); + assertEquals(3, count); + } + } + + @Test + public void testToAnyWithUntilContextPredicate() { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + FuncWorkflowBuilder.workflow("test-toany-until-context") + .tasks( + listen( + "waitOrders", + toAny("order.created") + .until( + (events, context) -> { + log.info("ContextPredicate called!"); + log.info(" Events count: {}", (long) events.size()); + assertNotNull(context); + log.info(" Context instance id: {}", context.instanceData().id()); + // Stop after 2 events + return (long) events.size() >= 2; + }, + WorkflowModelCollection.class))) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new Object()); + CompletableFuture future = instance.start(); + + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + + log.info("Publishing event 1..."); + publisher.publish(createOrderEvent(instance.id(), 1)).toCompletableFuture().join(); + + log.info("Publishing event 2..."); + publisher.publish(createOrderEvent(instance.id(), 2)).toCompletableFuture().join(); + + // Should complete after 2 events + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.COMPLETED); + + WorkflowModel result = future.join(); + long count = ((WorkflowModelCollection) result).size(); + log.info("Workflow completed with {} items", count); + assertEquals(2, count); + } + } + + @Test + public void testToAnyWithUntilFilterPredicate() { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + FuncWorkflowBuilder.workflow("test-toany-until-filter") + .tasks( + listen( + "waitOrders", + toAny("order.created") + .until( + (events, workflowCtx, taskCtx) -> { + log.info("FilterPredicate called!"); + log.info(" Events count: {}", (long) events.size()); + assertNotNull(workflowCtx); + assertNotNull(taskCtx); + log.info(" Task position: {}", taskCtx.position()); + return (long) events.size() >= 3; + }, + WorkflowModelCollection.class))) + .build(); + + WorkflowDefinition definition = app.workflowDefinition(workflow); + WorkflowInstance instance = definition.instance(new Object()); + CompletableFuture future = instance.start(); + + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.WAITING); + + EventPublisher publisher = app.eventPublishers().iterator().next(); + + log.info("Publishing event 1..."); + publisher.publish(createOrderEvent(instance.id(), 1)).toCompletableFuture().join(); + + log.info("Publishing event 2..."); + publisher.publish(createOrderEvent(instance.id(), 2)).toCompletableFuture().join(); + + log.info("Publishing event 3..."); + publisher.publish(createOrderEvent(instance.id(), 3)).toCompletableFuture().join(); + + await() + .atMost(Duration.ofSeconds(5)) + .until(() -> instance.status() == WorkflowStatus.COMPLETED); + + WorkflowModel result = future.join(); + long count = ((WorkflowModelCollection) result).size(); + log.info("Workflow completed with {} items", count); + assertEquals(3, count); + } + } + + public record Order(String id, String status, double amount) {} +} diff --git a/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilValidationTest.java b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilValidationTest.java new file mode 100644 index 000000000..b5c694e68 --- /dev/null +++ b/experimental/lambda/src/test/java/io/serverless/workflow/impl/executors/func/ListenUntilValidationTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2020-Present The Serverless Workflow Specification Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.serverless.workflow.impl.executors.func; + +import static io.serverlessworkflow.fluent.func.dsl.FuncDSL.*; +import static org.junit.jupiter.api.Assertions.*; + +import io.serverlessworkflow.api.types.Workflow; +import io.serverlessworkflow.fluent.func.FuncWorkflowBuilder; +import io.serverlessworkflow.impl.WorkflowApplication; +import org.junit.jupiter.api.Test; + +public class ListenUntilValidationTest { + + @Test + public void testUntilWithAllThrowsException() { + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + FuncWorkflowBuilder.workflow("test-all-until-invalid") + .tasks( + listen( + "waitOrders", + toAll("order.created") + .until( + (io.serverlessworkflow.impl.WorkflowModelCollection + events) -> events.stream().count() >= 3, + io.serverlessworkflow.impl.WorkflowModelCollection.class))) + .build(); + + app.workflowDefinition(workflow); + } + }); + + assertTrue(exception.getMessage().contains("until() is only supported with any()")); + assertTrue(exception.getMessage().contains("ALL")); + } + + @Test + public void testUntilWithOneThrowsException() { + IllegalStateException exception = + assertThrows( + IllegalStateException.class, + () -> { + try (WorkflowApplication app = WorkflowApplication.builder().build()) { + Workflow workflow = + FuncWorkflowBuilder.workflow("test-one-until-invalid") + .tasks( + listen( + "waitOrders", + toOne("order.created") + .until( + (io.serverlessworkflow.impl.WorkflowModelCollection + events) -> events.stream().count() >= 3, + io.serverlessworkflow.impl.WorkflowModelCollection.class))) + .build(); + + app.workflowDefinition(workflow); + } + }); + + assertTrue(exception.getMessage().contains("until() is only supported with any()")); + assertTrue(exception.getMessage().contains("ONE")); + } +} diff --git a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java index d32efe396..e6101a1ab 100644 --- a/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java +++ b/fluent/spec/src/main/java/io/serverlessworkflow/fluent/spec/dsl/BaseListenSpec.java @@ -28,6 +28,12 @@ */ public abstract class BaseListenSpec { + private enum StrategyType { + ALL, + ANY, + ONE + } + @FunctionalInterface public interface ToInvoker { void to(LISTEN_TASK listenTaskBuilder, Consumer toStep); @@ -50,6 +56,7 @@ public interface OneFilterApplier { private Consumer strategyStep; private Consumer untilStep; + private StrategyType strategyType; protected BaseListenSpec( ToInvoker toInvoker, @@ -73,6 +80,7 @@ protected final void setUntilStep(Consumer untilStep) { public final SELF all(Consumer... filters) { Objects.requireNonNull(filters, "filters"); strategyStep = t -> allApplier.apply(t, filters); + strategyType = StrategyType.ALL; return self(); } @@ -80,17 +88,25 @@ public final SELF all(Consumer... filters) { public final SELF any(Consumer... filters) { Objects.requireNonNull(filters, "filters"); strategyStep = t -> anyApplier.apply(t, filters); + strategyType = StrategyType.ANY; return self(); } public final SELF one(Consumer filter) { Objects.requireNonNull(filter, "filter"); strategyStep = t -> oneApplier.apply(t, filter); + strategyType = StrategyType.ONE; return self(); } protected final void acceptInto(LISTEN_TASK listenTaskBuilder) { Objects.requireNonNull(strategyStep, "listening strategy must be set (all/any/one)"); + if (untilStep != null && strategyType != StrategyType.ANY) { + throw new IllegalStateException( + "until() is only supported with any() event consumption strategy. " + + "Current strategy: " + + strategyType); + } toInvoker.to( listenTaskBuilder, t -> {