Skip to content
42 changes: 26 additions & 16 deletions registry/servicediscovery/service_discovery_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,12 @@ func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registr
return nil
}
serviceNamesKey := sortServices(services)
l := s.serviceListeners[serviceNamesKey]
var l registry.ServiceInstancesChangedListener
func() {
s.lock.RLock()
defer s.lock.RUnlock()
l = s.serviceListeners[serviceNamesKey]
}()
if l != nil {
l.RemoveListener(url.ServiceKey())
}
Expand Down Expand Up @@ -532,23 +537,28 @@ func (s *serviceDiscoveryRegistry) SubscribeURL(url *common.URL, notify registry
protocol = url.Protocol
}
protocolServiceKey := url.ServiceKey() + ":" + protocol
listener := s.serviceListeners[serviceNamesKey]
if listener == nil {
listener = NewServiceInstancesChangedListener(url.GetParam(constant.ApplicationKey, ""), s.url.GetParam(constant.RegistryIdKey, constant.DefaultKey), services)
for _, serviceNameTmp := range services.Values() {
serviceName := serviceNameTmp.(string)
instances := s.serviceDiscovery.GetInstances(serviceName)
logger.Infof("[Registry][ServiceDiscovery] synchronized instance notification on application %s subscription, instance list size %s", serviceName, len(instances))
err = listener.OnEvent(&registry.ServiceInstancesChangedEvent{
ServiceName: serviceName,
Instances: instances,
})
if err != nil {
logger.Warnf("[Registry][ServiceDiscovery] ServiceInstancesChangedListenerImpl handle error, err=%v", err)
var listener registry.ServiceInstancesChangedListener
func() {
s.lock.Lock()
defer s.lock.Unlock()
listener = s.serviceListeners[serviceNamesKey]
if listener == nil {
listener = NewServiceInstancesChangedListener(url.GetParam(constant.ApplicationKey, ""), s.url.GetParam(constant.RegistryIdKey, constant.DefaultKey), services)
for _, serviceNameTmp := range services.Values() {
serviceName := serviceNameTmp.(string)
instances := s.serviceDiscovery.GetInstances(serviceName)
logger.Infof("[Registry][ServiceDiscovery] synchronized instance notification on application %s subscription, instance list size %s", serviceName, len(instances))
err = listener.OnEvent(&registry.ServiceInstancesChangedEvent{
ServiceName: serviceName,
Instances: instances,
})
if err != nil {
logger.Warnf("[Registry][ServiceDiscovery] ServiceInstancesChangedListenerImpl handle error, err=%v", err)
}
}
}
}
s.serviceListeners[serviceNamesKey] = listener
s.serviceListeners[serviceNamesKey] = listener
}()
listener.AddListenerAndNotify(protocolServiceKey, notify)
event := metricsMetadata.NewMetadataMetricTimeEvent(metricsMetadata.SubscribeServiceRt)

Expand Down
Loading