Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 60 additions & 16 deletions registry/etcdv3/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,38 @@ import (
)

type dataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
subscribed map[string]config_center.ConfigurationListener
mutex sync.Mutex
closed bool
}

// NewRegistryDataListener creates a data listener for etcd
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener}
func NewRegistryDataListener() *dataListener {
return &dataListener{
subscribed: make(map[string]config_center.ConfigurationListener),
}
}

// SubscribeURL is used to set a watch listener for url
func (l *dataListener) SubscribeURL(url *common.URL, listener config_center.ConfigurationListener) {
if l.closed {
return
}
l.subscribed[url.ServiceKey()] = listener
}

// AddInterestedURL adds a registration @url to listen
func (l *dataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
// UnSubscribeURL is used to unset a watch listener for url
func (l *dataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener {
if l.closed {
return nil
}
listener := l.subscribed[url.ServiceKey()]
if listener == nil {
return nil
}
listener.(*configurationListener).Close()
delete(l.subscribed, url.ServiceKey())
return listener
}

// DataChange processes the data change event from registry center of etcd
Expand All @@ -64,33 +84,51 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool {
logger.Warnf("[Registry][Etcdv3] listen NewURL, path=%s err=%v", eventType.Path, err)
return false
}

for _, v := range l.interestedURL {
if serviceURL.URLEqual(v) {
l.listener.Process(
l.mutex.Lock()
defer l.mutex.Unlock()
if l.closed {
return false
}
match := false
for serviceKey, listener := range l.subscribed {
intf, group, version := common.ParseServiceKey(serviceKey)
if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) {
listener.Process(
&config_center.ConfigChangeEvent{
Key: eventType.Path,
Value: serviceURL,
Value: serviceURL.Clone(),
ConfigType: eventType.Action,
},
)
return true
match = true
}
}
return false
return match
}

// Close all subscribed listeners
func (l *dataListener) Close() {
l.mutex.Lock()
defer l.mutex.Unlock()
l.closed = true
for _, listener := range l.subscribed {
listener.(*configurationListener).Close()
}
}

type configurationListener struct {
registry *etcdV3Registry
events *gxchan.UnboundedChan
isClosed bool
close chan struct{}
closeOnce sync.Once
}

// NewConfigurationListener for listening the event of etcdv3.
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)}
return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32), close: make(chan struct{}, 1)}
}

// Process data change event from config center of etcd
Expand All @@ -102,6 +140,9 @@ func (l *configurationListener) Process(configType *config_center.ConfigChangeEv
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
case <-l.close:
logger.Warn("[Registry][Etcdv3] listener has been closed")
return nil, perrors.New("listener has been closed")
case <-l.registry.Done():
logger.Warn("[Registry][Etcdv3] listener's etcd client connection is broken, so etcd event listener exit now")
return nil, perrors.New("listener stopped")
Expand All @@ -122,9 +163,12 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}

// Close etcd registry center
// Close closes the listener only once
func (l *configurationListener) Close() {
// ensure that the listener will be closed at most once.
l.closeOnce.Do(func() {
l.isClosed = true
l.close <- struct{}{}
l.registry.WaitGroup().Done()
})
}
248 changes: 217 additions & 31 deletions registry/etcdv3/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,43 +18,229 @@
package etcdv3

import (
"testing"
"time"
)

import (
"github.com/stretchr/testify/assert"
)

import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/registry"
"dubbo.apache.org/dubbo-go/v3/remoting"
)

type MockDataListener struct{}
// mockConfigListener implements config_center.ConfigurationListener for testing
type mockConfigListener struct {
events []*config_center.ConfigChangeEvent
}

func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {}
func (m *mockConfigListener) Process(configType *config_center.ConfigChangeEvent) {
m.events = append(m.events, configType)
}

/*
func Test_dataListener_DataChange(t *testing.T) {
tests := []struct {
name string
fields dataListenerFields
args args
want bool
}{
{
name: "test",
fields: dataListenerFields{
interestedURL: nil,
listener: &MockDataListener{},
},
args: args{
eventType: remoting.Event{
Path: "com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100",
},
},
want: false,
},
// ---------- dataListener tests ----------

func TestNewRegistryDataListener(t *testing.T) {
dl := NewRegistryDataListener()
assert.NotNil(t, dl)
assert.NotNil(t, dl.subscribed)
assert.False(t, dl.closed)
}

func TestDataListenerSubscribeURL(t *testing.T) {
dl := NewRegistryDataListener()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service")
listener := &mockConfigListener{}

dl.SubscribeURL(url, listener)
assert.Equal(t, listener, dl.subscribed[url.ServiceKey()])
assert.NotContains(t, dl.subscribed, "nonexistent")
}

func TestDataListenerSubscribeURLAfterClose(t *testing.T) {
dl := NewRegistryDataListener()
dl.closed = true
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service")

dl.SubscribeURL(url, &mockConfigListener{})
assert.Empty(t, dl.subscribed)
}

func TestDataListenerUnSubscribeURL(t *testing.T) {
reg := &etcdV3Registry{}
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service")
dl := NewRegistryDataListener()

cl := NewConfigurationListener(reg)
dl.SubscribeURL(url, cl)

returned := dl.UnSubscribeURL(url)
assert.NotNil(t, returned)
assert.True(t, cl.isClosed)
assert.NotContains(t, dl.subscribed, url.ServiceKey())
}

func TestDataListenerUnSubscribeURLNotSubscribed(t *testing.T) {
dl := NewRegistryDataListener()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service")

returned := dl.UnSubscribeURL(url)
assert.Nil(t, returned)
}

func TestDataListenerDataChangeMatch(t *testing.T) {
dl := NewRegistryDataListener()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service")
listener := &mockConfigListener{}
dl.SubscribeURL(url, listener)

event := remoting.Event{
Path: "/dubbo/com.example.Service/providers/dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service",
Action: remoting.EventTypeAdd,
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := newDataListener(tt.fields)
if got := l.DataChange(tt.args.eventType); got != tt.want {
t.Errorf("DataChange() = %v, want %v", got, tt.want)
}
})

got := dl.DataChange(event)
assert.True(t, got)
assert.Len(t, listener.events, 1)
assert.Equal(t, event.Action, listener.events[0].ConfigType)
}

func TestDataListenerDataChangeNoProviderPath(t *testing.T) {
dl := NewRegistryDataListener()

event := remoting.Event{
Path: "/dubbo/com.example.Service/consumers/some-url",
Action: remoting.EventTypeAdd,
}

got := dl.DataChange(event)
assert.False(t, got)
}

*/
func TestDataListenerDataChangeNoMatch(t *testing.T) {
dl := NewRegistryDataListener()
url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service1?interface=com.example.Service1")
listener := &mockConfigListener{}
dl.SubscribeURL(url, listener)

// event for a different service
event := remoting.Event{
Path: "/dubbo/com.example.Service2/providers/dubbo://127.0.0.1:20000/com.example.Service2?interface=com.example.Service2",
Action: remoting.EventTypeAdd,
}

got := dl.DataChange(event)
assert.False(t, got)
assert.Empty(t, listener.events)
}

func TestDataListenerDataChangeAfterClose(t *testing.T) {
dl := NewRegistryDataListener()
dl.closed = true

event := remoting.Event{
Path: "/dubbo/com.example.Service/providers/dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service",
Action: remoting.EventTypeAdd,
}

got := dl.DataChange(event)
assert.False(t, got)
}

func TestDataListenerClose(t *testing.T) {
reg := &etcdV3Registry{}
dl := NewRegistryDataListener()
url1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service1?interface=com.example.Service1")
url2, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service2?interface=com.example.Service2")

c1 := NewConfigurationListener(reg)
c2 := NewConfigurationListener(reg)
dl.SubscribeURL(url1, c1)
dl.SubscribeURL(url2, c2)
assert.False(t, dl.closed)

dl.Close()
assert.True(t, dl.closed)
assert.True(t, c1.isClosed)
assert.True(t, c2.isClosed)
}

func TestDataListenerCloseEmpty(t *testing.T) {
dl := NewRegistryDataListener()
dl.Close()
assert.True(t, dl.closed)
}

// ---------- configurationListener tests ----------

func TestNewConfigurationListener(t *testing.T) {
reg := &etcdV3Registry{}
cl := NewConfigurationListener(reg)

assert.NotNil(t, cl)
assert.False(t, cl.isClosed)
assert.NotNil(t, cl.close)
assert.NotNil(t, cl.events)
}

func TestConfigurationListenerProcessAndNext(t *testing.T) {
reg := &etcdV3Registry{}
cl := NewConfigurationListener(reg)

serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service")
event := &config_center.ConfigChangeEvent{
Key: "test-key",
Value: serviceURL,
ConfigType: remoting.EventTypeAdd,
}

cl.Process(event)

done := make(chan struct{})
var result *testResult
go func() {
r, err := cl.Next()
result = &testResult{serviceEvent: r, err: err}
close(done)
}()

select {
case <-done:
assert.NoError(t, result.err)
assert.NotNil(t, result.serviceEvent)
assert.Equal(t, remoting.EventTypeAdd, result.serviceEvent.Action)
case <-time.After(time.Second):
t.Fatal("timeout waiting for Next()")
}
}

type testResult struct {
serviceEvent *registry.ServiceEvent
err error
}

func TestConfigurationListenerClose(t *testing.T) {
reg := &etcdV3Registry{}
cl := NewConfigurationListener(reg)

assert.False(t, cl.isClosed)
cl.Close()
assert.True(t, cl.isClosed)

// calling Close twice should not panic (closeOnce)
cl.Close()
}

func TestConfigurationListenerNextReturnsErrorAfterClose(t *testing.T) {
reg := &etcdV3Registry{}
cl := NewConfigurationListener(reg)

cl.Close()
_, err := cl.Next()
assert.Error(t, err)
assert.Contains(t, err.Error(), "listener has been closed")
}
Loading
Loading