diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index ab5f8cbf93..865abff877 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -37,18 +37,38 @@ import ( ) type dataListener struct { - interestedURL []*common.URL - listener config_center.ConfigurationListener + subscribed map[string]config_center.ConfigurationListener + mutex sync.Mutex + closed bool } // NewRegistryDataListener creates a data listener for etcd -func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener { - return &dataListener{listener: listener} +func NewRegistryDataListener() *dataListener { + return &dataListener{ + subscribed: make(map[string]config_center.ConfigurationListener), + } +} + +// SubscribeURL is used to set a watch listener for url +func (l *dataListener) SubscribeURL(url *common.URL, listener config_center.ConfigurationListener) { + if l.closed { + return + } + l.subscribed[url.ServiceKey()] = listener } -// AddInterestedURL adds a registration @url to listen -func (l *dataListener) AddInterestedURL(url *common.URL) { - l.interestedURL = append(l.interestedURL, url) +// UnSubscribeURL is used to unset a watch listener for url +func (l *dataListener) UnSubscribeURL(url *common.URL) config_center.ConfigurationListener { + if l.closed { + return nil + } + listener := l.subscribed[url.ServiceKey()] + if listener == nil { + return nil + } + listener.(*configurationListener).Close() + delete(l.subscribed, url.ServiceKey()) + return listener } // DataChange processes the data change event from registry center of etcd @@ -64,25 +84,43 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool { logger.Warnf("[Registry][Etcdv3] listen NewURL, path=%s err=%v", eventType.Path, err) return false } - - for _, v := range l.interestedURL { - if serviceURL.URLEqual(v) { - l.listener.Process( + l.mutex.Lock() + defer l.mutex.Unlock() + if l.closed { + return false + } + match := false + for serviceKey, listener := range l.subscribed { + intf, group, version := common.ParseServiceKey(serviceKey) + if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) { + listener.Process( &config_center.ConfigChangeEvent{ Key: eventType.Path, - Value: serviceURL, + Value: serviceURL.Clone(), ConfigType: eventType.Action, }, ) - return true + match = true } } - return false + return match +} + +// Close all subscribed listeners +func (l *dataListener) Close() { + l.mutex.Lock() + defer l.mutex.Unlock() + l.closed = true + for _, listener := range l.subscribed { + listener.(*configurationListener).Close() + } } type configurationListener struct { registry *etcdV3Registry events *gxchan.UnboundedChan + isClosed bool + close chan struct{} closeOnce sync.Once } @@ -90,7 +128,7 @@ type configurationListener struct { func NewConfigurationListener(reg *etcdV3Registry) *configurationListener { // add a new waiter reg.WaitGroup().Add(1) - return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32)} + return &configurationListener{registry: reg, events: gxchan.NewUnboundedChan(32), close: make(chan struct{}, 1)} } // Process data change event from config center of etcd @@ -102,6 +140,9 @@ func (l *configurationListener) Process(configType *config_center.ConfigChangeEv func (l *configurationListener) Next() (*registry.ServiceEvent, error) { for { select { + case <-l.close: + logger.Warn("[Registry][Etcdv3] listener has been closed") + return nil, perrors.New("listener has been closed") case <-l.registry.Done(): logger.Warn("[Registry][Etcdv3] listener's etcd client connection is broken, so etcd event listener exit now") return nil, perrors.New("listener stopped") @@ -122,9 +163,12 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { } } -// Close etcd registry center +// Close closes the listener only once func (l *configurationListener) Close() { + // ensure that the listener will be closed at most once. l.closeOnce.Do(func() { + l.isClosed = true + l.close <- struct{}{} l.registry.WaitGroup().Done() }) } diff --git a/registry/etcdv3/listener_test.go b/registry/etcdv3/listener_test.go index 4a7223ba40..94dfeefce5 100644 --- a/registry/etcdv3/listener_test.go +++ b/registry/etcdv3/listener_test.go @@ -18,43 +18,229 @@ package etcdv3 import ( + "testing" + "time" +) + +import ( + "github.com/stretchr/testify/assert" +) + +import ( + "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/config_center" + "dubbo.apache.org/dubbo-go/v3/registry" + "dubbo.apache.org/dubbo-go/v3/remoting" ) -type MockDataListener struct{} +// mockConfigListener implements config_center.ConfigurationListener for testing +type mockConfigListener struct { + events []*config_center.ConfigChangeEvent +} -func (*MockDataListener) Process(configType *config_center.ConfigChangeEvent) {} +func (m *mockConfigListener) Process(configType *config_center.ConfigChangeEvent) { + m.events = append(m.events, configType) +} -/* -func Test_dataListener_DataChange(t *testing.T) { - tests := []struct { - name string - fields dataListenerFields - args args - want bool - }{ - { - name: "test", - fields: dataListenerFields{ - interestedURL: nil, - listener: &MockDataListener{}, - }, - args: args{ - eventType: remoting.Event{ - Path: "com.ikurento.user.UserProvider/providers/jsonrpc%3A%2F%2F127.0.0.1%3A20001%2Fcom.ikurento.user.UserProvider%3Fanyhost%3Dtrue%26app.version%3D0.0.1%26application%3DBDTService%26category%3Dproviders%26cluster%3Dfailover%26dubbo%3Ddubbo-provider-golang-2.6.0%26environment%3Ddev%26group%3D%26interface%3Dcom.ikurento.user.UserProvider%26ip%3D10.32.20.124%26loadbalance%3Drandom%26methods.GetUser.loadbalance%3Drandom%26methods.GetUser.retries%3D1%26methods.GetUser.weight%3D0%26module%3Ddubbogo%2Buser-info%2Bserver%26name%3DBDTService%26organization%3Dikurento.com%26owner%3DZX%26pid%3D74500%26retries%3D0%26service.filter%3Decho%26side%3Dprovider%26timestamp%3D1560155407%26version%3D%26warmup%3D100", - }, - }, - want: false, - }, +// ---------- dataListener tests ---------- + +func TestNewRegistryDataListener(t *testing.T) { + dl := NewRegistryDataListener() + assert.NotNil(t, dl) + assert.NotNil(t, dl.subscribed) + assert.False(t, dl.closed) +} + +func TestDataListenerSubscribeURL(t *testing.T) { + dl := NewRegistryDataListener() + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + listener := &mockConfigListener{} + + dl.SubscribeURL(url, listener) + assert.Equal(t, listener, dl.subscribed[url.ServiceKey()]) + assert.NotContains(t, dl.subscribed, "nonexistent") +} + +func TestDataListenerSubscribeURLAfterClose(t *testing.T) { + dl := NewRegistryDataListener() + dl.closed = true + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + + dl.SubscribeURL(url, &mockConfigListener{}) + assert.Empty(t, dl.subscribed) +} + +func TestDataListenerUnSubscribeURL(t *testing.T) { + reg := &etcdV3Registry{} + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + dl := NewRegistryDataListener() + + cl := NewConfigurationListener(reg) + dl.SubscribeURL(url, cl) + + returned := dl.UnSubscribeURL(url) + assert.NotNil(t, returned) + assert.True(t, cl.isClosed) + assert.NotContains(t, dl.subscribed, url.ServiceKey()) +} + +func TestDataListenerUnSubscribeURLNotSubscribed(t *testing.T) { + dl := NewRegistryDataListener() + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + + returned := dl.UnSubscribeURL(url) + assert.Nil(t, returned) +} + +func TestDataListenerDataChangeMatch(t *testing.T) { + dl := NewRegistryDataListener() + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + listener := &mockConfigListener{} + dl.SubscribeURL(url, listener) + + event := remoting.Event{ + Path: "/dubbo/com.example.Service/providers/dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service", + Action: remoting.EventTypeAdd, } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - l := newDataListener(tt.fields) - if got := l.DataChange(tt.args.eventType); got != tt.want { - t.Errorf("DataChange() = %v, want %v", got, tt.want) - } - }) + + got := dl.DataChange(event) + assert.True(t, got) + assert.Len(t, listener.events, 1) + assert.Equal(t, event.Action, listener.events[0].ConfigType) +} + +func TestDataListenerDataChangeNoProviderPath(t *testing.T) { + dl := NewRegistryDataListener() + + event := remoting.Event{ + Path: "/dubbo/com.example.Service/consumers/some-url", + Action: remoting.EventTypeAdd, } + + got := dl.DataChange(event) + assert.False(t, got) } -*/ +func TestDataListenerDataChangeNoMatch(t *testing.T) { + dl := NewRegistryDataListener() + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service1?interface=com.example.Service1") + listener := &mockConfigListener{} + dl.SubscribeURL(url, listener) + + // event for a different service + event := remoting.Event{ + Path: "/dubbo/com.example.Service2/providers/dubbo://127.0.0.1:20000/com.example.Service2?interface=com.example.Service2", + Action: remoting.EventTypeAdd, + } + + got := dl.DataChange(event) + assert.False(t, got) + assert.Empty(t, listener.events) +} + +func TestDataListenerDataChangeAfterClose(t *testing.T) { + dl := NewRegistryDataListener() + dl.closed = true + + event := remoting.Event{ + Path: "/dubbo/com.example.Service/providers/dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service", + Action: remoting.EventTypeAdd, + } + + got := dl.DataChange(event) + assert.False(t, got) +} + +func TestDataListenerClose(t *testing.T) { + reg := &etcdV3Registry{} + dl := NewRegistryDataListener() + url1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service1?interface=com.example.Service1") + url2, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service2?interface=com.example.Service2") + + c1 := NewConfigurationListener(reg) + c2 := NewConfigurationListener(reg) + dl.SubscribeURL(url1, c1) + dl.SubscribeURL(url2, c2) + assert.False(t, dl.closed) + + dl.Close() + assert.True(t, dl.closed) + assert.True(t, c1.isClosed) + assert.True(t, c2.isClosed) +} + +func TestDataListenerCloseEmpty(t *testing.T) { + dl := NewRegistryDataListener() + dl.Close() + assert.True(t, dl.closed) +} + +// ---------- configurationListener tests ---------- + +func TestNewConfigurationListener(t *testing.T) { + reg := &etcdV3Registry{} + cl := NewConfigurationListener(reg) + + assert.NotNil(t, cl) + assert.False(t, cl.isClosed) + assert.NotNil(t, cl.close) + assert.NotNil(t, cl.events) +} + +func TestConfigurationListenerProcessAndNext(t *testing.T) { + reg := &etcdV3Registry{} + cl := NewConfigurationListener(reg) + + serviceURL, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + event := &config_center.ConfigChangeEvent{ + Key: "test-key", + Value: serviceURL, + ConfigType: remoting.EventTypeAdd, + } + + cl.Process(event) + + done := make(chan struct{}) + var result *testResult + go func() { + r, err := cl.Next() + result = &testResult{serviceEvent: r, err: err} + close(done) + }() + + select { + case <-done: + assert.NoError(t, result.err) + assert.NotNil(t, result.serviceEvent) + assert.Equal(t, remoting.EventTypeAdd, result.serviceEvent.Action) + case <-time.After(time.Second): + t.Fatal("timeout waiting for Next()") + } +} + +type testResult struct { + serviceEvent *registry.ServiceEvent + err error +} + +func TestConfigurationListenerClose(t *testing.T) { + reg := &etcdV3Registry{} + cl := NewConfigurationListener(reg) + + assert.False(t, cl.isClosed) + cl.Close() + assert.True(t, cl.isClosed) + + // calling Close twice should not panic (closeOnce) + cl.Close() +} + +func TestConfigurationListenerNextReturnsErrorAfterClose(t *testing.T) { + reg := &etcdV3Registry{} + cl := NewConfigurationListener(reg) + + cl.Close() + _, err := cl.Next() + assert.Error(t, err) + assert.Contains(t, err.Error(), "listener has been closed") +} diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index eaa93c6269..74c18939c3 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -49,12 +49,11 @@ func init() { type etcdV3Registry struct { registry.BaseRegistry - cltLock sync.Mutex - client *gxetcd.Client - listenerLock sync.RWMutex - listener *etcdv3.EventListener - dataListener *dataListener - configListener *configurationListener + cltLock sync.Mutex + client *gxetcd.Client + listenerLock sync.RWMutex + listener *etcdv3.EventListener + dataListener *dataListener } // Client gets the etcdv3 client @@ -99,8 +98,7 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) { // InitListeners init listeners of etcd registry center func (r *etcdV3Registry) InitListeners() { r.listener = etcdv3.NewEventListener(r.client) - r.configListener = NewConfigurationListener(r) - r.dataListener = NewRegistryDataListener(r.configListener) + r.dataListener = NewRegistryDataListener() } // DoRegister actually do the register job in the registry center of etcd @@ -109,9 +107,14 @@ func (r *etcdV3Registry) DoRegister(root string, node string) error { return r.client.RegisterTemp(path.Join(root, node), "") } -// DoUnregister is not supported in etcdV3Registry. +// DoUnregister actually unregister the node from the registry center of etcd. func (r *etcdV3Registry) DoUnregister(root string, node string) error { - return perrors.New("DoUnregister is not support in etcdV3Registry") + r.cltLock.Lock() + defer r.cltLock.Unlock() + if !r.client.Valid() { + return perrors.Errorf("etcd client is not valid.") + } + return r.client.Delete(path.Join(root, node)) } // CloseAndNilClient closes listeners and clear client @@ -122,8 +125,8 @@ func (r *etcdV3Registry) CloseAndNilClient() { // CloseListener closes listeners func (r *etcdV3Registry) CloseListener() { - if r.configListener != nil { - r.configListener.Close() + if r.dataListener != nil { + r.dataListener.Close() } } @@ -141,10 +144,32 @@ func (r *etcdV3Registry) CreatePath(k string) error { } // DoSubscribe actually subscribe the provider URL -func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { - r.listenerLock.RLock() - configListener := r.configListener - r.listenerLock.RUnlock() +func (r *etcdV3Registry) DoSubscribe(conf *common.URL) (registry.Listener, error) { + return r.getListener(conf) +} + +func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { + return r.getCloseListener(conf) +} + +func (r *etcdV3Registry) getListener(conf *common.URL) (*configurationListener, error) { + var etcdListener *configurationListener + dataListener := r.dataListener + + dataListener.mutex.Lock() + defer dataListener.mutex.Unlock() + + if dataListener.subscribed[conf.ServiceKey()] != nil { + etcdListener, _ = r.dataListener.subscribed[conf.ServiceKey()].(*configurationListener) + if etcdListener != nil { + if etcdListener.isClosed { + return nil, perrors.New("configListener already been closed") + } + return etcdListener, nil + } + } + + etcdListener = NewConfigurationListener(r) if r.listener == nil { r.cltLock.Lock() client := r.client @@ -153,19 +178,46 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) return nil, perrors.New("etcd client broken") } r.listenerLock.Lock() - r.listener = etcdv3.NewEventListener(r.client) // new client & listener + r.listener = etcdv3.NewEventListener(r.client) r.listenerLock.Unlock() } - // register the svc to dataListener - r.dataListener.AddInterestedURL(svc) - go r.listener.ListenServiceEvent(fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory, svc.Service()), r.dataListener) + r.dataListener.SubscribeURL(conf, etcdListener) + go r.listener.ListenServiceEvent( + fmt.Sprintf("/dubbo/%s/"+constant.DefaultCategory, conf.Service()), + r.dataListener, + ) - return configListener, nil + return etcdListener, nil } -func (r *etcdV3Registry) DoUnsubscribe(conf *common.URL) (registry.Listener, error) { - return nil, perrors.New("DoUnsubscribe is not support in etcdV3Registry") +func (r *etcdV3Registry) getCloseListener(conf *common.URL) (*configurationListener, error) { + var etcdListener *configurationListener + r.dataListener.mutex.Lock() + configListener := r.dataListener.subscribed[conf.ServiceKey()] + if configListener != nil { + etcdListener, _ = configListener.(*configurationListener) + if etcdListener != nil && etcdListener.isClosed { + r.dataListener.mutex.Unlock() + return nil, perrors.New(fmt.Sprintf("configListener for service %s has already been closed", conf.ServiceKey())) + } + } + + if configListener = r.dataListener.UnSubscribeURL(conf); configListener != nil { + switch v := configListener.(type) { + case *configurationListener: + if v != nil { + etcdListener = v + } + } + } + r.dataListener.mutex.Unlock() + + if r.listener == nil { + return nil, perrors.New("etcd event listener is null, can not close.") + } + + return etcdListener, nil } // LoadSubscribeInstances load subscribe instance diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go index 02c718f323..2a2e35d866 100644 --- a/registry/etcdv3/registry_test.go +++ b/registry/etcdv3/registry_test.go @@ -15,108 +15,244 @@ * limitations under the License. */ -// Package etcdv3 contains tests for etcdv3 registry components. package etcdv3 -/* import ( - "reflect" - "sync" + "path" "testing" + "time" ) import ( - "github.com/agiledragon/gomonkey" - - gxetcd "github.com/dubbogo/gost/database/kv/etcd/v3" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) import ( + "dubbo.apache.org/dubbo-go/v3/common" "dubbo.apache.org/dubbo-go/v3/registry" "dubbo.apache.org/dubbo-go/v3/remoting" - "dubbo.apache.org/dubbo-go/v3/remoting/etcdv3" ) -type fields struct { - BaseRegistry registry.BaseRegistry - cltLock sync.Mutex - client *gxetcd.Client - listenerLock sync.RWMutex - listener *etcdv3.EventListener - dataListener *dataListener - configListener *configurationListener +func newTestEtcdV3Registry(t *testing.T) *etcdV3Registry { + url, _ := common.NewURL("dubbo://127.0.0.1:2379") + reg, err := newETCDV3Registry(url) + require.NoError(t, err) + r := reg.(*etcdV3Registry) + t.Cleanup(func() { r.Destroy() }) + return r } -type args struct { - root string - node string - eventType remoting.Event + +// ---------- DoRegister tests ---------- + +func TestDoRegister(t *testing.T) { + r := newTestEtcdV3Registry(t) + + root := "/dubbo/com.example.Service/providers" + node := "dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.example.Service%3Finterface%3Dcom.example.Service" + key := path.Join(root, node) + defer r.client.Delete(key) + + err := r.DoRegister(root, node) + require.NoError(t, err) + + // Verify the key was created in etcd with a lease + val, err := r.client.Get(key) + assert.NoError(t, err) + assert.Empty(t, val) } -func newEtcdV3Registry(f fields) *etcdV3Registry { - return &etcdV3Registry{ - client: f.client, - listener: f.listener, - dataListener: f.dataListener, - configListener: f.configListener, - } +// ---------- DoUnregister tests ---------- + +func TestDoUnregister(t *testing.T) { + r := newTestEtcdV3Registry(t) + + root := "/dubbo/com.example.Service/providers" + node := "test-node" + key := path.Join(root, node) + + // Pre-create the key via the etcd client + err := r.client.Put(key, "") + require.NoError(t, err) + + // Verify it exists first + _, err = r.client.Get(key) + require.NoError(t, err) + + err = r.DoUnregister(root, node) + require.NoError(t, err) + + // Verify the key was deleted from etcd (Get now returns error) + _, err = r.client.Get(key) + assert.Error(t, err) } -func Test_etcdV3Registry_DoRegister(t *testing.T) { - var client *gxetcd.Client - patches := gomonkey.NewPatches() - patches = patches.ApplyMethod(reflect.TypeOf(client), "RegisterTemp", func(_ *gxetcd.Client, k, v string) error { - return nil - }) - defer patches.Reset() - - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - fields: fields{ - client: client, - }, - args: args{ - root: "/dubbo", - node: "/go", - }, - wantErr: false, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := newEtcdV3Registry(tt.fields) - if err := r.DoRegister(tt.args.root, tt.args.node); (err != nil) != tt.wantErr { - t.Errorf("DoRegister() error = %v, wantErr %v", err, tt.wantErr) - } - }) +// ---------- DoSubscribe tests ---------- + +func TestDoSubscribe(t *testing.T) { + r := newTestEtcdV3Registry(t) + + conf, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + + // Put a provider key before subscribing so ListenServiceEvent's initial + // GetChildren finds it and fires a DataChange event. + providerKey := "/dubbo/com.example.Service/providers/dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.example.Service%3Finterface%3Dcom.example.Service" + err := r.client.Put(providerKey, "") + require.NoError(t, err) + defer r.client.Delete(providerKey) + + listener, err := r.DoSubscribe(conf) + require.NoError(t, err) + require.NotNil(t, listener) + + // The GetChildren call inside ListenServiceEvent should fire DataChange + // for the existing provider key, which propagates to listener.Next(). + done := make(chan struct{}) + var se *registry.ServiceEvent + go func() { + se, err = listener.Next() + close(done) + }() + + select { + case <-done: + assert.NoError(t, err) + assert.NotNil(t, se) + assert.Equal(t, remoting.EventTypeAdd, se.Action) + case <-time.After(5 * time.Second): + t.Fatal("timeout waiting for subscribe event") } } -func Test_etcdV3Registry_DoUnregister(t *testing.T) { - tests := []struct { - name string - fields fields - args args - wantErr bool - }{ - { - name: "test", - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - r := newEtcdV3Registry(tt.fields) - if err := r.DoUnregister(tt.args.root, tt.args.node); (err != nil) != tt.wantErr { - t.Errorf("DoUnregister() error = %v, wantErr %v", err, tt.wantErr) - } - }) - } +func TestDoSubscribeReusesExistingListener(t *testing.T) { + r := newTestEtcdV3Registry(t) + + conf, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + + providerKey := "/dubbo/com.example.Service/providers/dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.example.Service%3Finterface%3Dcom.example.Service" + err := r.client.Put(providerKey, "") + require.NoError(t, err) + defer r.client.Delete(providerKey) + + listener1, err := r.DoSubscribe(conf) + require.NoError(t, err) + + // Consume the initial GetChildren event + done := make(chan struct{}) + go func() { + listener1.Next() + close(done) + }() + <-done + + // Second subscribe should return the same listener + listener2, err := r.DoSubscribe(conf) + assert.NoError(t, err) + assert.Equal(t, listener1, listener2) +} + +func TestDoSubscribeExistingListenerClosed(t *testing.T) { + r := newTestEtcdV3Registry(t) + + cl := NewConfigurationListener(r) + cl.Close() + conf, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + r.dataListener.SubscribeURL(conf, cl) + + _, err := r.DoSubscribe(conf) + assert.Error(t, err) + assert.Contains(t, err.Error(), "configListener already been closed") } -*/ +// ---------- DoUnsubscribe tests ---------- + +func TestDoUnsubscribe(t *testing.T) { + r := newTestEtcdV3Registry(t) + + conf, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + + providerKey := "/dubbo/com.example.Service/providers/dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.example.Service%3Finterface%3Dcom.example.Service" + err := r.client.Put(providerKey, "") + require.NoError(t, err) + defer r.client.Delete(providerKey) + + _, err = r.DoSubscribe(conf) + require.NoError(t, err) + + cl, err := r.DoUnsubscribe(conf) + assert.NoError(t, err) + assert.NotNil(t, cl) + assert.True(t, cl.(*configurationListener).isClosed) + + // Verify removed from dataListener + assert.NotContains(t, r.dataListener.subscribed, conf.ServiceKey()) +} + +func TestDoUnsubscribeNotSubscribed(t *testing.T) { + r := newTestEtcdV3Registry(t) + + conf, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + + listener, err := r.DoUnsubscribe(conf) + assert.NoError(t, err) + assert.Nil(t, listener) +} + +func TestDoUnsubscribeAlreadyClosed(t *testing.T) { + r := newTestEtcdV3Registry(t) + + conf, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + + providerKey := "/dubbo/com.example.Service/providers/dubbo%3A%2F%2F127.0.0.1%3A20000%2Fcom.example.Service%3Finterface%3Dcom.example.Service" + err := r.client.Put(providerKey, "") + require.NoError(t, err) + defer r.client.Delete(providerKey) + + _, err = r.DoSubscribe(conf) + require.NoError(t, err) + + // Manually close the listener + cl, ok := r.dataListener.subscribed[conf.ServiceKey()].(*configurationListener) + require.True(t, ok) + cl.Close() + + // Unsubscribe should error because the existing listener is already closed + _, err = r.DoUnsubscribe(conf) + assert.Error(t, err) + assert.Contains(t, err.Error(), "configListener for service com.example.Service has already been closed") +} + +// ---------- InitListeners tests ---------- + +func TestInitListeners(t *testing.T) { + r := newTestEtcdV3Registry(t) + + assert.NotNil(t, r.listener) + assert.NotNil(t, r.dataListener) + assert.NotNil(t, r.dataListener.subscribed) +} + +// ---------- CloseListener tests ---------- + +func TestCloseListener(t *testing.T) { + r := newTestEtcdV3Registry(t) + + cl := NewConfigurationListener(r) + url, _ := common.NewURL("dubbo://127.0.0.1:20000/com.example.Service?interface=com.example.Service") + r.dataListener.SubscribeURL(url, cl) + + r.CloseListener() + assert.True(t, cl.isClosed) +} + +func TestCloseListenerNil(t *testing.T) { + r := &etcdV3Registry{} + // dataListener is nil, should not panic + r.CloseListener() +} + +// ---------- Registry interface conformance ---------- + +func TestRegistryInterface(t *testing.T) { + var _ registry.Registry = (*etcdV3Registry)(nil) +}