From f8a15e332bab45e5b5c7d23a1ec3bf5d97cbaa3f Mon Sep 17 00:00:00 2001 From: Ian Yap Date: Sat, 9 May 2026 10:35:54 -0700 Subject: [PATCH 1/2] Backport worker_may_ignore history handling Co-authored-by: Cursor --- lib/temporal/workflow/history/event.rb | 5 +- lib/temporal/workflow/state_manager.rb | 39 +++++++- .../temporal/workflow/history/event_spec.rb | 22 +++++ .../temporal/workflow/state_manager_spec.rb | 91 +++++++++++++++++++ 4 files changed, 154 insertions(+), 3 deletions(-) diff --git a/lib/temporal/workflow/history/event.rb b/lib/temporal/workflow/history/event.rb index 10ebcf80..e19d094c 100644 --- a/lib/temporal/workflow/history/event.rb +++ b/lib/temporal/workflow/history/event.rb @@ -30,13 +30,14 @@ class Event PREFIX = 'EVENT_TYPE_'.freeze - attr_reader :id, :timestamp, :type, :attributes + attr_reader :id, :timestamp, :type, :attributes, :worker_may_ignore def initialize(raw_event) @id = raw_event.event_id @timestamp = raw_event.event_time.to_time @type = raw_event.event_type.to_s.gsub(PREFIX, '') @attributes = extract_attributes(raw_event) + @worker_may_ignore = raw_event.worker_may_ignore freeze end @@ -74,6 +75,8 @@ def target_attributes def extract_attributes(raw_event) attributes_name = raw_event.attributes + return nil if attributes_name.nil? + raw_event.public_send(attributes_name) end end diff --git a/lib/temporal/workflow/state_manager.rb b/lib/temporal/workflow/state_manager.rb index 800320e3..585a8475 100644 --- a/lib/temporal/workflow/state_manager.rb +++ b/lib/temporal/workflow/state_manager.rb @@ -123,9 +123,36 @@ def validate_append_command(command) end end + IGNORED_EVENT_TYPES = %w[ + WORKFLOW_EXECUTION_OPTIONS_UPDATED + NEXUS_OPERATION_SCHEDULED + NEXUS_OPERATION_STARTED + NEXUS_OPERATION_COMPLETED + NEXUS_OPERATION_FAILED + NEXUS_OPERATION_CANCELED + NEXUS_OPERATION_TIMED_OUT + NEXUS_OPERATION_CANCEL_REQUESTED + NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED + NEXUS_OPERATION_CANCEL_REQUEST_FAILED + ].freeze + def apply_event(event) + if IGNORED_EVENT_TYPES.include?(event.type) + log_ignored_event('Unsupported event ignored', event) + return + end + state_machine = command_tracker[event.originating_event_id] - history_target = History::EventTarget.from_event(event) + history_target = begin + History::EventTarget.from_event(event) + rescue History::EventTarget::UnexpectedEventType + if event.worker_may_ignore + log_ignored_event('Unknown event type ignored', event) + return + end + + raise UnsupportedEvent, event.type + end case event.type when 'WORKFLOW_EXECUTION_STARTED' @@ -305,10 +332,18 @@ def apply_event(event) discard_command(history_target) else - raise UnsupportedEvent, event.type + if event.worker_may_ignore + log_ignored_event('Unknown event ignored', event) + else + raise UnsupportedEvent, event.type + end end end + def log_ignored_event(message, event) + Temporal.logger.warn(message, event_type: event.type, event_id: event.id) + end + def dispatch(history_target, name, *attributes) dispatcher.dispatch(history_target, name, attributes) end diff --git a/spec/unit/lib/temporal/workflow/history/event_spec.rb b/spec/unit/lib/temporal/workflow/history/event_spec.rb index 37fb9af1..872af997 100644 --- a/spec/unit/lib/temporal/workflow/history/event_spec.rb +++ b/spec/unit/lib/temporal/workflow/history/event_spec.rb @@ -1,4 +1,5 @@ require 'temporal/workflow/history/event' +require 'temporal/api/history/v1/message_pb' describe Temporal::Workflow::History::Event do subject { described_class.new(raw_event) } @@ -24,6 +25,27 @@ it 'sets correct attributes' do expect(subject.attributes).to eq(raw_event.workflow_execution_started_event_attributes) end + + context 'when the event attributes are unknown to the generated proto' do + let(:raw_event) do + event = Temporalio::Api::History::V1::HistoryEvent.decode(unknown_attributes_event_payload) + event.event_time = Google::Protobuf::Timestamp.new(seconds: Time.now.to_i) + event + end + let(:unknown_attributes_event_payload) do + [ + 0x08, 99, # event_id + 0x18, 99, # event_type + 0xA2, 0x03, 0x00, # unknown oneof attributes field + 0xE0, 0x12, 0x01 # worker_may_ignore + ].pack('C*') + end + + it 'keeps the event constructable so worker_may_ignore can be read' do + expect(subject.attributes).to be_nil + expect(subject.worker_may_ignore).to eq(true) + end + end end describe '#originating_event_id' do diff --git a/spec/unit/lib/temporal/workflow/state_manager_spec.rb b/spec/unit/lib/temporal/workflow/state_manager_spec.rb index 00629c9c..2b1bb1a5 100644 --- a/spec/unit/lib/temporal/workflow/state_manager_spec.rb +++ b/spec/unit/lib/temporal/workflow/state_manager_spec.rb @@ -126,6 +126,97 @@ class MyWorkflow < Temporal::Workflow; end end end + describe '#apply compatibility with ignorable events' do + let(:state_manager) { described_class.new(dispatcher) } + let(:dispatcher) { instance_double(Temporal::Workflow::Dispatcher) } + let(:window) do + instance_double( + Temporal::Workflow::History::Window, + replay?: true, + local_time: nil, + last_event_id: 0, + markers: [], + events: [event] + ) + end + + before do + allow(Temporal.logger).to receive(:warn) + end + + context 'when the event type is known to be safe to ignore' do + let(:event) { double('history event', type: 'NEXUS_OPERATION_STARTED', id: 10) } + + it 'logs and ignores the event' do + expect { state_manager.apply(window) }.not_to raise_error + expect(Temporal.logger).to have_received(:warn).with( + 'Unsupported event ignored', + event_type: 'NEXUS_OPERATION_STARTED', + event_id: 10 + ) + end + end + + context 'when an unknown event may be ignored by the worker' do + let(:event) do + double( + 'history event', + type: 'SOME_NEW_EVENT', + id: 11, + originating_event_id: 11, + worker_may_ignore: true + ) + end + + it 'logs and ignores the event' do + expect { state_manager.apply(window) }.not_to raise_error + expect(Temporal.logger).to have_received(:warn).with( + 'Unknown event type ignored', + event_type: 'SOME_NEW_EVENT', + event_id: 11 + ) + end + end + + context 'when an unknown event may not be ignored by the worker' do + let(:event) do + double( + 'history event', + type: 'SOME_NEW_EVENT', + id: 12, + originating_event_id: 12, + worker_may_ignore: false + ) + end + + it 'raises an unsupported event error' do + expect { state_manager.apply(window) }.to raise_error(described_class::UnsupportedEvent, 'SOME_NEW_EVENT') + end + end + + context 'when a known event target has an unhandled event type but may be ignored' do + let(:event) do + double( + 'history event', + type: 'WORKFLOW_EXECUTION_UPDATE_ACCEPTED', + id: 13, + originating_event_id: 13, + target_attributes: {}, + worker_may_ignore: true + ) + end + + it 'logs and ignores the event' do + expect { state_manager.apply(window) }.not_to raise_error + expect(Temporal.logger).to have_received(:warn).with( + 'Unknown event ignored', + event_type: 'WORKFLOW_EXECUTION_UPDATE_ACCEPTED', + event_id: 13 + ) + end + end + end + describe '#search_attributes' do let(:initial_search_attributes) do { From 8123e126d1eb130e971cfe0f48d1437873c751a3 Mon Sep 17 00:00:00 2001 From: Ian Yap Date: Sat, 9 May 2026 13:16:19 -0700 Subject: [PATCH 2/2] Prepare 0.0.7 backport release Co-authored-by: Cursor --- CHANGELOG.md | 7 +++++++ lib/temporal/version.rb | 2 +- spec/unit/lib/temporal/workflow/history/event_spec.rb | 11 +++++++---- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index cd984984..9a53e09b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## 0.0.7 + +- Backport worker_may_ignore history event handling from the 0.1.x line +- Skip known Nexus/options-updated events and unknown worker-ignorable events +- Preserve fail-closed behavior for non-ignorable unknown events +- Make History::Event constructable when event attributes are unknown to the generated proto + ## 0.0.6 - Defer gRPC loading via autoload for fork safety diff --git a/lib/temporal/version.rb b/lib/temporal/version.rb index 50451bd9..5bdc1ec7 100644 --- a/lib/temporal/version.rb +++ b/lib/temporal/version.rb @@ -1,3 +1,3 @@ module Temporal - VERSION = '0.0.6'.freeze + VERSION = '0.0.7'.freeze end diff --git a/spec/unit/lib/temporal/workflow/history/event_spec.rb b/spec/unit/lib/temporal/workflow/history/event_spec.rb index 872af997..66720c9e 100644 --- a/spec/unit/lib/temporal/workflow/history/event_spec.rb +++ b/spec/unit/lib/temporal/workflow/history/event_spec.rb @@ -32,12 +32,15 @@ event.event_time = Google::Protobuf::Timestamp.new(seconds: Time.now.to_i) event end + # Field 5000, wire type 2 (length-delimited), length 0: + # tag = (5000 << 3) | 2 = 40002 + # varint(40002) = [0xC2, 0xB8, 0x02] let(:unknown_attributes_event_payload) do [ - 0x08, 99, # event_id - 0x18, 99, # event_type - 0xA2, 0x03, 0x00, # unknown oneof attributes field - 0xE0, 0x12, 0x01 # worker_may_ignore + 0x08, 99, # event_id = 99 + 0x18, 99, # event_type = 99 (unknown enum value) + 0xC2, 0xB8, 0x02, 0x00, # unknown oneof attributes at field 5000, length 0 + 0xE0, 0x12, 0x01 # worker_may_ignore = true (field 300) ].pack('C*') end