Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion cluster/cluster/forking/cluster_invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,15 @@ func (invoker *forkingClusterInvoker) Invoke(ctx context.Context, invocation pro
}
}

// forkCtx is canceled when Invoke returns, signaling losing parallel
// goroutines to stop rather than running to completion.
forkCtx, cancel := context.WithCancel(ctx)
defer cancel()

resultQ := queue.New(1)
for _, ivk := range selected {
go func(k protocolbase.Invoker) {
result := k.Invoke(ctx, invocation)
result := k.Invoke(forkCtx, invocation)
if err := resultQ.Put(result); err != nil {
logger.Errorf("[Cluster][Forking] resultQ put failed with exception err=%v", err)
}
Expand Down
35 changes: 35 additions & 0 deletions cluster/cluster/forking/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,38 @@ func TestForkingInvokeHalfTimeout(t *testing.T) {
assert.Equal(t, mockResult, result)
wg.Wait()
}

func TestForkingInvokeCancelsLosingBranches(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

forkingUrl.AddParam(constant.ForksKey, strconv.Itoa(2))

loserCtxCanceled := make(chan struct{})

winner := mock.NewMockInvoker(ctrl)
winner.EXPECT().IsAvailable().Return(true).AnyTimes()
winner.EXPECT().Invoke(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, _ base.Invocation) result.Result {
return &result.RPCResult{}
})

loser := mock.NewMockInvoker(ctrl)
loser.EXPECT().IsAvailable().Return(true).AnyTimes()
loser.EXPECT().Invoke(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, _ base.Invocation) result.Result {
<-ctx.Done()
close(loserCtxCanceled)
return &result.RPCResult{}
})

clusterInvoker := registerForking(winner, loser)
res := clusterInvoker.Invoke(context.Background(), &invocation.RPCInvocation{})
require.NoError(t, res.Error())

select {
case <-loserCtxCanceled:
case <-time.After(2 * time.Second):
t.Fatal("loser branch context was not canceled after winner succeeded")
}
}
7 changes: 7 additions & 0 deletions cluster/loadbalance/roundrobin/loadbalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func (lb *rrLoadBalance) Select(invokers []base.Invoker, invocation base.Invocat
cache, _ := methodWeightMap.LoadOrStore(key, &cachedInvokers{})
cachedInvokers := cache.(*cachedInvokers)

// Serialize the full select+update sequence per service+method key so that
// concurrent callers cannot observe each other's intermediate currentWeight
// state and skew the weighted distribution.
cachedInvokers.mu.Lock()
defer cachedInvokers.mu.Unlock()

var (
clean = false
totalWeight = int64(0)
Expand Down Expand Up @@ -166,5 +172,6 @@ func (robin *weightedRoundRobin) Current(delta int64) {
}

type cachedInvokers struct {
mu sync.Mutex
sync.Map /*[string]weightedRoundRobin*/
}
25 changes: 25 additions & 0 deletions cluster/loadbalance/roundrobin/loadbalance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package roundrobin
import (
"fmt"
"strconv"
"sync"
"testing"
)

Expand Down Expand Up @@ -75,3 +76,27 @@ func TestRoundRobinByWeight(t *testing.T) {
assert.Equal(t, w, selected[i])
}
}

func TestRoundRobinByWeightConcurrent(t *testing.T) {
loadBalance := NewRRLoadBalance()

var invokers []base.Invoker
for i := 1; i <= 5; i++ {
url, _ := common.NewURL(fmt.Sprintf("dubbo://192.168.1.%v:20000/org.apache.demo.HelloService?weight=%v", i, i))
invokers = append(invokers, base.NewBaseInvoker(url))
}

const goroutines = 50
var wg sync.WaitGroup
wg.Add(goroutines)
for i := 0; i < goroutines; i++ {
go func() {
defer wg.Done()
for j := 0; j < 30; j++ {
invoker := loadBalance.Select(invokers, &invocation.RPCInvocation{})
assert.NotNil(t, invoker)
}
}()
}
wg.Wait()
}
31 changes: 30 additions & 1 deletion cluster/router/script/instance/js_instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ type jsInstances struct {

type jsInstance struct {
rt *goja.Runtime
// baseGlobals is the set of global names present in a freshly created
// runtime (built-ins). Any global outside this set is treated as
// request/script state and removed on reset.
baseGlobals map[string]struct{}
}

type program struct {
Expand Down Expand Up @@ -82,6 +86,10 @@ func (i *jsInstances) Run(rawScript string, invokers []base.Invoker, invocation
return invokers, nil
}
matcher := i.insPool.Get().(*jsInstance)
defer func() {
matcher.reset()
i.insPool.Put(matcher)
}()

packInvokers := make([]base.Invoker, 0, len(invokers))
for _, invoker := range invokers {
Expand Down Expand Up @@ -179,9 +187,30 @@ func (j jsInstance) initReplyVar() {
}
}

// reset removes every global added since the runtime was created — the request
// bindings (invokers/invocation/context/result) as well as any globals defined
// by the executed user script — before returning the instance to the pool. This
// prevents cross-request state leakage via pooled goja.Runtime globals. Built-in
// globals captured at construction time are preserved.
func (j jsInstance) reset() {
global := j.rt.GlobalObject()
for _, key := range global.Keys() {
if _, isBase := j.baseGlobals[key]; isBase {
continue
}
_ = global.Delete(key)
}
}

func newJsInstance() *jsInstance {
rt := goja.New()
base := make(map[string]struct{})
for _, key := range rt.GlobalObject().Keys() {
base[key] = struct{}{}
}
return &jsInstance{
rt: goja.New(),
rt: rt,
baseGlobals: base,
}
}

Expand Down
34 changes: 34 additions & 0 deletions cluster/router/script/instance/js_instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,3 +716,37 @@ func TestRunScriptInPanic(t *testing.T) {
`
wontPanic(scriptCallWrongArgs3)
}

func TestJsInstanceResetClearsBindings(t *testing.T) {
inst := newJsInstance()

testify_require.NoError(t, inst.rt.Set("invokers", []string{"test"}))
testify_require.NoError(t, inst.rt.Set("invocation", "test-inv"))
testify_require.NoError(t, inst.rt.Set("context", "test-ctx"))
testify_require.NoError(t, inst.rt.Set(jsScriptResultName, "test-result"))
// Global defined by a user script must also be cleared on reset.
_, err := inst.rt.RunString("userDefinedGlobal = 42;")
testify_require.NoError(t, err)
assert.NotNil(t, inst.rt.Get("userDefinedGlobal"))

inst.reset()

// Get returns nil for undefined globals after reset.
assert.Nil(t, inst.rt.Get("invokers"))
assert.Nil(t, inst.rt.Get("invocation"))
assert.Nil(t, inst.rt.Get("context"))
assert.Nil(t, inst.rt.Get(jsScriptResultName))
assert.Nil(t, inst.rt.Get("userDefinedGlobal"))
}

func TestJsInstanceRunReturnsRuntimeToPool(t *testing.T) {
instances := newJsInstances()
testify_require.NoError(t, instances.Compile(Func_Script))

invokers, inv, _ := getRouteArgs()
for i := 0; i < 5; i++ {
result, err := instances.Run(Func_Script, invokers, inv)
testify_require.NoError(t, err)
assert.NotEmpty(t, result)
}
}
61 changes: 50 additions & 11 deletions filter/tps/limiter/method_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package limiter
import (
"strconv"
"sync"
"sync/atomic"
"time"
)

import (
"github.com/dubbogo/gost/log/logger"

"github.com/modern-go/concurrent"
)

import (
Expand All @@ -38,8 +38,30 @@ import (

const (
name = "method-service"

tpsLimiterStateTTL = 10 * time.Minute
tpsLimiterCleanupInterval = 5 * time.Minute
)

// tpsLimitEntry wraps a TpsLimitStrategy with a last-access timestamp so that
// stale entries (services/methods no longer invoked) can be evicted periodically.
type tpsLimitEntry struct {
strategy filter.TpsLimitStrategy
lastAccess int64 // unix nanoseconds, updated atomically
}

func newTpsLimitEntry(s filter.TpsLimitStrategy) *tpsLimitEntry {
return &tpsLimitEntry{
strategy: s,
lastAccess: time.Now().UnixNano(),
}
}

func (e *tpsLimitEntry) IsAllowable() bool {
atomic.StoreInt64(&e.lastAccess, time.Now().UnixNano())
return e.strategy.IsAllowable()
}

func init() {
extension.SetTpsLimiter(constant.DefaultKey, GetMethodServiceTpsLimiter)
extension.SetTpsLimiter(name, GetMethodServiceTpsLimiter)
Expand Down Expand Up @@ -112,7 +134,7 @@ func init() {
* In this case, only UpdateUser will be limited by its configuration (70 times in 40000ms)
*/
type MethodServiceTpsLimiter struct {
tpsState *concurrent.Map
tpsState sync.Map // map[string]*tpsLimitEntry
}

// IsAllowable based on method-level and service-level.
Expand All @@ -121,7 +143,7 @@ type MethodServiceTpsLimiter struct {
// The key point is how to keep thread-safe
// This implementation use concurrent map + loadOrStore to make implementation thread-safe
// You can image that even multiple threads create limiter, but only one could store the limiter into tpsState
func (limiter MethodServiceTpsLimiter) IsAllowable(url *common.URL, invocation base.Invocation) bool {
func (limiter *MethodServiceTpsLimiter) IsAllowable(url *common.URL, invocation base.Invocation) bool {
methodConfigPrefix := "methods." + invocation.MethodName() + "."

methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPSLimitRateKey, "")
Expand All @@ -140,7 +162,7 @@ func (limiter MethodServiceTpsLimiter) IsAllowable(url *common.URL, invocation b
limitState, found := limiter.tpsState.Load(limitTarget)
if found {
// the limiter has been cached, we return its result
return limitState.(filter.TpsLimitStrategy).IsAllowable()
return limitState.(*tpsLimitEntry).IsAllowable()
}

// we could not find the limiter, and try to create one.
Expand Down Expand Up @@ -172,10 +194,27 @@ func (limiter MethodServiceTpsLimiter) IsAllowable(url *common.URL, invocation b
return true
}

// we using loadOrStore to ensure thread-safe
limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval)))
// we using LoadOrStore to ensure thread-safe; wrap in tpsLimitEntry for TTL tracking
entry := newTpsLimitEntry(limitStateCreator.Create(int(limitRate), int(limitInterval)))
actual, _ := limiter.tpsState.LoadOrStore(limitTarget, entry)
return actual.(*tpsLimitEntry).IsAllowable()
}

return limitState.(filter.TpsLimitStrategy).IsAllowable()
// runCleanup periodically evicts limiter entries that have not been accessed
// within tpsLimiterStateTTL. This prevents unbounded accumulation when
// services or methods are removed dynamically.
func (limiter *MethodServiceTpsLimiter) runCleanup() {
ticker := time.NewTicker(tpsLimiterCleanupInterval)
defer ticker.Stop()
for range ticker.C {
cutoff := time.Now().Add(-tpsLimiterStateTTL).UnixNano()
limiter.tpsState.Range(func(key, val any) bool {
if atomic.LoadInt64(&val.(*tpsLimitEntry).lastAccess) < cutoff {
limiter.tpsState.Delete(key)
}
return true
})
}
}

// getLimitConfig will try to fetch the configuration from url.
Expand Down Expand Up @@ -215,9 +254,9 @@ var (
// GetMethodServiceTpsLimiter will return an MethodServiceTpsLimiter instance.
func GetMethodServiceTpsLimiter() filter.TpsLimiter {
methodServiceTpsLimiterOnce.Do(func() {
methodServiceTpsLimiterInstance = &MethodServiceTpsLimiter{
tpsState: concurrent.NewMap(),
}
inst := &MethodServiceTpsLimiter{}
go inst.runCleanup()
methodServiceTpsLimiterInstance = inst
})
return methodServiceTpsLimiterInstance
}
39 changes: 39 additions & 0 deletions filter/tps/limiter/method_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package limiter

import (
"net/url"
"sync/atomic"
"testing"
"time"
)

import (
Expand Down Expand Up @@ -156,3 +158,40 @@ func (creator *mockStrategyCreator) Create(rate int, interval int) filter.TpsLim
assert.Equal(creator.t, creator.interval, interval)
return creator.strategy
}

type stubAllowStrategy struct{}

func (s *stubAllowStrategy) IsAllowable() bool { return true }

func TestTpsLimitEntryUpdatesLastAccess(t *testing.T) {
entry := newTpsLimitEntry(&stubAllowStrategy{})
before := atomic.LoadInt64(&entry.lastAccess)
time.Sleep(time.Millisecond)
entry.IsAllowable()
after := atomic.LoadInt64(&entry.lastAccess)
assert.Greater(t, after, before)
}

func TestTpsLimiterStaleEntryEviction(t *testing.T) {
limiter := &MethodServiceTpsLimiter{}

staleEntry := newTpsLimitEntry(&stubAllowStrategy{})
atomic.StoreInt64(&staleEntry.lastAccess, time.Now().Add(-tpsLimiterStateTTL-time.Second).UnixNano())
limiter.tpsState.Store("stale-key", staleEntry)

activeEntry := newTpsLimitEntry(&stubAllowStrategy{})
limiter.tpsState.Store("active-key", activeEntry)

cutoff := time.Now().Add(-tpsLimiterStateTTL).UnixNano()
limiter.tpsState.Range(func(key, val any) bool {
if atomic.LoadInt64(&val.(*tpsLimitEntry).lastAccess) < cutoff {
limiter.tpsState.Delete(key)
}
return true
})

_, staleExists := limiter.tpsState.Load("stale-key")
_, activeExists := limiter.tpsState.Load("active-key")
assert.False(t, staleExists, "stale entry should be evicted")
assert.True(t, activeExists, "active entry should remain")
}
Loading
Loading