Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## 0.0.7

- Backport worker_may_ignore history event handling from the 0.1.x line
- Skip known Nexus/options-updated events and unknown worker-ignorable events
- Preserve fail-closed behavior for non-ignorable unknown events
- Make History::Event constructable when event attributes are unknown to the generated proto

## 0.0.6

- Defer gRPC loading via autoload for fork safety
Expand Down
2 changes: 1 addition & 1 deletion lib/temporal/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Temporal
VERSION = '0.0.6'.freeze
VERSION = '0.0.7'.freeze
end
5 changes: 4 additions & 1 deletion lib/temporal/workflow/history/event.rb
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,14 @@ class Event

PREFIX = 'EVENT_TYPE_'.freeze

attr_reader :id, :timestamp, :type, :attributes
attr_reader :id, :timestamp, :type, :attributes, :worker_may_ignore

def initialize(raw_event)
@id = raw_event.event_id
@timestamp = raw_event.event_time.to_time
@type = raw_event.event_type.to_s.gsub(PREFIX, '')
@attributes = extract_attributes(raw_event)
@worker_may_ignore = raw_event.worker_may_ignore

freeze
end
Expand Down Expand Up @@ -74,6 +75,8 @@ def target_attributes

def extract_attributes(raw_event)
attributes_name = raw_event.attributes
return nil if attributes_name.nil?

raw_event.public_send(attributes_name)
end
end
Expand Down
39 changes: 37 additions & 2 deletions lib/temporal/workflow/state_manager.rb
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,36 @@ def validate_append_command(command)
end
end

IGNORED_EVENT_TYPES = %w[
WORKFLOW_EXECUTION_OPTIONS_UPDATED
NEXUS_OPERATION_SCHEDULED
NEXUS_OPERATION_STARTED
NEXUS_OPERATION_COMPLETED
NEXUS_OPERATION_FAILED
NEXUS_OPERATION_CANCELED
NEXUS_OPERATION_TIMED_OUT
NEXUS_OPERATION_CANCEL_REQUESTED
NEXUS_OPERATION_CANCEL_REQUEST_COMPLETED
NEXUS_OPERATION_CANCEL_REQUEST_FAILED
].freeze

def apply_event(event)
if IGNORED_EVENT_TYPES.include?(event.type)
log_ignored_event('Unsupported event ignored', event)
return
end

state_machine = command_tracker[event.originating_event_id]
history_target = History::EventTarget.from_event(event)
history_target = begin
History::EventTarget.from_event(event)
rescue History::EventTarget::UnexpectedEventType
if event.worker_may_ignore
log_ignored_event('Unknown event type ignored', event)
return
end

raise UnsupportedEvent, event.type
end

case event.type
when 'WORKFLOW_EXECUTION_STARTED'
Expand Down Expand Up @@ -305,10 +332,18 @@ def apply_event(event)
discard_command(history_target)

else
raise UnsupportedEvent, event.type
if event.worker_may_ignore
log_ignored_event('Unknown event ignored', event)
else
raise UnsupportedEvent, event.type
end
end
end

def log_ignored_event(message, event)
Temporal.logger.warn(message, event_type: event.type, event_id: event.id)
end

def dispatch(history_target, name, *attributes)
dispatcher.dispatch(history_target, name, attributes)
end
Expand Down
25 changes: 25 additions & 0 deletions spec/unit/lib/temporal/workflow/history/event_spec.rb
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
require 'temporal/workflow/history/event'
require 'temporal/api/history/v1/message_pb'

describe Temporal::Workflow::History::Event do
subject { described_class.new(raw_event) }
Expand All @@ -24,6 +25,30 @@
it 'sets correct attributes' do
expect(subject.attributes).to eq(raw_event.workflow_execution_started_event_attributes)
end

context 'when the event attributes are unknown to the generated proto' do
let(:raw_event) do
event = Temporalio::Api::History::V1::HistoryEvent.decode(unknown_attributes_event_payload)
event.event_time = Google::Protobuf::Timestamp.new(seconds: Time.now.to_i)
event
end
# Field 5000, wire type 2 (length-delimited), length 0:
# tag = (5000 << 3) | 2 = 40002
# varint(40002) = [0xC2, 0xB8, 0x02]
let(:unknown_attributes_event_payload) do
[
0x08, 99, # event_id = 99
0x18, 99, # event_type = 99 (unknown enum value)
0xC2, 0xB8, 0x02, 0x00, # unknown oneof attributes at field 5000, length 0
0xE0, 0x12, 0x01 # worker_may_ignore = true (field 300)
].pack('C*')
end

it 'keeps the event constructable so worker_may_ignore can be read' do
expect(subject.attributes).to be_nil
expect(subject.worker_may_ignore).to eq(true)
end
end
end

describe '#originating_event_id' do
Expand Down
91 changes: 91 additions & 0 deletions spec/unit/lib/temporal/workflow/state_manager_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,97 @@ class MyWorkflow < Temporal::Workflow; end
end
end

describe '#apply compatibility with ignorable events' do
let(:state_manager) { described_class.new(dispatcher) }
let(:dispatcher) { instance_double(Temporal::Workflow::Dispatcher) }
let(:window) do
instance_double(
Temporal::Workflow::History::Window,
replay?: true,
local_time: nil,
last_event_id: 0,
markers: [],
events: [event]
)
end

before do
allow(Temporal.logger).to receive(:warn)
end

context 'when the event type is known to be safe to ignore' do
let(:event) { double('history event', type: 'NEXUS_OPERATION_STARTED', id: 10) }

it 'logs and ignores the event' do
expect { state_manager.apply(window) }.not_to raise_error
expect(Temporal.logger).to have_received(:warn).with(
'Unsupported event ignored',
event_type: 'NEXUS_OPERATION_STARTED',
event_id: 10
)
end
end

context 'when an unknown event may be ignored by the worker' do
let(:event) do
double(
'history event',
type: 'SOME_NEW_EVENT',
id: 11,
originating_event_id: 11,
worker_may_ignore: true
)
end

it 'logs and ignores the event' do
expect { state_manager.apply(window) }.not_to raise_error
expect(Temporal.logger).to have_received(:warn).with(
'Unknown event type ignored',
event_type: 'SOME_NEW_EVENT',
event_id: 11
)
end
end

context 'when an unknown event may not be ignored by the worker' do
let(:event) do
double(
'history event',
type: 'SOME_NEW_EVENT',
id: 12,
originating_event_id: 12,
worker_may_ignore: false
)
end

it 'raises an unsupported event error' do
expect { state_manager.apply(window) }.to raise_error(described_class::UnsupportedEvent, 'SOME_NEW_EVENT')
end
end

context 'when a known event target has an unhandled event type but may be ignored' do
let(:event) do
double(
'history event',
type: 'WORKFLOW_EXECUTION_UPDATE_ACCEPTED',
id: 13,
originating_event_id: 13,
target_attributes: {},
worker_may_ignore: true
)
end

it 'logs and ignores the event' do
expect { state_manager.apply(window) }.not_to raise_error
expect(Temporal.logger).to have_received(:warn).with(
'Unknown event ignored',
event_type: 'WORKFLOW_EXECUTION_UPDATE_ACCEPTED',
event_id: 13
)
end
end
end

describe '#search_attributes' do
let(:initial_search_attributes) do
{
Expand Down