Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions storage/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ roles which must be enabled in order to do the export successfully. To
disable this export, you can use the [WithDisabledClientMetrics] client
option.

The gRPC client's default auto checksumming can introduce increased CPU overhead during writes
because of checksum computation. Users can disable automatic checksumming for gRPC writer using [Writer.DisableAutoChecksum].

# Storage Control API

Certain control plane and long-running operations for Cloud Storage (including Folder
Expand Down
47 changes: 35 additions & 12 deletions storage/grpc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -824,6 +824,7 @@ type getObjectChecksumsParams struct {
sendCRC32C bool
disableAutoChecksum bool
attrs *ObjectAttrs
takeoverWriter bool
}

// getObjectChecksums determines what checksum information to include in the final
Expand All @@ -842,7 +843,8 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck
if params.sendCRC32C {
return toProtoChecksums(params.sendCRC32C, params.attrs)
}
if params.disableAutoChecksum {
// TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC
if params.disableAutoChecksum || params.takeoverWriter {
return nil
}
return &storagepb.ObjectChecksums{
Expand Down Expand Up @@ -1142,12 +1144,14 @@ type gRPCAppendBidiWriteBufferSender struct {
bucket string
routingToken *string

firstMessage *storagepb.BidiWriteObjectRequest

objectChecksums *storagepb.ObjectChecksums
firstMessage *storagepb.BidiWriteObjectRequest
finalizeOnClose bool
objResource *storagepb.Object

checksumSettings func() (bool, bool, *ObjectAttrs)
fullObjectChecksum func() uint32
takeoverWriter bool

streamErr error
}

Expand All @@ -1164,8 +1168,13 @@ func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteB
},
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
},
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
finalizeOnClose: w.finalizeOnClose,
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
}
}

Expand Down Expand Up @@ -1278,8 +1287,14 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove
AppendObjectSpec: writeObjectSpecAsAppendObjectSpec(w.spec, w.appendGen),
},
},
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
finalizeOnClose: w.finalizeOnClose,
takeoverWriter: true,
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
takeoverReported: false,
handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) {
Expand Down Expand Up @@ -1409,12 +1424,20 @@ func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWrit
flush: flush,
finishWrite: finalizeObject,
}
// TODO(b/453869602): implement default checksumming for appendable writes
req := bidiWriteObjectRequest(r, nil, nil)
if finalizeObject {
// appendable objects pass checksums on the finalize message only
req.ObjectChecksums = s.objectChecksums
}
sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
var bufChecksum *uint32
if !disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: s.fullObjectChecksum,
finishWrite: finalizeObject,
sendCRC32C: sendCrc32C,
disableAutoChecksum: disableAutoChecksum,
attrs: attrs,
takeoverWriter: s.takeoverWriter,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)
if sendFirstMessage {
proto.Merge(req, s.firstMessage)
}
Expand Down
9 changes: 9 additions & 0 deletions storage/grpc_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestGetObjectChecksums(t *testing.T) {
fullObjectChecksum func() uint32
finishWrite bool
sendCRC32C bool
takeoverWriter bool
disableAutoChecksum bool
attrs *ObjectAttrs
want *storagepb.ObjectChecksums
Expand Down Expand Up @@ -73,6 +74,13 @@ func TestGetObjectChecksums(t *testing.T) {
Crc32C: proto.Uint32(456),
},
},
// TODO(b/461982277): remove this testcase once checksums for takeover writer is implemented
{
name: "takeover writer should return nil",
finishWrite: true,
takeoverWriter: true,
want: nil,
},
}

for _, tt := range tests {
Expand All @@ -83,6 +91,7 @@ func TestGetObjectChecksums(t *testing.T) {
sendCRC32C: tt.sendCRC32C,
disableAutoChecksum: tt.disableAutoChecksum,
attrs: tt.attrs,
takeoverWriter: tt.takeoverWriter,
})
if !proto.Equal(got, tt.want) {
t.Errorf("getObjectChecksums() = %v, want %v", got, tt.want)
Expand Down
Loading