aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/internal/binarylog/sink.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/internal/binarylog/sink.go')
-rw-r--r--vendor/google.golang.org/grpc/internal/binarylog/sink.go41
1 files changed, 26 insertions, 15 deletions
diff --git a/vendor/google.golang.org/grpc/internal/binarylog/sink.go b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
index 7d7a3056b..c2fdd58b3 100644
--- a/vendor/google.golang.org/grpc/internal/binarylog/sink.go
+++ b/vendor/google.golang.org/grpc/internal/binarylog/sink.go
@@ -69,7 +69,8 @@ type writerSink struct {
func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
b, err := proto.Marshal(e)
if err != nil {
- grpclogLogger.Infof("binary logging: failed to marshal proto message: %v", err)
+ grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
+ return err
}
hdr := make([]byte, 4)
binary.BigEndian.PutUint32(hdr, uint32(len(b)))
@@ -85,24 +86,27 @@ func (ws *writerSink) Write(e *pb.GrpcLogEntry) error {
func (ws *writerSink) Close() error { return nil }
type bufferedSink struct {
- mu sync.Mutex
- closer io.Closer
- out Sink // out is built on buf.
- buf *bufio.Writer // buf is kept for flush.
-
- writeStartOnce sync.Once
- writeTicker *time.Ticker
+ mu sync.Mutex
+ closer io.Closer
+ out Sink // out is built on buf.
+ buf *bufio.Writer // buf is kept for flush.
+ flusherStarted bool
+
+ writeTicker *time.Ticker
+ done chan struct{}
}
func (fs *bufferedSink) Write(e *pb.GrpcLogEntry) error {
- // Start the write loop when Write is called.
- fs.writeStartOnce.Do(fs.startFlushGoroutine)
fs.mu.Lock()
+ defer fs.mu.Unlock()
+ if !fs.flusherStarted {
+ // Start the write loop when Write is called.
+ fs.startFlushGoroutine()
+ fs.flusherStarted = true
+ }
if err := fs.out.Write(e); err != nil {
- fs.mu.Unlock()
return err
}
- fs.mu.Unlock()
return nil
}
@@ -113,7 +117,12 @@ const (
func (fs *bufferedSink) startFlushGoroutine() {
fs.writeTicker = time.NewTicker(bufFlushDuration)
go func() {
- for range fs.writeTicker.C {
+ for {
+ select {
+ case <-fs.done:
+ return
+ case <-fs.writeTicker.C:
+ }
fs.mu.Lock()
if err := fs.buf.Flush(); err != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", err)
@@ -124,10 +133,12 @@ func (fs *bufferedSink) startFlushGoroutine() {
}
func (fs *bufferedSink) Close() error {
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
if fs.writeTicker != nil {
fs.writeTicker.Stop()
}
- fs.mu.Lock()
+ close(fs.done)
if err := fs.buf.Flush(); err != nil {
grpclogLogger.Warningf("failed to flush to Sink: %v", err)
}
@@ -137,7 +148,6 @@ func (fs *bufferedSink) Close() error {
if err := fs.out.Close(); err != nil {
grpclogLogger.Warningf("failed to close the Sink: %v", err)
}
- fs.mu.Unlock()
return nil
}
@@ -155,5 +165,6 @@ func NewBufferedSink(o io.WriteCloser) Sink {
closer: o,
out: newWriterSink(bufW),
buf: bufW,
+ done: make(chan struct{}),
}
}