From b95f29ba038c95a956fce18870911ae4b822ecf3 Mon Sep 17 00:00:00 2001 From: a Date: Sat, 26 Mar 2022 20:44:39 -0500 Subject: [PATCH] Example --- go.mod | 2 +- parallel/map.go | 55 ++++++++++++++++++++++++++--- parallel/parallel.go | 5 +-- parallel/request_test.go | 74 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 127 insertions(+), 9 deletions(-) create mode 100644 parallel/request_test.go diff --git a/go.mod b/go.mod index 300c325..cbee59c 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module git.tuxpa.in/a/lambda go 1.18 -require golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 // indirect +require golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 diff --git a/parallel/map.go b/parallel/map.go index 1d8cbaa..9d92883 100644 --- a/parallel/map.go +++ b/parallel/map.go @@ -6,8 +6,12 @@ import ( "git.tuxpa.in/a/lambda" ) -func Map[T any](xs []T, fx func(T) T) []T { - spl := lambda.Split(xs, routineCount) +func Map[T any](xs []T, fx func(T) T, n ...int) []T { + rc := routineCount + if len(n) > 0 { + rc = n[0] + } + spl := lambda.Split(xs, rc) wg := new(sync.WaitGroup) wg.Add(len(spl)) for i, v := range spl { @@ -20,8 +24,12 @@ func Map[T any](xs []T, fx func(T) T) []T { return lambda.Flatten(spl) } -func MapV[T, V any](xs []T, fx func(T) V) []V { - spl := lambda.Split(xs, routineCount) +func MapV[T, V any](xs []T, fx func(T) V, n ...int) []V { + rc := routineCount + if len(n) > 0 { + rc = n[0] + } + spl := lambda.Split(xs, rc) wg := new(sync.WaitGroup) wg.Add(len(spl)) tmp := make([][]V, len(spl)) @@ -34,3 +42,42 @@ func MapV[T, V any](xs []T, fx func(T) V) []V { wg.Wait() return lambda.Flatten(tmp) } + +func MapError[T any](xs []T, fx func(T) (T, error), n ...int) ([]T, []error) { + rc := routineCount + if len(n) > 0 { + rc = n[0] + } + spl := lambda.Split(xs, rc) + wg := new(sync.WaitGroup) + wg.Add(len(spl)) + tmp := make([][]error, len(spl)) + for i, v := range spl { + go func(ix int, vx []T) { + spl[ix], tmp[ix] = lambda.MapError(vx, fx) + wg.Done() + }(i, v) + } + wg.Wait() + return lambda.Flatten(spl), lambda.Flatten(tmp) +} + +func MapErrorV[T, V any](xs []T, fx func(T) (V, error), n ...int) ([]V, []error) { + rc := routineCount + if len(n) > 0 { + rc = n[0] + } + spl := lambda.Split(xs, rc) + wg := new(sync.WaitGroup) + wg.Add(len(spl)) + tmp := make([][]V, len(spl)) + tmp2 := make([][]error, len(spl)) + for i, v := range spl { + go func(ix int, vx []T) { + tmp[ix], tmp2[ix] = lambda.MapErrorV(vx, fx) + wg.Done() + }(i, v) + } + wg.Wait() + return lambda.Flatten(tmp), lambda.Flatten(tmp2) +} diff --git a/parallel/parallel.go b/parallel/parallel.go index 89b7085..711bdac 100644 --- a/parallel/parallel.go +++ b/parallel/parallel.go @@ -1,9 +1,6 @@ package parallel -import ( -) - -var routineCount int = 1 +var routineCount int = 4 func SetRoutineCount(n int) { routineCount = n diff --git a/parallel/request_test.go b/parallel/request_test.go new file mode 100644 index 0000000..751fee8 --- /dev/null +++ b/parallel/request_test.go @@ -0,0 +1,74 @@ +package parallel_test + +import ( + "encoding/json" + "fmt" + "net/http" + "testing" + + "git.tuxpa.in/a/lambda/parallel" +) + +var requests = make([]*http.Request, 20) + +func init() { + for i := range requests { + r, _ := http.NewRequest("GET", fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i), nil) + requests[i] = r + } +} + +type someResult struct { + Title string `json:"title"` + Id int `json:"id"` +} + +func TestParallelRequest4Routine(t *testing.T) { + results, errs := parallel.MapErrorV(requests, func(r *http.Request) (*someResult, error) { + out := &someResult{} + res, err := http.DefaultClient.Do(r) + if err != nil { + return nil, err + } + defer res.Body.Close() + err = json.NewDecoder(res.Body).Decode(out) + return out, err + }, 4) + for i, v := range results { + if errs[i] != nil { + t.Logf("err: %s", errs[i]) + } else { + t.Logf("result: %+v", v) + } + } +} + +func BenchmarkParallelRequest1Routine(b *testing.B) { + for i := 0; i < b.N; i++ { + parallel.MapErrorV(requests, func(r *http.Request) (*someResult, error) { + out := &someResult{} + res, err := http.DefaultClient.Do(r) + if err != nil { + return nil, err + } + defer res.Body.Close() + err = json.NewDecoder(res.Body).Decode(out) + return out, err + }, 1) + } +} + +func BenchmarkParallelRequest4Routine(b *testing.B) { + for i := 0; i < b.N; i++ { + parallel.MapErrorV(requests, func(r *http.Request) (*someResult, error) { + out := &someResult{} + res, err := http.DefaultClient.Do(r) + if err != nil { + return nil, err + } + defer res.Body.Close() + err = json.NewDecoder(res.Body).Decode(out) + return out, err + }, 4) + } +}