Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 66 additions & 11 deletions metadata/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,45 @@ const defaultTimeout = "5s" // s
func GetMetadataFromMetadataReport(revision string, instance registry.ServiceInstance, registryId string) (*info.MetadataInfo, error) {
report := GetMetadataReportByRegistry(registryId)
if report == nil {
return nil, perrors.Errorf("no metadata report instance found for registryId=%s, please check metadata-report configuration", registryId)
return nil, &MetadataError{
Kind: MetadataErrorKindReportLoad,
Source: "metadata_report",
App: instance.GetServiceName(),
Revision: revision,
RegistryID: registryId,
Err: perrors.Errorf("no metadata report instance found for registryId=%s, please check metadata-report configuration", registryId),
}
}
meta, err := report.GetAppMetadata(instance.GetServiceName(), revision)
if err != nil {
return nil, perrors.Wrapf(err, "failed to get app metadata app=%s revision=%s", instance.GetServiceName(), revision)
return nil, &MetadataError{
Kind: MetadataErrorKindReportLoad,
Source: "metadata_report",
App: instance.GetServiceName(),
Revision: revision,
RegistryID: registryId,
Err: perrors.Wrapf(err, "failed to get app metadata app=%s revision=%s", instance.GetServiceName(), revision),
}
}
return meta, nil
}

func GetMetadataFromRpc(revision string, instance registry.ServiceInstance) (*info.MetadataInfo, error) {
url, err := buildStandardMetadataServiceURL(instance)
if err != nil {
return nil, err
return nil, withMetadataErrorContext(err, MetadataErrorKindURLBuild, "metadata_url", instance.GetServiceName(), revision, metadataStorageType(instance))
}
url.SetParam(constant.TimeoutKey, defaultTimeout)
p := extension.GetProtocol(url.Protocol)
invoker := p.Refer(url)
if invoker == nil { // can't connect instance
return nil, perrors.New("can not connect to remote metadata service host: " + url.Ip)
return nil, &MetadataError{
Kind: MetadataErrorKindRPCLoad,
Source: "rpc_metadata",
App: instance.GetServiceName(),
Revision: revision,
Err: perrors.New("can not connect to remote metadata service host: " + url.Ip),
}
}
var remoteService remoteMetadataService
if url.Protocol == constant.TriProtocol && instance.GetMetadata()[constant.MetadataVersion] == constant.MetadataServiceV2Version {
Expand All @@ -74,7 +94,18 @@ func GetMetadataFromRpc(revision string, instance registry.ServiceInstance) (*in
defer func() {
invoker.Destroy()
}()
return remoteService.getMetadataInfo(context.Background(), revision)
metadataInfo, err := remoteService.getMetadataInfo(context.Background(), revision)
if err != nil {
return nil, withMetadataErrorContext(err, MetadataErrorKindRPCLoad, "rpc_metadata", instance.GetServiceName(), revision, metadataStorageType(instance))
}
return metadataInfo, nil
}

func metadataStorageType(instance registry.ServiceInstance) string {
if instance.GetMetadata() == nil {
return ""
}
return instance.GetMetadata()[constant.MetadataStorageTypePropertyName]
}

// remoteMetadataService is the internal interface for fetching MetadataInfo via RPC.
Expand Down Expand Up @@ -180,7 +211,12 @@ func (m *remoteMetadataServiceV1) getMetadataInfo(_ context.Context, revision st
if rawResult == nil {
logger.Warnf("[Metadata][RPC] Provider %s returned nil metadata (service may not be ready), revision=%s",
m.invoker.GetURL().Location, revision)
return nil, perrors.Errorf("metadata is nil from %s, revision: %s", m.invoker.GetURL().Location, revision)
return nil, &MetadataError{
Kind: MetadataErrorKindNil,
Source: "rpc_metadata",
Revision: revision,
Err: perrors.Errorf("metadata is nil from %s, revision: %s", m.invoker.GetURL().Location, revision),
}
}

var metadataInfo *info.MetadataInfo
Expand Down Expand Up @@ -222,12 +258,25 @@ func truncateString(s string, maxLen int) string {
// buildStandardMetadataServiceURL will use standard format to build the metadata service url.
// Returns an error if required params (protocol or port) are missing.
func buildStandardMetadataServiceURL(ins registry.ServiceInstance) (*common.URL, error) {
ps := getMetadataServiceUrlParams(ins)
ps, err := getMetadataServiceUrlParams(ins)
if err != nil {
return nil, err
}
if ps[constant.ProtocolKey] == "" {
return nil, perrors.New("metadata service URL params missing: protocol is empty")
return nil, &MetadataError{
Kind: MetadataErrorKindURLBuild,
Source: "metadata_url",
App: ins.GetServiceName(),
Err: perrors.New("metadata service URL params missing: protocol is empty"),
}
}
if ps[constant.PortKey] == "" {
return nil, perrors.New("metadata service URL params missing: port is empty")
return nil, &MetadataError{
Kind: MetadataErrorKindURLBuild,
Source: "metadata_url",
App: ins.GetServiceName(),
Err: perrors.New("metadata service URL params missing: port is empty"),
}
}

sn := ins.GetServiceName()
Expand Down Expand Up @@ -266,15 +315,21 @@ func buildStandardMetadataServiceURL(ins registry.ServiceInstance) (*common.URL,
// getMetadataServiceUrlParams this will convertV2 the metadata service url parameters to map structure
// it looks like:
// {"dubbo":{"timeout":"10000","version":"1.0.0","dubbo":"2.0.2","release":"2.7.6","port":"20880"}}
func getMetadataServiceUrlParams(ins registry.ServiceInstance) map[string]string {
func getMetadataServiceUrlParams(ins registry.ServiceInstance) (map[string]string, error) {
ps := ins.GetMetadata()
res := make(map[string]string, 2)
if str, ok := ps[constant.MetadataServiceURLParamsPropertyName]; ok && len(str) > 0 {
err := json.Unmarshal([]byte(str), &res)
if err != nil {
logger.Errorf("[Metadata][URL] could not parse the metadata service url parameters to map, err=%v", err)
return nil, &MetadataError{
Kind: MetadataErrorKindURLBuild,
Source: "metadata_url",
App: ins.GetServiceName(),
Err: perrors.Wrap(err, "could not parse metadata service URL params"),
}
}
}

return res
return res, nil
}
139 changes: 125 additions & 14 deletions metadata/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@

import (
"context"
stderrors "errors"
"testing"
)

import (
"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -88,8 +89,16 @@

t.Run("no report instance", func(t *testing.T) {
instances = make(map[string]report.MetadataReport)
_, err := GetMetadataFromMetadataReport("1", ins, "default")
_, err := GetMetadataFromMetadataReport("rev-missing-report", ins, "default")
require.Error(t, err)

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))

Check failure on line 96 in metadata/client_test.go

View workflow job for this annotation

GitHub Actions / CI (ubuntu-latest - Go 1.23)

error-is-as: use require.ErrorAs (testifylint)
assert.Equal(t, MetadataErrorKindReportLoad, metadataErr.Kind)
assert.Equal(t, "metadata_report", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "rev-missing-report", metadataErr.Revision)
assert.Equal(t, "default", metadataErr.RegistryID)
})

t.Run("default registry routes to default report", func(t *testing.T) {
Expand Down Expand Up @@ -140,9 +149,17 @@
defer mockReport.AssertExpectations(t)
instances["default"] = mockReport

mockReport.On("GetAppMetadata").Return(metadataInfo, errors.New("mock error")).Once()
_, err := GetMetadataFromMetadataReport("1", ins, "default")
mockReport.On("GetAppMetadata").Return(metadataInfo, pkgerrors.New("mock error")).Once()
_, err := GetMetadataFromMetadataReport("rev-report-error", ins, "default")
require.Error(t, err)

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))

Check failure on line 157 in metadata/client_test.go

View workflow job for this annotation

GitHub Actions / CI (ubuntu-latest - Go 1.23)

error-is-as: use require.ErrorAs (testifylint)
assert.Equal(t, MetadataErrorKindReportLoad, metadataErr.Kind)
assert.Equal(t, "metadata_report", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "rev-report-error", metadataErr.Revision)
assert.Equal(t, "default", metadataErr.RegistryID)
})
}

Expand Down Expand Up @@ -170,23 +187,94 @@
})
t.Run("refer error", func(t *testing.T) {
mockProtocol.On("Refer").Return(nil).Once()
_, err := GetMetadataFromRpc("111", ins)
_, err := GetMetadataFromRpc("rev-refer-error", ins)
require.Error(t, err)

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))

Check failure on line 194 in metadata/client_test.go

View workflow job for this annotation

GitHub Actions / CI (ubuntu-latest - Go 1.23)

error-is-as: use require.ErrorAs (testifylint)
assert.Equal(t, MetadataErrorKindRPCLoad, metadataErr.Kind)
assert.Equal(t, "rpc_metadata", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "rev-refer-error", metadataErr.Revision)
})
t.Run("invoke timeout", func(t *testing.T) {
mockProtocol.On("Refer").Return(mockInvoker).Once()
mockInvoker.On("Invoke").Return(&result.RPCResult{
Attrs: map[string]any{},
Err: errors.New("timeout error"),
Err: pkgerrors.New("timeout error"),
Rest: metadataInfo,
}).Once()
mockInvoker.On("Destroy").Once()
_, err := GetMetadataFromRpc("111", ins)
_, err := GetMetadataFromRpc("rev-timeout", ins)
require.Error(t, err)

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))
assert.Equal(t, MetadataErrorKindRPCLoad, metadataErr.Kind)
assert.Equal(t, "rpc_metadata", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "rev-timeout", metadataErr.Revision)
})
t.Run("nil response", func(t *testing.T) {
mockProtocol.On("Refer").Return(mockInvoker).Once()
mockInvoker.On("Invoke").Return(&result.RPCResult{
Attrs: map[string]any{},
Err: nil,
Rest: nil,
}).Once()
mockInvoker.On("Destroy").Once()
_, err := GetMetadataFromRpc("rev-nil", ins)
require.Error(t, err)

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))
assert.Equal(t, MetadataErrorKindNil, metadataErr.Kind)
assert.Equal(t, "rpc_metadata", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "rev-nil", metadataErr.Revision)
})
t.Run("unexpected response type", func(t *testing.T) {
mockProtocol.On("Refer").Return(mockInvoker).Once()
mockInvoker.On("Invoke").Return(&result.RPCResult{
Attrs: map[string]any{},
Err: nil,
Rest: 123,
}).Once()
mockInvoker.On("Destroy").Once()
_, err := GetMetadataFromRpc("rev-unexpected", ins)
require.Error(t, err)

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))
assert.Equal(t, MetadataErrorKindRPCLoad, metadataErr.Kind)
assert.Equal(t, "rpc_metadata", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "rev-unexpected", metadataErr.Revision)
})
}

func TestGetMetadataFromRpc_MissingURLParams(t *testing.T) {
t.Run("malformed params", func(t *testing.T) {
insMalformedParams := &registry.DefaultServiceInstance{
ID: "4",
ServiceName: "dubbo-app",
Host: "dubbo.io",
Metadata: map[string]string{
constant.MetadataServiceURLParamsPropertyName: `xxx`,
},
}

_, err := GetMetadataFromRpc("rev-malformed-params", insMalformedParams)
require.Error(t, err)

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))
assert.Equal(t, MetadataErrorKindURLBuild, metadataErr.Kind)
assert.Equal(t, "metadata_url", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "rev-malformed-params", metadataErr.Revision)
})

t.Run("missing protocol", func(t *testing.T) {
insNoProto := &registry.DefaultServiceInstance{
ID: "2",
Expand All @@ -197,6 +285,13 @@
_, err := GetMetadataFromRpc("1", insNoProto)
require.Error(t, err)
assert.Contains(t, err.Error(), "protocol is empty")

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))
assert.Equal(t, MetadataErrorKindURLBuild, metadataErr.Kind)
assert.Equal(t, "metadata_url", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "1", metadataErr.Revision)
})

t.Run("missing port", func(t *testing.T) {
Expand All @@ -211,6 +306,13 @@
_, err := GetMetadataFromRpc("1", insNoPort)
require.Error(t, err)
assert.Contains(t, err.Error(), "port is empty")

var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))
assert.Equal(t, MetadataErrorKindURLBuild, metadataErr.Kind)
assert.Equal(t, "metadata_url", metadataErr.Source)
assert.Equal(t, "dubbo-app", metadataErr.App)
assert.Equal(t, "1", metadataErr.Revision)
})
}

Expand Down Expand Up @@ -305,9 +407,10 @@
ins registry.ServiceInstance
}
tests := []struct {
name string
args args
want map[string]string
name string
args args
want map[string]string
wantErr bool
}{
{
name: "normal",
Expand Down Expand Up @@ -345,12 +448,21 @@
},
},
},
want: map[string]string{},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, getMetadataServiceUrlParams(tt.args.ins), "getMetadataServiceUrlParams(%v)", tt.args.ins)
got, err := getMetadataServiceUrlParams(tt.args.ins)
if tt.wantErr {
require.Error(t, err)
var metadataErr *MetadataError
require.True(t, stderrors.As(err, &metadataErr))
assert.Equal(t, MetadataErrorKindURLBuild, metadataErr.Kind)
return
}
require.NoError(t, err)
assert.Equalf(t, tt.want, got, "getMetadataServiceUrlParams(%v)", tt.args.ins)
})
}
}
Expand Down Expand Up @@ -398,8 +510,7 @@
// Handle both *info.MetadataInfo and *interface{} reply types
// This supports the new implementation that uses interface{} to handle different return types
if replyPtr, ok := inv.Reply().(*any); ok {
// New code path: reply is *interface{}, set it to point to the metadata
*replyPtr = res.Result().(*info.MetadataInfo)
*replyPtr = res.Result()
} else if reply, ok := inv.Reply().(*info.MetadataInfo); ok {
// Old code path: reply is *info.MetadataInfo, copy fields
meta := res.Result().(*info.MetadataInfo)
Expand Down
Loading
Loading