diff --git a/diode/diode.go b/diode/diode.go index 751cafe..cb0c50d 100644 --- a/diode/diode.go +++ b/diode/diode.go @@ -19,12 +19,16 @@ var bufPool = &sync.Pool{ type Alerter func(missed int) +type diodeFetcher interface { + diodes.Diode + Next() diodes.GenericDataType +} + // Writer is a io.Writer wrapper that uses a diode to make Write lock-free, // non-blocking and thread safe. type Writer struct { w io.Writer - d *diodes.ManyToOne - p *diodes.Poller + d diodeFetcher c context.CancelFunc done chan struct{} } @@ -35,25 +39,34 @@ type Writer struct { // // Use a diode.Writer when // -// wr := diode.NewWriter(w, 1000, 10 * time.Millisecond, func(missed int) { +// wr := diode.NewWriter(w, 1000, 0, func(missed int) { // log.Printf("Dropped %d messages", missed) // }) // log := zerolog.New(wr) // +// If poolInterval is greater than 0, a poller is used otherwise a waiter is +// used. // // See code.cloudfoundry.org/go-diodes for more info on diode. func NewWriter(w io.Writer, size int, poolInterval time.Duration, f Alerter) Writer { ctx, cancel := context.WithCancel(context.Background()) - d := diodes.NewManyToOne(size, diodes.AlertFunc(f)) dw := Writer{ - w: w, - d: d, - p: diodes.NewPoller(d, - diodes.WithPollingInterval(poolInterval), - diodes.WithPollingContext(ctx)), + w: w, c: cancel, done: make(chan struct{}), } + if f == nil { + f = func(int) {} + } + d := diodes.NewManyToOne(size, diodes.AlertFunc(f)) + if poolInterval > 0 { + dw.d = diodes.NewPoller(d, + diodes.WithPollingInterval(poolInterval), + diodes.WithPollingContext(ctx)) + } else { + dw.d = diodes.NewWaiter(d, + diodes.WithWaiterContext(ctx)) + } go dw.poll() return dw } @@ -80,7 +93,7 @@ func (dw Writer) Close() error { func (dw Writer) poll() { defer close(dw.done) for { - d := dw.p.Next() + d := dw.d.Next() if d == nil { return } diff --git a/diode/diode_example_test.go b/diode/diode_example_test.go index a097c57..3540db6 100644 --- a/diode/diode_example_test.go +++ b/diode/diode_example_test.go @@ -5,14 +5,13 @@ package diode_test import ( "fmt" "os" - "time" "github.com/rs/zerolog" "github.com/rs/zerolog/diode" ) func ExampleNewWriter() { - w := diode.NewWriter(os.Stdout, 1000, 10*time.Millisecond, func(missed int) { + w := diode.NewWriter(os.Stdout, 1000, 0, func(missed int) { fmt.Printf("Dropped %d messages\n", missed) }) log := zerolog.New(w) diff --git a/diode/diode_test.go b/diode/diode_test.go index 6171cb4..098bd44 100644 --- a/diode/diode_test.go +++ b/diode/diode_test.go @@ -16,7 +16,7 @@ import ( func TestNewWriter(t *testing.T) { buf := bytes.Buffer{} - w := diode.NewWriter(&buf, 1000, 10*time.Millisecond, func(missed int) { + w := diode.NewWriter(&buf, 1000, 0, func(missed int) { fmt.Printf("Dropped %d messages\n", missed) }) log := zerolog.New(w) @@ -33,15 +33,22 @@ func TestNewWriter(t *testing.T) { func Benchmark(b *testing.B) { log.SetOutput(ioutil.Discard) defer log.SetOutput(os.Stderr) - w := diode.NewWriter(ioutil.Discard, 100000, 10*time.Millisecond, nil) - log := zerolog.New(w) - defer w.Close() - - b.SetParallelism(1000) - b.RunParallel(func(pb *testing.PB) { - for pb.Next() { - log.Print("test") - } - }) + benchs := map[string]time.Duration{ + "Waiter": 0, + "Pooler": 10 * time.Millisecond, + } + for name, interval := range benchs { + b.Run(name, func(b *testing.B) { + w := diode.NewWriter(ioutil.Discard, 100000, interval, nil) + log := zerolog.New(w) + defer w.Close() + b.SetParallelism(1000) + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + log.Print("test") + } + }) + }) + } }