Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions storage/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ roles which must be enabled in order to do the export successfully. To
disable this export, you can use the [WithDisabledClientMetrics] client
option.

The gRPC client's default auto checksumming can introduce increased CPU overhead during writes
because of checksum computation. Users can disable automatic checksumming for gRPC writer using [Writer.DisableAutoChecksum].

# Storage Control API

Certain control plane and long-running operations for Cloud Storage (including Folder
Expand Down
112 changes: 68 additions & 44 deletions storage/grpc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,12 +818,19 @@ func bidiWriteObjectRequest(r gRPCBidiWriteRequest, bufChecksum *uint32, objectC
return req
}

type getObjectChecksumsParams struct {
fullObjectChecksum func() uint32
finishWrite bool
// checksumAttrs contains all parameters required from writer in senders.
// helps avoid piping in writer to the individual senders.
type checksumAttrs struct {
sendCRC32C bool
disableAutoChecksum bool
attrs *ObjectAttrs
objectAttrs *ObjectAttrs
fullObjectChecksum func() uint32
}

type getObjectChecksumsParams struct {
checksumAttrs
finishWrite bool
takeoverWriter bool
}

// getObjectChecksums determines what checksum information to include in the final
Expand All @@ -840,9 +847,10 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck

// send user's checksum on last write op if available
if params.sendCRC32C {
return toProtoChecksums(params.sendCRC32C, params.attrs)
return toProtoChecksums(params.sendCRC32C, params.objectAttrs)
}
if params.disableAutoChecksum {
// TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC
if params.disableAutoChecksum || params.takeoverWriter {
return nil
}
return &storagepb.ObjectChecksums{
Expand Down Expand Up @@ -879,8 +887,7 @@ type gRPCOneshotBidiWriteBufferSender struct {
firstMessage *storagepb.BidiWriteObjectRequest
streamErr error

checksumSettings func() (bool, bool, *ObjectAttrs)
fullObjectChecksum func() uint32
checksumAttrs checksumAttrs
}

func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWriteBufferSender {
Expand All @@ -894,11 +901,13 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
},
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
}
}
Expand Down Expand Up @@ -939,17 +948,13 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB
continue
}

sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
var bufChecksum *uint32
if !disableAutoChecksum {
if !s.checksumAttrs.disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: s.fullObjectChecksum,
finishWrite: r.finishWrite,
sendCRC32C: sendCrc32C,
disableAutoChecksum: disableAutoChecksum,
attrs: attrs,
checksumAttrs: s.checksumAttrs,
finishWrite: r.finishWrite,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)

Expand Down Expand Up @@ -996,8 +1001,7 @@ type gRPCResumableBidiWriteBufferSender struct {
startWriteRequest *storagepb.StartResumableWriteRequest
upid string

checksumSettings func() (bool, bool, *ObjectAttrs)
fullObjectChecksum func() uint32
checksumAttrs checksumAttrs

streamErr error
}
Expand All @@ -1011,11 +1015,13 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
},
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
}
}
Expand Down Expand Up @@ -1076,17 +1082,13 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP
continue
}

sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
var bufChecksum *uint32
if !disableAutoChecksum {
if !s.checksumAttrs.disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: s.fullObjectChecksum,
finishWrite: r.finishWrite,
sendCRC32C: sendCrc32C,
disableAutoChecksum: disableAutoChecksum,
attrs: attrs,
checksumAttrs: s.checksumAttrs,
finishWrite: r.finishWrite,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)

Expand Down Expand Up @@ -1142,12 +1144,14 @@ type gRPCAppendBidiWriteBufferSender struct {
bucket string
routingToken *string

firstMessage *storagepb.BidiWriteObjectRequest

objectChecksums *storagepb.ObjectChecksums
firstMessage *storagepb.BidiWriteObjectRequest
finalizeOnClose bool
objResource *storagepb.Object

checksumAttrs checksumAttrs

takeoverWriter bool

streamErr error
}

Expand All @@ -1164,8 +1168,15 @@ func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteB
},
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
},
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
finalizeOnClose: w.finalizeOnClose,
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
}
}

Expand Down Expand Up @@ -1278,8 +1289,16 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove
AppendObjectSpec: writeObjectSpecAsAppendObjectSpec(w.spec, w.appendGen),
},
},
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
finalizeOnClose: w.finalizeOnClose,
takeoverWriter: true,
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
},
takeoverReported: false,
handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) {
Expand Down Expand Up @@ -1409,12 +1428,17 @@ func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWrit
flush: flush,
finishWrite: finalizeObject,
}
// TODO(b/453869602): implement default checksumming for appendable writes
req := bidiWriteObjectRequest(r, nil, nil)
if finalizeObject {
// appendable objects pass checksums on the finalize message only
req.ObjectChecksums = s.objectChecksums

var bufChecksum *uint32
if !s.checksumAttrs.disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
checksumAttrs: s.checksumAttrs,
finishWrite: finalizeObject,
takeoverWriter: s.takeoverWriter,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)
if sendFirstMessage {
proto.Merge(req, s.firstMessage)
}
Expand Down
21 changes: 16 additions & 5 deletions storage/grpc_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestGetObjectChecksums(t *testing.T) {
fullObjectChecksum func() uint32
finishWrite bool
sendCRC32C bool
takeoverWriter bool
disableAutoChecksum bool
attrs *ObjectAttrs
want *storagepb.ObjectChecksums
Expand Down Expand Up @@ -73,16 +74,26 @@ func TestGetObjectChecksums(t *testing.T) {
Crc32C: proto.Uint32(456),
},
},
// TODO(b/461982277): remove this testcase once checksums for takeover writer is implemented
{
name: "takeover writer should return nil",
finishWrite: true,
takeoverWriter: true,
want: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: tt.fullObjectChecksum,
finishWrite: tt.finishWrite,
sendCRC32C: tt.sendCRC32C,
disableAutoChecksum: tt.disableAutoChecksum,
attrs: tt.attrs,
checksumAttrs: checksumAttrs{
disableAutoChecksum: tt.disableAutoChecksum,
sendCRC32C: tt.sendCRC32C,
objectAttrs: tt.attrs,
fullObjectChecksum: tt.fullObjectChecksum,
},
finishWrite: tt.finishWrite,
takeoverWriter: tt.takeoverWriter,
})
if !proto.Equal(got, tt.want) {
t.Errorf("getObjectChecksums() = %v, want %v", got, tt.want)
Expand Down
Loading
Loading