This commit is contained in:
a 2022-03-26 20:44:39 -05:00
parent 0df83bbddb
commit b95f29ba03
4 changed files with 127 additions and 9 deletions

2
go.mod
View File

@ -2,4 +2,4 @@ module git.tuxpa.in/a/lambda
go 1.18 go 1.18
require golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8 // indirect require golang.org/x/exp v0.0.0-20220325121720-054d8573a5d8

View File

@ -6,8 +6,12 @@ import (
"git.tuxpa.in/a/lambda" "git.tuxpa.in/a/lambda"
) )
func Map[T any](xs []T, fx func(T) T) []T { func Map[T any](xs []T, fx func(T) T, n ...int) []T {
spl := lambda.Split(xs, routineCount) rc := routineCount
if len(n) > 0 {
rc = n[0]
}
spl := lambda.Split(xs, rc)
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
wg.Add(len(spl)) wg.Add(len(spl))
for i, v := range spl { for i, v := range spl {
@ -20,8 +24,12 @@ func Map[T any](xs []T, fx func(T) T) []T {
return lambda.Flatten(spl) return lambda.Flatten(spl)
} }
func MapV[T, V any](xs []T, fx func(T) V) []V { func MapV[T, V any](xs []T, fx func(T) V, n ...int) []V {
spl := lambda.Split(xs, routineCount) rc := routineCount
if len(n) > 0 {
rc = n[0]
}
spl := lambda.Split(xs, rc)
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
wg.Add(len(spl)) wg.Add(len(spl))
tmp := make([][]V, len(spl)) tmp := make([][]V, len(spl))
@ -34,3 +42,42 @@ func MapV[T, V any](xs []T, fx func(T) V) []V {
wg.Wait() wg.Wait()
return lambda.Flatten(tmp) return lambda.Flatten(tmp)
} }
func MapError[T any](xs []T, fx func(T) (T, error), n ...int) ([]T, []error) {
rc := routineCount
if len(n) > 0 {
rc = n[0]
}
spl := lambda.Split(xs, rc)
wg := new(sync.WaitGroup)
wg.Add(len(spl))
tmp := make([][]error, len(spl))
for i, v := range spl {
go func(ix int, vx []T) {
spl[ix], tmp[ix] = lambda.MapError(vx, fx)
wg.Done()
}(i, v)
}
wg.Wait()
return lambda.Flatten(spl), lambda.Flatten(tmp)
}
func MapErrorV[T, V any](xs []T, fx func(T) (V, error), n ...int) ([]V, []error) {
rc := routineCount
if len(n) > 0 {
rc = n[0]
}
spl := lambda.Split(xs, rc)
wg := new(sync.WaitGroup)
wg.Add(len(spl))
tmp := make([][]V, len(spl))
tmp2 := make([][]error, len(spl))
for i, v := range spl {
go func(ix int, vx []T) {
tmp[ix], tmp2[ix] = lambda.MapErrorV(vx, fx)
wg.Done()
}(i, v)
}
wg.Wait()
return lambda.Flatten(tmp), lambda.Flatten(tmp2)
}

View File

@ -1,9 +1,6 @@
package parallel package parallel
import ( var routineCount int = 4
)
var routineCount int = 1
func SetRoutineCount(n int) { func SetRoutineCount(n int) {
routineCount = n routineCount = n

74
parallel/request_test.go Normal file
View File

@ -0,0 +1,74 @@
package parallel_test
import (
"encoding/json"
"fmt"
"net/http"
"testing"
"git.tuxpa.in/a/lambda/parallel"
)
var requests = make([]*http.Request, 20)
func init() {
for i := range requests {
r, _ := http.NewRequest("GET", fmt.Sprintf("https://jsonplaceholder.typicode.com/todos/%d", i), nil)
requests[i] = r
}
}
type someResult struct {
Title string `json:"title"`
Id int `json:"id"`
}
func TestParallelRequest4Routine(t *testing.T) {
results, errs := parallel.MapErrorV(requests, func(r *http.Request) (*someResult, error) {
out := &someResult{}
res, err := http.DefaultClient.Do(r)
if err != nil {
return nil, err
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(out)
return out, err
}, 4)
for i, v := range results {
if errs[i] != nil {
t.Logf("err: %s", errs[i])
} else {
t.Logf("result: %+v", v)
}
}
}
func BenchmarkParallelRequest1Routine(b *testing.B) {
for i := 0; i < b.N; i++ {
parallel.MapErrorV(requests, func(r *http.Request) (*someResult, error) {
out := &someResult{}
res, err := http.DefaultClient.Do(r)
if err != nil {
return nil, err
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(out)
return out, err
}, 1)
}
}
func BenchmarkParallelRequest4Routine(b *testing.B) {
for i := 0; i < b.N; i++ {
parallel.MapErrorV(requests, func(r *http.Request) (*someResult, error) {
out := &someResult{}
res, err := http.DefaultClient.Do(r)
if err != nil {
return nil, err
}
defer res.Body.Close()
err = json.NewDecoder(res.Body).Decode(out)
return out, err
}, 4)
}
}