@@ -12,6 +12,7 @@ import (
12
12
"github.com/buildbuddy-io/buildbuddy/server/real_environment"
13
13
"github.com/buildbuddy-io/buildbuddy/server/remote_cache/digest"
14
14
"github.com/buildbuddy-io/buildbuddy/server/util/authutil"
15
+ "github.com/buildbuddy-io/buildbuddy/server/util/grpc_stream"
15
16
"github.com/buildbuddy-io/buildbuddy/server/util/log"
16
17
"github.com/buildbuddy-io/buildbuddy/server/util/status"
17
18
"github.com/buildbuddy-io/buildbuddy/server/util/tracing"
@@ -75,7 +76,7 @@ func (s *ByteStreamServerProxy) Read(req *bspb.ReadRequest, stream bspb.ByteStre
75
76
return err
76
77
}
77
78
78
- localReadStream , err := s .local .Read (ctx , req )
79
+ localGRPCReadStream , err := s .local .Read (ctx , req )
79
80
if err != nil {
80
81
if skipRemote {
81
82
recordReadMetrics (metrics .MissStatusLabel , requestTypeLabel , err , 0 )
@@ -96,12 +97,9 @@ func (s *ByteStreamServerProxy) Read(req *bspb.ReadRequest, stream bspb.ByteStre
96
97
}
97
98
98
99
responseSent := false
99
- bytesRead := 0
100
+ localReadStream := grpc_stream . NewByteCountingServerStream ( localGRPCReadStream )
100
101
for {
101
102
rsp , err := localReadStream .Recv ()
102
- if rsp != nil {
103
- bytesRead += len (rsp .Data )
104
- }
105
103
if err == io .EOF {
106
104
break
107
105
}
@@ -112,10 +110,10 @@ func (s *ByteStreamServerProxy) Read(req *bspb.ReadRequest, stream bspb.ByteStre
112
110
// the remote cache, but keep it simple for now.
113
111
if responseSent {
114
112
log .CtxInfof (ctx , "error midstream of local read: %s" , err )
115
- recordReadMetrics (metrics .HitStatusLabel , requestTypeLabel , err , bytesRead )
113
+ recordReadMetrics (metrics .HitStatusLabel , requestTypeLabel , err , int ( localReadStream . GetByteCount ()) )
116
114
return err
117
115
} else if skipRemote {
118
- recordReadMetrics (metrics .MissStatusLabel , requestTypeLabel , err , bytesRead )
116
+ recordReadMetrics (metrics .MissStatusLabel , requestTypeLabel , err , int ( localReadStream . GetByteCount ()) )
119
117
return err
120
118
} else {
121
119
// Fall back to reading remotely if the local read fails.
@@ -126,12 +124,12 @@ func (s *ByteStreamServerProxy) Read(req *bspb.ReadRequest, stream bspb.ByteStre
126
124
}
127
125
128
126
if err := stream .Send (rsp ); err != nil {
129
- recordReadMetrics (metrics .HitStatusLabel , requestTypeLabel , err , bytesRead )
127
+ recordReadMetrics (metrics .HitStatusLabel , requestTypeLabel , err , int ( localReadStream . GetByteCount ()) )
130
128
return err
131
129
}
132
130
responseSent = true
133
131
}
134
- recordReadMetrics (metrics .HitStatusLabel , requestTypeLabel , nil , bytesRead )
132
+ recordReadMetrics (metrics .HitStatusLabel , requestTypeLabel , nil , int ( localReadStream . GetByteCount ()) )
135
133
return nil
136
134
}
137
135
@@ -213,11 +211,12 @@ func (s *ByteStreamServerProxy) readRemote(req *bspb.ReadRequest, stream bspb.By
213
211
ctx , spn := tracing .StartSpan (stream .Context ())
214
212
defer spn .End ()
215
213
216
- remoteReadStream , err := s .remote .Read (ctx , req )
214
+ remoteGRPCReadStream , err := s .remote .Read (ctx , req )
217
215
if err != nil {
218
216
log .CtxInfof (ctx , "error reading from remote: %s" , err )
219
217
return 0 , err
220
218
}
219
+ remoteReadStream := grpc_stream .NewByteCountingServerStream (remoteGRPCReadStream )
221
220
222
221
var localWriteStream localWriter = & discardingLocalWriter {}
223
222
if req .ReadOffset == 0 && ! authutil .EncryptionEnabled (ctx , s .authenticator ) {
@@ -235,12 +234,8 @@ func (s *ByteStreamServerProxy) readRemote(req *bspb.ReadRequest, stream bspb.By
235
234
}
236
235
}
237
236
238
- bytesRead := 0
239
237
for {
240
238
rsp , err := remoteReadStream .Recv ()
241
- if rsp != nil {
242
- bytesRead += len (rsp .Data )
243
- }
244
239
if err != nil {
245
240
if err == io .EOF {
246
241
if err := localWriteStream .commit (); err != nil {
@@ -249,18 +244,18 @@ func (s *ByteStreamServerProxy) readRemote(req *bspb.ReadRequest, stream bspb.By
249
244
break
250
245
}
251
246
log .CtxInfof (ctx , "error streaming from remote for read through: %s" , err )
252
- return bytesRead , err
247
+ return int ( remoteReadStream . GetByteCount ()) , err
253
248
}
254
249
255
250
if err := localWriteStream .send (rsp .Data ); err != nil {
256
251
log .CtxInfof (ctx , "Error writing locally for read through: %s" , err )
257
252
localWriteStream = & discardingLocalWriter {}
258
253
}
259
254
if err = stream .Send (rsp ); err != nil {
260
- return bytesRead , err
255
+ return int ( remoteReadStream . GetByteCount ()) , err
261
256
}
262
257
}
263
- return bytesRead , nil
258
+ return int ( remoteReadStream . GetByteCount ()) , nil
264
259
}
265
260
266
261
func (s * ByteStreamServerProxy ) Write (stream bspb.ByteStream_WriteServer ) error {
0 commit comments