diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/binarylog/sink.go')
-rw-r--r-- | vendor/google.golang.org/grpc/internal/binarylog/sink.go | 41 |
1 files changed, 26 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go index 7d7a3056b..c2fdd58b3 100644 --- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go +++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go @@ -69,7 +69,8 @@ type writerSink struct { func (ws *writerSink) Write(e *pb.GrpcLogEntry) error { b, err := proto.Marshal(e) if err != nil { - grpclogLogger.Infof("binary logging: failed to marshal proto message: %v", err) + grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err) + return err } hdr := make([]byte, 4) binary.BigEndian.PutUint32(hdr, uint32(len(b))) @@ -85,24 +86,27 @@ func (ws *writerSink) Write(e *pb.GrpcLogEntry) error { func (ws *writerSink) Close() error { return nil } type bufferedSink struct { - mu sync.Mutex - closer io.Closer - out Sink // out is built on buf. - buf *bufio.Writer // buf is kept for flush. - - writeStartOnce sync.Once - writeTicker *time.Ticker + mu sync.Mutex + closer io.Closer + out Sink // out is built on buf. + buf *bufio.Writer // buf is kept for flush. + flusherStarted bool + + writeTicker *time.Ticker + done chan struct{} } func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error { - // Start the write loop when Write is called. - fs.writeStartOnce.Do(fs.startFlushGoroutine) fs.mu.Lock() + defer fs.mu.Unlock() + if !fs.flusherStarted { + // Start the write loop when Write is called. + fs.startFlushGoroutine() + fs.flusherStarted = true + } if err := fs.out.Write(e); err != nil { - fs.mu.Unlock() return err } - fs.mu.Unlock() return nil } @@ -113,7 +117,12 @@ const ( func (fs *bufferedSink) startFlushGoroutine() { fs.writeTicker = time.NewTicker(bufFlushDuration) go func() { - for range fs.writeTicker.C { + for { + select { + case <-fs.done: + return + case <-fs.writeTicker.C: + } fs.mu.Lock() if err := fs.buf.Flush(); err != nil { grpclogLogger.Warningf("failed to flush to Sink: %v", err) @@ -124,10 +133,12 @@ func (fs *bufferedSink) startFlushGoroutine() { } func (fs *bufferedSink) Close() error { + fs.mu.Lock() + defer fs.mu.Unlock() if fs.writeTicker != nil { fs.writeTicker.Stop() } - fs.mu.Lock() + close(fs.done) if err := fs.buf.Flush(); err != nil { grpclogLogger.Warningf("failed to flush to Sink: %v", err) } @@ -137,7 +148,6 @@ func (fs *bufferedSink) Close() error { if err := fs.out.Close(); err != nil { grpclogLogger.Warningf("failed to close the Sink: %v", err) } - fs.mu.Unlock() return nil } @@ -155,5 +165,6 @@ func NewBufferedSink(o io.WriteCloser) Sink { closer: o, out: newWriterSink(bufW), buf: bufW, + done: make(chan struct{}), } } |