Skip to content

fix(orchestrations): provision broker topic for events#447

Open
flemzord wants to merge 1 commit into
mainfrom
fix/orchestration-broker-topic
Open

fix(orchestrations): provision broker topic for events#447
flemzord wants to merge 1 commit into
mainfrom
fix/orchestration-broker-topic

Conversation

@flemzord
Copy link
Copy Markdown
Member

@flemzord flemzord commented May 5, 2026

Summary

  • create a BrokerTopic for orchestration's own published events
  • wait for that topic to be ready before deploying orchestration
  • reconcile Broker resources when BrokerTopic resources change, so newly-created topics get their NATS streams

Context

A production stack using OneStreamByService had publisher mapping *:xjqfdgkwngtt-rzbk-orchestration with NATS auto-provision disabled, but no matching BrokerTopic/JetStream stream was created. Workflow startup events then failed with nats: no response from stream.

Tests

  • go test ./internal/resources/brokers ./internal/resources/brokertopics ./internal/resources/orchestrations ./internal/resources/brokerconsumers
  • ASSETS=$(go run sigs.k8s.io/controller-runtime/tools/setup-envtest@latest use 1.32.0 -p path) && CGO_ENABLED=0 KUBEBUILDER_ASSETS="$ASSETS" go test ./internal/tests -ginkgo.focus 'OrchestrationController|BrokerTopicController|BrokerConsumer'

@flemzord flemzord requested a review from a team as a code owner May 5, 2026 16:21
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 5, 2026

Warning

Rate limit exceeded

@flemzord has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 53 minutes and 4 seconds before requesting another review.

You’ve run out of usage credits. Purchase more in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: bfdb869e-3892-4a98-a1eb-31524ad67cf9

📥 Commits

Reviewing files that changed from the base of the PR and between f307811 and 73d4430.

📒 Files selected for processing (4)
  • internal/resources/brokers/init.go
  • internal/resources/brokertopics/brokertopics.go
  • internal/resources/orchestrations/init.go
  • internal/tests/orchestration_controller_test.go

Walkthrough

This PR enhances broker topic infrastructure by adding watch wiring that routes BrokerTopic events to Broker reconciliation, updating the BrokerTopic creation function to use namespaced names and explicit owner references, integrating broker topic creation into the orchestration controller with readiness gates, and adding corresponding test coverage.

Changes

Broker Topic Watch and Orchestration Integration

Layer / File(s) Summary
Imports & Types
internal/resources/brokers/init.go, internal/resources/brokertopics/brokertopics.go
Added k8s.io/apimachinery/pkg/types and sigs.k8s.io/controller-runtime/pkg/reconcile imports to support namespaced name construction and reconciliation request generation.
Watch Wiring
internal/resources/brokers/init.go
New core.WithWatch mapping observes BrokerTopic events and generates reconciliation requests for Broker resources, using topic.Spec.Stack as the target broker name.
BrokerTopic Creation
internal/resources/brokertopics/brokertopics.go
Create function now accepts types.NamespacedName parameters and explicitly sets two owner references (to owner and stack) via controllerutil.SetOwnerReference.
Orchestration Integration
internal/resources/orchestrations/init.go
Reconcile creates an "orchestration" broker topic and adds a readiness gate that blocks reconciliation with NewPendingError() when topic.Status.Ready is false.
Test Coverage
internal/tests/orchestration_controller_test.go
New assertion verifies that a BrokerTopic resource is created for orchestration events, loaded via core.GetObjectName(stack.Name, "orchestration"), and owned by the orchestration object.

Sequence Diagram

sequenceDiagram
    participant BrokerTopicEvent as BrokerTopic Event
    participant BrokerWatch as Broker Watch Handler
    participant BrokerReconciler as Broker Reconciler
    participant OrchestrReconciler as Orchestration Reconciler
    participant BrokerTopicAPI as BrokerTopic API

    BrokerTopicEvent->>BrokerWatch: BrokerTopic created/updated
    activate BrokerWatch
    BrokerWatch->>BrokerReconciler: Generate reconcile.Request<br/>(Broker named in topic.Spec.Stack)
    deactivate BrokerWatch
    
    OrchestrReconciler->>BrokerTopicAPI: Create BrokerTopic<br/>(service="orchestration")
    activate BrokerTopicAPI
    BrokerTopicAPI->>BrokerTopicAPI: Set owner refs (owner, stack)
    BrokerTopicAPI-->>OrchestrReconciler: Return topic
    deactivate BrokerTopicAPI
    
    OrchestrReconciler->>OrchestrReconciler: Check topic.Status.Ready
    alt Ready
        OrchestrReconciler->>OrchestrReconciler: Continue reconciliation
    else Not Ready
        OrchestrReconciler-->>OrchestrReconciler: Return PendingError
    end
Loading

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~22 minutes

Poem

🐰 A watch now binds the broker's fate,
When topics change, the request waits,
Owner refs keep lineage clear,
Readiness checks hold fast the gear,
Orchestrations now coordinate!

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 50.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Title check ✅ Passed The title 'fix(orchestrations): provision broker topic for events' directly and specifically describes the main change—creating a broker topic for orchestration events.
Description check ✅ Passed The description clearly relates to the changeset by explaining why a BrokerTopic is created for orchestrations, how it's managed, and the production issue it resolves.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/orchestration-broker-topic

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

fguery
fguery previously approved these changes May 5, 2026
core.WithFinalizer[*v1beta1.Broker]("clear", deleteBroker),
core.WithOwn[*v1beta1.Broker](&v1.Job{}),
core.WithWatchSettings[*v1beta1.Broker](),
core.WithWatch[*v1beta1.Broker, *v1beta1.BrokerTopic](func(ctx core.Context, topic *v1beta1.BrokerTopic) []reconcile.Request {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the purpose of this change?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This watch wakes up the Broker reconciler when a BrokerTopic is created or updated.

In one-stream-by-service mode, the Broker reconciler is responsible for listing BrokerTopic objects and creating the matching NATS stream. When a new topic is introduced after the Broker already exists, there may be no Broker event otherwise, so the stream creation can be delayed until some unrelated reconciliation happens.

I also added a short code comment locally to make this dependency explicit.

@flemzord flemzord force-pushed the fix/orchestration-broker-topic branch from f307811 to 73d4430 Compare May 11, 2026 16:08
@flemzord flemzord enabled auto-merge (squash) May 11, 2026 16:12
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants