zlog/diode/diode.go

92 lines
2.0 KiB
Go

// Package diode provides a thread-safe, lock-free, non-blocking io.Writer
// wrapper.
package diode
import (
"context"
"io"
"sync"
"time"
"github.com/rs/zerolog/diode/internal/diodes"
)
// bufPool recycles the byte slices used to carry copies of written data.
// A freshly constructed entry is an empty slice with room for 500 bytes.
var bufPool = &sync.Pool{
	New: func() interface{} {
		const initialCap = 500
		buf := make([]byte, 0, initialCap)
		return buf
	},
}
// Alerter is the callback type accepted by NewWriter, which converts it to a
// diodes.AlertFunc and hands it to the underlying diode. In the NewWriter
// usage example, missed is reported as the number of dropped messages.
type Alerter func(missed int)
// Writer is an io.Writer wrapper that uses a diode to make Write lock-free,
// non-blocking and thread safe.
//
// Values are created with NewWriter, which also starts the goroutine that
// forwards queued data to the wrapped writer.
type Writer struct {
// w is the wrapped destination; poll writes to it and Close closes it when
// it implements io.Closer.
w io.Writer
// d is the many-to-one diode that Write pushes copied buffers into.
d *diodes.ManyToOne
// p is the poller over d that poll reads queued buffers from.
p *diodes.Poller
// c cancels the context given to the poller; Close calls it.
c context.CancelFunc
// done is closed by poll when it returns; Close waits on it.
done chan struct{}
}
// NewWriter creates a writer wrapping w with a many-to-one diode in order to
// never block log producers and drop events if the writer can't keep up with
// the flow of data.
//
// size is passed to the diode as its capacity and poolInterval is passed to
// the diode poller as its polling interval. f is registered as the diode's
// alert callback (in the example below it receives the number of dropped
// messages); a nil f is replaced by a no-op.
//
// NewWriter starts a goroutine that forwards queued data to w. Call Close on
// the returned Writer to stop it.
//
// Example:
//
//	wr := diode.NewWriter(w, 1000, 10*time.Millisecond, func(missed int) {
//		log.Printf("Dropped %d messages", missed)
//	})
//	log := zerolog.New(wr)
//
// See code.cloudfoundry.org/go-diodes for more info on diode.
func NewWriter(w io.Writer, size int, poolInterval time.Duration, f Alerter) Writer {
	if f == nil {
		// Converting a nil func to diodes.AlertFunc still hands the diode a
		// non-nil alerter value, so the diode could not detect it and would
		// presumably panic when invoking it on a drop. Default to a no-op.
		f = func(int) {}
	}

	ctx, cancel := context.WithCancel(context.Background())
	d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
	dw := Writer{
		w: w,
		d: d,
		p: diodes.NewPoller(d,
			diodes.WithPollingInterval(poolInterval),
			diodes.WithPollingContext(ctx)),
		c:    cancel,
		done: make(chan struct{}),
	}
	// The goroutine runs on a copy of dw; every field is a reference or
	// interface value, so the copy shares state with the returned Writer.
	go dw.poll()
	return dw
}
// Write copies p into a buffer taken from bufPool and sets that copy on the
// diode. It always returns the length of the queued buffer and a nil error.
func (dw Writer) Write(p []byte) (n int, err error) {
	// zerolog pools p, so it must not be retained past this call; queue a
	// private copy instead.
	buf := bufPool.Get().([]byte)
	buf = append(buf, p...)
	dw.d.Set(diodes.GenericDataType(&buf))
	return len(buf), nil
}
// Close cancels the poller's context, waits for the polling goroutine to
// signal completion, and then calls Close on the wrapped writer if it
// implements io.Closer. It returns that Close error, or nil when the wrapped
// writer is not an io.Closer.
func (dw Writer) Close() error {
	dw.c()
	// Block until poll has returned and closed the done channel.
	<-dw.done

	closer, isCloser := dw.w.(io.Closer)
	if !isCloser {
		return nil
	}
	return closer.Close()
}
// poll forwards buffers from the diode poller to the wrapped writer until the
// poller's Next returns nil, then closes dw.done so Close can proceed.
func (dw Writer) poll() {
	defer close(dw.done)

	// maxPooledCap caps the size of buffers returned to bufPool. sync.Pool
	// works best when entries have roughly the same memory cost; without a
	// limit, a single oversized message would keep a large allocation pinned
	// in the pool. See https://golang.org/issue/23199.
	const maxPooledCap = 1 << 16 // 64 KiB

	for {
		d := dw.p.Next()
		if d == nil {
			// A nil result is treated as the stop signal (presumably the
			// polling context was cancelled by Close).
			return
		}
		p := *(*[]byte)(d)
		// Best-effort delivery: there is no caller to report a write error
		// to, so the result is deliberately discarded.
		_, _ = dw.w.Write(p)
		if cap(p) <= maxPooledCap {
			bufPool.Put(p[:0])
		}
	}
}