2018-02-20 09:52:12 +00:00
|
|
|
// Package diode provides a thread-safe, lock-free, non-blocking io.Writer
|
|
|
|
// wrapper.
|
|
|
|
package diode
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"io"
|
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
2022-11-03 15:18:09 +00:00
|
|
|
"tuxpa.in/a/zlog/diode/internal/diodes"
|
2018-02-20 09:52:12 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
var bufPool = &sync.Pool{
|
|
|
|
New: func() interface{} {
|
|
|
|
return make([]byte, 0, 500)
|
|
|
|
},
|
|
|
|
}
|
|
|
|
|
2018-05-25 01:50:57 +00:00
|
|
|
type Alerter func(missed int)
|
|
|
|
|
2018-10-31 23:57:15 +00:00
|
|
|
type diodeFetcher interface {
|
|
|
|
diodes.Diode
|
|
|
|
Next() diodes.GenericDataType
|
|
|
|
}
|
|
|
|
|
2018-02-20 09:52:12 +00:00
|
|
|
// Writer is a io.Writer wrapper that uses a diode to make Write lock-free,
|
|
|
|
// non-blocking and thread safe.
|
|
|
|
type Writer struct {
|
|
|
|
w io.Writer
|
2018-10-31 23:57:15 +00:00
|
|
|
d diodeFetcher
|
2018-02-20 09:52:12 +00:00
|
|
|
c context.CancelFunc
|
|
|
|
done chan struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewWriter creates a writer wrapping w with a many-to-one diode in order to
|
|
|
|
// never block log producers and drop events if the writer can't keep up with
|
|
|
|
// the flow of data.
|
|
|
|
//
|
|
|
|
// Use a diode.Writer when
|
|
|
|
//
|
2018-10-31 23:57:15 +00:00
|
|
|
// wr := diode.NewWriter(w, 1000, 0, func(missed int) {
|
2018-02-20 09:52:12 +00:00
|
|
|
// log.Printf("Dropped %d messages", missed)
|
2018-05-25 01:50:57 +00:00
|
|
|
// })
|
2022-03-20 19:19:42 +00:00
|
|
|
// log := zlog.New(wr)
|
2018-05-25 21:45:33 +00:00
|
|
|
//
|
2018-11-02 20:06:29 +00:00
|
|
|
// If pollInterval is greater than 0, a poller is used otherwise a waiter is
|
2018-10-31 23:57:15 +00:00
|
|
|
// used.
|
2018-02-20 09:52:12 +00:00
|
|
|
//
|
|
|
|
// See code.cloudfoundry.org/go-diodes for more info on diode.
|
2020-08-06 10:19:27 +00:00
|
|
|
func NewWriter(w io.Writer, size int, pollInterval time.Duration, f Alerter) Writer {
|
2018-02-20 09:52:12 +00:00
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
dw := Writer{
|
2018-10-31 23:57:15 +00:00
|
|
|
w: w,
|
2018-02-20 09:52:12 +00:00
|
|
|
c: cancel,
|
|
|
|
done: make(chan struct{}),
|
|
|
|
}
|
2018-10-31 23:57:15 +00:00
|
|
|
if f == nil {
|
|
|
|
f = func(int) {}
|
|
|
|
}
|
|
|
|
d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
|
2020-08-06 10:19:27 +00:00
|
|
|
if pollInterval > 0 {
|
2018-10-31 23:57:15 +00:00
|
|
|
dw.d = diodes.NewPoller(d,
|
2020-08-06 10:19:27 +00:00
|
|
|
diodes.WithPollingInterval(pollInterval),
|
2018-10-31 23:57:15 +00:00
|
|
|
diodes.WithPollingContext(ctx))
|
|
|
|
} else {
|
|
|
|
dw.d = diodes.NewWaiter(d,
|
|
|
|
diodes.WithWaiterContext(ctx))
|
|
|
|
}
|
2018-02-20 09:52:12 +00:00
|
|
|
go dw.poll()
|
|
|
|
return dw
|
|
|
|
}
|
|
|
|
|
|
|
|
func (dw Writer) Write(p []byte) (n int, err error) {
|
2022-03-20 19:19:42 +00:00
|
|
|
// p is pooled in zlog so we can't hold it passed this call, hence the
|
2018-02-20 09:52:12 +00:00
|
|
|
// copy.
|
|
|
|
p = append(bufPool.Get().([]byte), p...)
|
|
|
|
dw.d.Set(diodes.GenericDataType(&p))
|
|
|
|
return len(p), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Close releases the diode poller and call Close on the wrapped writer if
|
|
|
|
// io.Closer is implemented.
|
|
|
|
func (dw Writer) Close() error {
|
|
|
|
dw.c()
|
|
|
|
<-dw.done
|
|
|
|
if w, ok := dw.w.(io.Closer); ok {
|
|
|
|
return w.Close()
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (dw Writer) poll() {
|
|
|
|
defer close(dw.done)
|
|
|
|
for {
|
2018-10-31 23:57:15 +00:00
|
|
|
d := dw.d.Next()
|
2018-02-20 09:52:12 +00:00
|
|
|
if d == nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
p := *(*[]byte)(d)
|
|
|
|
dw.w.Write(p)
|
2018-09-19 07:18:07 +00:00
|
|
|
|
|
|
|
// Proper usage of a sync.Pool requires each entry to have approximately
|
|
|
|
// the same memory cost. To obtain this property when the stored type
|
|
|
|
// contains a variably-sized buffer, we add a hard limit on the maximum buffer
|
|
|
|
// to place back in the pool.
|
|
|
|
//
|
|
|
|
// See https://golang.org/issue/23199
|
|
|
|
const maxSize = 1 << 16 // 64KiB
|
|
|
|
if cap(p) <= maxSize {
|
|
|
|
bufPool.Put(p[:0])
|
|
|
|
}
|
2018-02-20 09:52:12 +00:00
|
|
|
}
|
|
|
|
}
|