diode: let use a waiter instead of a poller by using 0 as a poolInterval

This commit is contained in:
Olivier Poitrey 2018-10-31 16:57:15 -07:00
parent baa31cfa85
commit e7627a4f73
3 changed files with 42 additions and 23 deletions

View File

@ -19,12 +19,16 @@ var bufPool = &sync.Pool{
type Alerter func(missed int)
type diodeFetcher interface {
diodes.Diode
Next() diodes.GenericDataType
}
// Writer is a io.Writer wrapper that uses a diode to make Write lock-free,
// non-blocking and thread safe.
type Writer struct {
w io.Writer
d *diodes.ManyToOne
p *diodes.Poller
d diodeFetcher
c context.CancelFunc
done chan struct{}
}
@ -35,25 +39,34 @@ type Writer struct {
//
// Use a diode.Writer when
//
// wr := diode.NewWriter(w, 1000, 10 * time.Millisecond, func(missed int) {
// wr := diode.NewWriter(w, 1000, 0, func(missed int) {
// log.Printf("Dropped %d messages", missed)
// })
// log := zerolog.New(wr)
//
// If poolInterval is greater than 0, a poller is used otherwise a waiter is
// used.
//
// See code.cloudfoundry.org/go-diodes for more info on diode.
func NewWriter(w io.Writer, size int, poolInterval time.Duration, f Alerter) Writer {
ctx, cancel := context.WithCancel(context.Background())
d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
dw := Writer{
w: w,
d: d,
p: diodes.NewPoller(d,
diodes.WithPollingInterval(poolInterval),
diodes.WithPollingContext(ctx)),
w: w,
c: cancel,
done: make(chan struct{}),
}
if f == nil {
f = func(int) {}
}
d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
if poolInterval > 0 {
dw.d = diodes.NewPoller(d,
diodes.WithPollingInterval(poolInterval),
diodes.WithPollingContext(ctx))
} else {
dw.d = diodes.NewWaiter(d,
diodes.WithWaiterContext(ctx))
}
go dw.poll()
return dw
}
@ -80,7 +93,7 @@ func (dw Writer) Close() error {
func (dw Writer) poll() {
defer close(dw.done)
for {
d := dw.p.Next()
d := dw.d.Next()
if d == nil {
return
}

View File

@ -5,14 +5,13 @@ package diode_test
import (
"fmt"
"os"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/diode"
)
func ExampleNewWriter() {
w := diode.NewWriter(os.Stdout, 1000, 10*time.Millisecond, func(missed int) {
w := diode.NewWriter(os.Stdout, 1000, 0, func(missed int) {
fmt.Printf("Dropped %d messages\n", missed)
})
log := zerolog.New(w)

View File

@ -16,7 +16,7 @@ import (
func TestNewWriter(t *testing.T) {
buf := bytes.Buffer{}
w := diode.NewWriter(&buf, 1000, 10*time.Millisecond, func(missed int) {
w := diode.NewWriter(&buf, 1000, 0, func(missed int) {
fmt.Printf("Dropped %d messages\n", missed)
})
log := zerolog.New(w)
@ -33,15 +33,22 @@ func TestNewWriter(t *testing.T) {
func Benchmark(b *testing.B) {
log.SetOutput(ioutil.Discard)
defer log.SetOutput(os.Stderr)
w := diode.NewWriter(ioutil.Discard, 100000, 10*time.Millisecond, nil)
log := zerolog.New(w)
defer w.Close()
b.SetParallelism(1000)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
log.Print("test")
}
})
benchs := map[string]time.Duration{
"Waiter": 0,
"Pooler": 10 * time.Millisecond,
}
for name, interval := range benchs {
b.Run(name, func(b *testing.B) {
w := diode.NewWriter(ioutil.Discard, 100000, interval, nil)
log := zerolog.New(w)
defer w.Close()
b.SetParallelism(1000)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
log.Print("test")
}
})
})
}
}