Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions packages/orchestrator/chunks.proto
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ message PeerAvailability {

message GetBuildFileSizeRequest {
string build_id = 1;
// file_name is one of the seekable diff files: "memfile", "rootfs.ext4"
string file_name = 2;
// name is one of the seekable diff files: "memfile", "rootfs.ext4"
string name = 2;
}

message GetBuildFileSizeResponse {
Expand All @@ -31,8 +31,8 @@ message GetBuildFileSizeResponse {

message GetBuildFileExistsRequest {
string build_id = 1;
// file_name is one of: "snapfile", "metadata.json"
string file_name = 2;
// name is one of: "snapfile", "metadata.json"
string name = 2;
}

message GetBuildFileExistsResponse {
Expand All @@ -41,7 +41,7 @@ message GetBuildFileExistsResponse {

message ReadAtBuildSeekableRequest {
string build_id = 1;
string file_name = 2;
string name = 2;
int64 offset = 3;
int64 length = 4;
}
Expand All @@ -54,8 +54,8 @@ message ReadAtBuildSeekableResponse {

message GetBuildBlobRequest {
string build_id = 1;
// file_name is one of: "snapfile", "metadata.json", "memfile.header", "rootfs.ext4.header"
string file_name = 2;
// name is one of: "snapfile", "metadata.json", "memfile.header", "rootfs.ext4.header"
string name = 2;
}

message GetBuildBlobResponse {
Expand Down
82 changes: 61 additions & 21 deletions packages/orchestrator/pkg/sandbox/template/peerclient/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"io"
"sync"
"sync/atomic"

"go.uber.org/zap"
Expand All @@ -16,22 +17,49 @@ import (

var _ storage.Blob = (*peerBlob)(nil)

// peerBlob reads from the peer first; on fallthrough, opens base lazily.
// The base path is fixed at construction (blobs are not compressed).
type peerBlob struct {
peerHandle[storage.Blob]
peerHandle

openBase func(ctx context.Context) (storage.Blob, error)

mu sync.Mutex
base storage.Blob
loaded bool
}

func (b *peerBlob) getBase(ctx context.Context) (storage.Blob, error) {
b.mu.Lock()
defer b.mu.Unlock()

if b.loaded {
return b.base, nil
}

base, err := b.openBase(ctx)
if err != nil {
return nil, err
}

b.base = base
b.loaded = true

return base, nil
}

func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) {
return withPeerFallback(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo,
res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-write-to", attrOpWriteTo,
func(ctx context.Context) (peerAttempt[int64], error) {
streamCtx, cancel := context.WithCancel(ctx)

recv, err := openPeerBlobStream(streamCtx, b.client, &orchestrator.GetBuildBlobRequest{
BuildId: b.buildID,
FileName: b.fileName,
BuildId: b.buildID,
Name: b.name,
}, b.uploaded)
if err != nil {
cancel()
logger.L().Warn(ctx, "failed to open peer blob stream", logger.WithBuildID(b.buildID), zap.String("file_name", b.fileName), zap.Error(err))
logger.L().Warn(ctx, "failed to open peer blob stream", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err))

return peerAttempt[int64]{}, nil
}
Expand All @@ -42,43 +70,55 @@ func (b *peerBlob) WriteTo(ctx context.Context, dst io.Writer) (int64, error) {
n, err := io.Copy(dst, reader)
if err != nil {
return peerAttempt[int64]{value: n, bytes: n, hit: true},
fmt.Errorf("failed to stream file %q from peer: %w", b.fileName, err)
fmt.Errorf("failed to stream file %q from peer: %w", b.name, err)
}

return peerAttempt[int64]{value: n, bytes: n, hit: true}, nil
},
func(ctx context.Context, base storage.Blob) (int64, error) {
return base.WriteTo(ctx, dst)
},
)
})
if res.hit {
return res.value, err
}

base, err := b.getBase(ctx)
if err != nil {
return 0, err
}

return base.WriteTo(ctx, dst)
}

func (b *peerBlob) Exists(ctx context.Context) (bool, error) {
return withPeerFallback(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists,
res, err := tryPeer(ctx, &b.peerHandle, "peer-blob-exists", attrOpExists,
func(ctx context.Context) (peerAttempt[bool], error) {
resp, err := b.client.GetBuildFileExists(ctx, &orchestrator.GetBuildFileExistsRequest{
BuildId: b.buildID,
FileName: b.fileName,
BuildId: b.buildID,
Name: b.name,
})
if err == nil && checkPeerAvailability(resp.GetAvailability(), b.uploaded) {
return peerAttempt[bool]{value: true, hit: true}, nil
}

if err != nil {
logger.L().Warn(ctx, "failed to check build file exists from peer", logger.WithBuildID(b.buildID), zap.String("file_name", b.fileName), zap.Error(err))
logger.L().Warn(ctx, "failed to check build file exists from peer", logger.WithBuildID(b.buildID), zap.String("file_name", b.name), zap.Error(err))
}

return peerAttempt[bool]{}, nil
},
func(ctx context.Context, base storage.Blob) (bool, error) {
return base.Exists(ctx)
},
)
})
if res.hit {
return res.value, err
}

base, err := b.getBase(ctx)
if err != nil {
return false, err
}

return base.Exists(ctx)
}

func (b *peerBlob) Put(ctx context.Context, data []byte, opts ...storage.PutOption) error {
// Writes always go to the base provider (GCS/S3); the peer is read-only.
fallback, err := b.getOrOpenBase(ctx)
fallback, err := b.getBase(ctx)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,13 @@ func TestPeerBlob_WriteTo_PeerSucceeds(t *testing.T) {

client := orchestratormocks.NewMockChunkServiceClient(t)
client.EXPECT().GetBuildBlob(mock.Anything, mock.MatchedBy(func(req *orchestrator.GetBuildBlobRequest) bool {
return req.GetBuildId() == "build-1" && req.GetFileName() == "snapfile"
return req.GetBuildId() == "build-1" && req.GetName() == "snapfile"
})).Return(stream, nil)

blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
blob := &peerBlob{peerHandle: peerHandle{
client: client,
buildID: "build-1",
fileName: "snapfile",
name: "snapfile",
uploaded: &atomic.Bool{},
}}

Expand Down Expand Up @@ -62,15 +62,17 @@ func TestPeerBlob_WriteTo_PeerNotAvailable_FallsBackToBase(t *testing.T) {
base := storage.NewMockStorageProvider(t)
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)

blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
client: client,
buildID: "build-1",
fileName: "snapfile",
uploaded: &atomic.Bool{},
openFn: func(ctx context.Context) (storage.Blob, error) {
blob := &peerBlob{
peerHandle: peerHandle{
client: client,
buildID: "build-1",
name: "snapfile",
uploaded: &atomic.Bool{},
},
openBase: func(ctx context.Context) (storage.Blob, error) {
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
},
}}
}

var buf bytes.Buffer
n, err := blob.WriteTo(t.Context(), &buf)
Expand All @@ -94,15 +96,17 @@ func TestPeerBlob_WriteTo_PeerError_FallsBackToBase(t *testing.T) {
base := storage.NewMockStorageProvider(t)
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)

blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
client: client,
buildID: "build-1",
fileName: "snapfile",
uploaded: &atomic.Bool{},
openFn: func(ctx context.Context) (storage.Blob, error) {
blob := &peerBlob{
peerHandle: peerHandle{
client: client,
buildID: "build-1",
name: "snapfile",
uploaded: &atomic.Bool{},
},
openBase: func(ctx context.Context) (storage.Blob, error) {
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
},
}}
}

var buf bytes.Buffer
_, err := blob.WriteTo(t.Context(), &buf)
Expand Down Expand Up @@ -139,15 +143,17 @@ func TestPeerBlob_WriteTo_UploadedSetMidStream_CompletesFromPeerThenFallsBack(t
base := storage.NewMockStorageProvider(t)
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)

blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
client: client,
buildID: "build-1",
fileName: "snapfile",
uploaded: uploaded,
openFn: func(ctx context.Context) (storage.Blob, error) {
blob := &peerBlob{
peerHandle: peerHandle{
client: client,
buildID: "build-1",
name: "snapfile",
uploaded: uploaded,
},
openBase: func(ctx context.Context) (storage.Blob, error) {
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
},
}}
}

// First download: in-flight stream completes from peer despite uploaded being set mid-stream.
var buf1 bytes.Buffer
Expand All @@ -170,10 +176,10 @@ func TestPeerBlob_Exists_PeerHasFile(t *testing.T) {

client := orchestratormocks.NewMockChunkServiceClient(t)
client.EXPECT().GetBuildFileExists(mock.Anything, mock.MatchedBy(func(req *orchestrator.GetBuildFileExistsRequest) bool {
return req.GetBuildId() == "build-1" && req.GetFileName() == "snapfile"
return req.GetBuildId() == "build-1" && req.GetName() == "snapfile"
})).Return(&orchestrator.GetBuildFileExistsResponse{}, nil)

blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{client: client, buildID: "build-1", fileName: "snapfile", uploaded: &atomic.Bool{}}}
blob := &peerBlob{peerHandle: peerHandle{client: client, buildID: "build-1", name: "snapfile", uploaded: &atomic.Bool{}}}
ok, err := blob.Exists(t.Context())
require.NoError(t, err)
assert.True(t, ok)
Expand All @@ -190,15 +196,17 @@ func TestPeerBlob_Exists_PeerNotAvailable_FallsBackToBase(t *testing.T) {
base := storage.NewMockStorageProvider(t)
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)

blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
client: client,
buildID: "build-1",
fileName: "snapfile",
uploaded: &atomic.Bool{},
openFn: func(ctx context.Context) (storage.Blob, error) {
blob := &peerBlob{
peerHandle: peerHandle{
client: client,
buildID: "build-1",
name: "snapfile",
uploaded: &atomic.Bool{},
},
openBase: func(ctx context.Context) (storage.Blob, error) {
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
},
}}
}

ok, err := blob.Exists(t.Context())
require.NoError(t, err)
Expand All @@ -217,15 +225,17 @@ func TestPeerBlob_Exists_UseStorage_FallsBackToBase(t *testing.T) {
base.EXPECT().OpenBlob(mock.Anything, "build-1/snapfile", storage.SnapfileObjectType).Return(baseBlob, nil)

uploaded := &atomic.Bool{}
blob := &peerBlob{peerHandle: peerHandle[storage.Blob]{
client: client,
buildID: "build-1",
fileName: "snapfile",
uploaded: uploaded,
openFn: func(ctx context.Context) (storage.Blob, error) {
blob := &peerBlob{
peerHandle: peerHandle{
client: client,
buildID: "build-1",
name: "snapfile",
uploaded: uploaded,
},
openBase: func(ctx context.Context) (storage.Blob, error) {
return base.OpenBlob(ctx, "build-1/snapfile", storage.SnapfileObjectType)
},
}}
}

ok, err := blob.Exists(t.Context())
require.NoError(t, err)
Expand Down
Loading
Loading