Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions storage/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ roles which must be enabled in order to do the export successfully. To
disable this export, you can use the [WithDisabledClientMetrics] client
option.

The gRPC client's default auto checksumming can introduce increased CPU overhead during writes
because of checksum computation. Users can disable automatic checksumming for gRPC writer using [Writer.DisableAutoChecksum].

# Storage Control API

Certain control plane and long-running operations for Cloud Storage (including Folder
Expand Down
112 changes: 68 additions & 44 deletions storage/grpc_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,12 +818,19 @@ func bidiWriteObjectRequest(r gRPCBidiWriteRequest, bufChecksum *uint32, objectC
return req
}

type getObjectChecksumsParams struct {
fullObjectChecksum func() uint32
finishWrite bool
// checksumAttrs contains all parameters required from writer in senders,
// helps avoid piping in writer to the individual senders.
type checksumAttrs struct {
sendCRC32C bool
disableAutoChecksum bool
attrs *ObjectAttrs
objectAttrs *ObjectAttrs
fullObjectChecksum func() uint32
}

type getObjectChecksumsParams struct {
checksumAttrs
finishWrite bool
takeoverWriter bool
}

// getObjectChecksums determines what checksum information to include in the final
Expand All @@ -840,9 +847,10 @@ func getObjectChecksums(params *getObjectChecksumsParams) *storagepb.ObjectCheck

// send user's checksum on last write op if available
if params.sendCRC32C {
return toProtoChecksums(params.sendCRC32C, params.attrs)
return toProtoChecksums(params.sendCRC32C, params.objectAttrs)
}
if params.disableAutoChecksum {
// TODO(b/461982277): Enable checksum validation for appendable takeover writer gRPC
if params.disableAutoChecksum || params.takeoverWriter {
return nil
}
return &storagepb.ObjectChecksums{
Expand Down Expand Up @@ -879,8 +887,7 @@ type gRPCOneshotBidiWriteBufferSender struct {
firstMessage *storagepb.BidiWriteObjectRequest
streamErr error

checksumSettings func() (bool, bool, *ObjectAttrs)
fullObjectChecksum func() uint32
checksumAttrs checksumAttrs
}

func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWriteBufferSender {
Expand All @@ -894,11 +901,13 @@ func (w *gRPCWriter) newGRPCOneshotBidiWriteBufferSender() *gRPCOneshotBidiWrite
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
},
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
}
}
Expand Down Expand Up @@ -939,17 +948,13 @@ func (s *gRPCOneshotBidiWriteBufferSender) connect(ctx context.Context, cs gRPCB
continue
}

sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
var bufChecksum *uint32
if !disableAutoChecksum {
if !s.checksumAttrs.disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: s.fullObjectChecksum,
finishWrite: r.finishWrite,
sendCRC32C: sendCrc32C,
disableAutoChecksum: disableAutoChecksum,
attrs: attrs,
checksumAttrs: s.checksumAttrs,
finishWrite: r.finishWrite,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)

Expand Down Expand Up @@ -996,8 +1001,7 @@ type gRPCResumableBidiWriteBufferSender struct {
startWriteRequest *storagepb.StartResumableWriteRequest
upid string

checksumSettings func() (bool, bool, *ObjectAttrs)
fullObjectChecksum func() uint32
checksumAttrs checksumAttrs

streamErr error
}
Expand All @@ -1011,11 +1015,13 @@ func (w *gRPCWriter) newGRPCResumableBidiWriteBufferSender() *gRPCResumableBidiW
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
ObjectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
},
checksumSettings: func() (bool, bool, *ObjectAttrs) {
return w.sendCRC32C, w.disableAutoChecksum, w.attrs
},
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
}
}
Expand Down Expand Up @@ -1076,17 +1082,13 @@ func (s *gRPCResumableBidiWriteBufferSender) connect(ctx context.Context, cs gRP
continue
}

sendCrc32C, disableAutoChecksum, attrs := s.checksumSettings()
var bufChecksum *uint32
if !disableAutoChecksum {
if !s.checksumAttrs.disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: s.fullObjectChecksum,
finishWrite: r.finishWrite,
sendCRC32C: sendCrc32C,
disableAutoChecksum: disableAutoChecksum,
attrs: attrs,
checksumAttrs: s.checksumAttrs,
finishWrite: r.finishWrite,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)

Expand Down Expand Up @@ -1142,12 +1144,14 @@ type gRPCAppendBidiWriteBufferSender struct {
bucket string
routingToken *string

firstMessage *storagepb.BidiWriteObjectRequest

objectChecksums *storagepb.ObjectChecksums
firstMessage *storagepb.BidiWriteObjectRequest
finalizeOnClose bool
objResource *storagepb.Object

checksumAttrs checksumAttrs

takeoverWriter bool

streamErr error
}

Expand All @@ -1164,8 +1168,15 @@ func (w *gRPCWriter) newGRPCAppendableObjectBufferSender() *gRPCAppendBidiWriteB
},
CommonObjectRequestParams: toProtoCommonObjectRequestParams(w.encryptionKey),
},
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
finalizeOnClose: w.finalizeOnClose,
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
}
}

Expand Down Expand Up @@ -1278,8 +1289,16 @@ func (w *gRPCWriter) newGRPCAppendTakeoverWriteBufferSender() *gRPCAppendTakeove
AppendObjectSpec: writeObjectSpecAsAppendObjectSpec(w.spec, w.appendGen),
},
},
objectChecksums: toProtoChecksums(w.sendCRC32C, w.attrs),
finalizeOnClose: w.finalizeOnClose,
takeoverWriter: true,
checksumAttrs: checksumAttrs{
sendCRC32C: w.sendCRC32C,
disableAutoChecksum: w.disableAutoChecksum,
objectAttrs: w.attrs,
fullObjectChecksum: func() uint32 {
return w.fullObjectChecksum
},
},
},
takeoverReported: false,
handleTakeoverCompletion: func(c gRPCBidiWriteCompletion) {
Expand Down Expand Up @@ -1409,12 +1428,17 @@ func (s *gRPCAppendBidiWriteBufferSender) send(stream storagepb.Storage_BidiWrit
flush: flush,
finishWrite: finalizeObject,
}
// TODO(b/453869602): implement default checksumming for appendable writes
req := bidiWriteObjectRequest(r, nil, nil)
if finalizeObject {
// appendable objects pass checksums on the finalize message only
req.ObjectChecksums = s.objectChecksums

var bufChecksum *uint32
if !s.checksumAttrs.disableAutoChecksum {
bufChecksum = proto.Uint32(crc32.Checksum(r.buf, crc32cTable))
}
objectChecksums := getObjectChecksums(&getObjectChecksumsParams{
checksumAttrs: s.checksumAttrs,
finishWrite: finalizeObject,
takeoverWriter: s.takeoverWriter,
})
req := bidiWriteObjectRequest(r, bufChecksum, objectChecksums)
if sendFirstMessage {
proto.Merge(req, s.firstMessage)
}
Expand Down
21 changes: 16 additions & 5 deletions storage/grpc_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func TestGetObjectChecksums(t *testing.T) {
fullObjectChecksum func() uint32
finishWrite bool
sendCRC32C bool
takeoverWriter bool
disableAutoChecksum bool
attrs *ObjectAttrs
want *storagepb.ObjectChecksums
Expand Down Expand Up @@ -73,16 +74,26 @@ func TestGetObjectChecksums(t *testing.T) {
Crc32C: proto.Uint32(456),
},
},
// TODO(b/461982277): remove this testcase once checksums for takeover writer is implemented
{
name: "takeover writer should return nil",
finishWrite: true,
takeoverWriter: true,
want: nil,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got := getObjectChecksums(&getObjectChecksumsParams{
fullObjectChecksum: tt.fullObjectChecksum,
finishWrite: tt.finishWrite,
sendCRC32C: tt.sendCRC32C,
disableAutoChecksum: tt.disableAutoChecksum,
attrs: tt.attrs,
checksumAttrs: checksumAttrs{
disableAutoChecksum: tt.disableAutoChecksum,
sendCRC32C: tt.sendCRC32C,
objectAttrs: tt.attrs,
fullObjectChecksum: tt.fullObjectChecksum,
},
finishWrite: tt.finishWrite,
takeoverWriter: tt.takeoverWriter,
})
if !proto.Equal(got, tt.want) {
t.Errorf("getObjectChecksums() = %v, want %v", got, tt.want)
Expand Down
Loading
Loading