Add diode.Writer, a thread-safe, lock-free, non-blocking writer wrapper
This commit is contained in:
parent
9ee98f91c4
commit
8c1c6a0cd7
23
README.md
23
README.md
|
@ -51,7 +51,7 @@ func main() {
|
|||
// Output: {"time":1516134303,"level":"debug","message":"hello world"}
|
||||
```
|
||||
> Note: The default log level for `log.Print` is *debug*
|
||||
----
|
||||
|
||||
### Leveled Logging
|
||||
|
||||
#### Simple Leveled Logging Example
|
||||
|
@ -84,7 +84,9 @@ func main() {
|
|||
You can set the Global logging level to any of these options using the `SetGlobalLevel` function in the zerolog package, passing in one of the given constants above, e.g. `zerolog.InfoLevel` would be the "info" level. Whichever level is chosen, all logs with a level greater than or equal to that level will be written. To turn off logging entirely, pass the `zerolog.Disabled` constant.
|
||||
|
||||
#### Setting Global Log Level
|
||||
|
||||
This example uses command-line flags to demonstrate various outputs depending on the chosen log level.
|
||||
|
||||
```go
|
||||
package main
|
||||
|
||||
|
@ -158,7 +160,7 @@ func main() {
|
|||
// exit status 1
|
||||
```
|
||||
> NOTE: Using `Msgf` generates one allocation even when the logger is disabled.
|
||||
----------------
|
||||
|
||||
### Contextual Logging
|
||||
|
||||
#### Fields can be added to log messages
|
||||
|
@ -192,8 +194,6 @@ sublogger.Info().Msg("hello world")
|
|||
// Output: {"level":"info","time":1494567715,"message":"hello world","component":"foo"}
|
||||
```
|
||||
|
||||
|
||||
|
||||
### Pretty logging
|
||||
|
||||
```go
|
||||
|
@ -245,6 +245,21 @@ log.Log().Str("foo","bar").Msg("")
|
|||
log.Logger = log.With().Str("foo", "bar").Logger()
|
||||
```
|
||||
|
||||
### Thread-safe, lock-free, non-blocking writer
|
||||
|
||||
If your writer might be slow or not thread-safe and you need your log producers to never get slowed down by a slow writer, you can use a `diode.Writer` as follow:
|
||||
|
||||
```go
|
||||
d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) {
|
||||
fmt.Printf("Dropped %d messages\n", missed)
|
||||
}))
|
||||
w := diode.NewWriter(os.Stdout, d, 10*time.Millisecond)
|
||||
log := zerolog.New(w)
|
||||
log.Print("test")
|
||||
```
|
||||
|
||||
You will need to install `code.cloudfoundry.org/go-diodes` to use this feature.
|
||||
|
||||
### Log Sampling
|
||||
|
||||
```go
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
// Package diode provides a thread-safe, lock-free, non-blocking io.Writer
|
||||
// wrapper.
|
||||
package diode
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
diodes "code.cloudfoundry.org/go-diodes"
|
||||
)
|
||||
|
||||
var bufPool = &sync.Pool{
|
||||
New: func() interface{} {
|
||||
return make([]byte, 0, 500)
|
||||
},
|
||||
}
|
||||
|
||||
// Writer is a io.Writer wrapper that uses a diode to make Write lock-free,
|
||||
// non-blocking and thread safe.
|
||||
type Writer struct {
|
||||
w io.Writer
|
||||
d *diodes.ManyToOne
|
||||
p *diodes.Poller
|
||||
c context.CancelFunc
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// NewWriter creates a writer wrapping w with a many-to-one diode in order to
|
||||
// never block log producers and drop events if the writer can't keep up with
|
||||
// the flow of data.
|
||||
//
|
||||
// Use a diode.Writer when
|
||||
//
|
||||
// d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) {
|
||||
// log.Printf("Dropped %d messages", missed)
|
||||
// }))
|
||||
// w := diode.NewWriter(w, d, 10 * time.Millisecond)
|
||||
// log := zerolog.New(w)
|
||||
//
|
||||
// See code.cloudfoundry.org/go-diodes for more info on diode.
|
||||
func NewWriter(w io.Writer, manyToOneDiode *diodes.ManyToOne, poolInterval time.Duration) Writer {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
dw := Writer{
|
||||
w: w,
|
||||
d: manyToOneDiode,
|
||||
p: diodes.NewPoller(manyToOneDiode,
|
||||
diodes.WithPollingInterval(poolInterval),
|
||||
diodes.WithPollingContext(ctx)),
|
||||
c: cancel,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
go dw.poll()
|
||||
return dw
|
||||
}
|
||||
|
||||
func (dw Writer) Write(p []byte) (n int, err error) {
|
||||
// p is pooled in zerolog so we can't hold it passed this call, hence the
|
||||
// copy.
|
||||
p = append(bufPool.Get().([]byte), p...)
|
||||
dw.d.Set(diodes.GenericDataType(&p))
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
// Close releases the diode poller and call Close on the wrapped writer if
|
||||
// io.Closer is implemented.
|
||||
func (dw Writer) Close() error {
|
||||
dw.c()
|
||||
<-dw.done
|
||||
if w, ok := dw.w.(io.Closer); ok {
|
||||
return w.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (dw Writer) poll() {
|
||||
defer close(dw.done)
|
||||
for {
|
||||
d := dw.p.Next()
|
||||
if d == nil {
|
||||
return
|
||||
}
|
||||
p := *(*[]byte)(d)
|
||||
dw.w.Write(p)
|
||||
bufPool.Put(p[:0])
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
package diode_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
diodes "code.cloudfoundry.org/go-diodes"
|
||||
"github.com/rs/zerolog"
|
||||
"github.com/rs/zerolog/diode"
|
||||
)
|
||||
|
||||
func ExampleNewWriter() {
|
||||
d := diodes.NewManyToOne(1000, diodes.AlertFunc(func(missed int) {
|
||||
fmt.Printf("Dropped %d messages\n", missed)
|
||||
}))
|
||||
w := diode.NewWriter(os.Stdout, d, 10*time.Millisecond)
|
||||
log := zerolog.New(w)
|
||||
log.Print("test")
|
||||
|
||||
w.Close()
|
||||
|
||||
// Output: {"level":"debug","message":"test"}
|
||||
}
|
||||
|
||||
func Benchmark(b *testing.B) {
|
||||
log.SetOutput(ioutil.Discard)
|
||||
defer log.SetOutput(os.Stderr)
|
||||
d := diodes.NewManyToOne(100000, nil)
|
||||
w := diode.NewWriter(ioutil.Discard, d, 10*time.Millisecond)
|
||||
log := zerolog.New(w)
|
||||
defer w.Close()
|
||||
|
||||
b.SetParallelism(1000)
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
log.Print("test")
|
||||
}
|
||||
})
|
||||
|
||||
}
|
Loading…
Reference in New Issue