Add initial objectstorage wal manager

This commit is contained in:
Simone Gotti 2019-02-22 08:45:59 +01:00
parent 02ed2871db
commit 86e8479de9
6 changed files with 2371 additions and 78 deletions

22
go.mod
View File

@ -3,23 +3,21 @@ module github.com/sorintlab/agola
require (
github.com/Masterminds/squirrel v1.1.0
github.com/go-ini/ini v1.42.0 // indirect
github.com/go-sql-driver/mysql v1.4.1 // indirect
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e // indirect
github.com/jtolds/gls v4.2.1+incompatible // indirect
github.com/lib/pq v1.0.0 // indirect
github.com/gogo/protobuf v1.2.1 // indirect
github.com/golang/protobuf v1.3.0 // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/kr/pty v1.1.3 // indirect
github.com/mattn/go-sqlite3 v1.10.0
github.com/minio/minio-go v6.0.14+incompatible
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pkg/errors v0.8.0
github.com/pkg/errors v0.8.1
github.com/sanity-io/litter v1.1.0
github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac // indirect
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c // indirect
github.com/satori/go.uuid v1.2.0
github.com/sgotti/gexpect v0.0.0-20161123102107-0afc6c19f50a
go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25
go.uber.org/zap v1.9.1
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 // indirect
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e // indirect
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
google.golang.org/appengine v1.4.0 // indirect
gopkg.in/ini.v1 v1.42.0 // indirect
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 // indirect
golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95 // indirect
google.golang.org/grpc v1.19.0 // indirect
gopkg.in/yaml.v2 v2.2.2
)

109
go.sum
View File

@ -1,119 +1,97 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Masterminds/squirrel v1.1.0 h1:baP1qLdoQCeTw3ifCdOq2dkYc6vGcmRdaociKLbEJXs=
github.com/Masterminds/squirrel v1.1.0/go.mod h1:yaPeOnPG5ZRwL9oKdTsO/prlkPbXWZlRVMQ/gGlzIuA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/coreos/go-semver v0.2.0 h1:3Jm3tLmsgAYcjC+4Up7hJrFBPr+n7rAqYeSw/SZazuY=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/etcd v3.3.12+incompatible h1:pAWNwdf7QiT1zfaWyqCtNZQWCLByQyA3JrSQyuYAqnQ=
github.com/coreos/etcd v3.3.12+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7 h1:u9SHYsPQNyt5tgDm3YN7+9dYrpK96E5wFilTFWIDZOM=
github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf h1:CAKfRE2YtTUIjjh1bkBtyYFaUT/WmOqsJjgtihT0vMI=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-ini/ini v1.42.0 h1:TWr1wGj35+UiWHlBA8er89seFXxzwFn11spilrrj+38=
github.com/go-ini/ini v1.42.0/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8=
github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA=
github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/gogo/protobuf v1.0.0 h1:2jyBKDKU/8v3v2xVR2PtiWQviFUyiaGk2rpfyFT8rTM=
github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58=
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903 h1:LbsanbbD6LieFkXbj9YNNBupiGHJgFeLpO0j0Fza1h8=
github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a h1:ZJu5NB1Bk5ms4vw0Xu4i+jD32SE9jQXyfnOvwhHqlT0=
github.com/golang/protobuf v1.3.0 h1:kbxbvI4Un1LUWKxufD+BiE6AEExYYgkQLQmLFqA1LFk=
github.com/golang/protobuf v1.3.0/go.mod h1:Qd/q+1AKNOZr9uGQzbzCmRO6sUih6GTPZv6a1/R87v0=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e h1:JKmoR8x90Iww1ks85zJ1lfDGgIiMDuIptTOhJq+zKyg=
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c h1:Lh2aW+HnU2Nbe1gqD9SOJLJxW1jBMmQOktN2acDyJk8=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0 h1:Iju5GlWwrvL6UBg4zJJt3btmonfrMlCDdsejg4CZE7c=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92BcuyuQ/YW4NSIpoGtfXNho=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.4.1 h1:pX7cnDwSSmG0dR9yNjCQSSpmsJOqFdT7SzVp5Yl9uVw=
github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/kr/pty v1.0.0/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3 h1:/Um6a/ZmD5tF7peoOJ5oN5KMQ0DrGVQSXLNwyckutPk=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 h1:SOEGU9fKiNWd/HOJuq6+3iTQz8KNCLtVX6idSoTLdUw=
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 h1:P6pPBnrTSX3DEVR4fDembhRWSsG5rVo6hYhAB/ADZrk=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/lib/pq v1.0.0 h1:X5PMW56eZitiTeO7tKzZxFCSpbFZJtkMMooicw2us9A=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-sqlite3 v1.10.0 h1:jbhqpg7tQe4SupckyijYiy0mJJ/pRyHvXf7JdWK860o=
github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc=
github.com/matttproud/golang_protobuf_extensions v1.0.0 h1:YNOwxxSJzSUARoD9KRZLzM9Y858MNGCOACTvCW9TSAc=
github.com/matttproud/golang_protobuf_extensions v1.0.0/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
github.com/minio/minio-go v6.0.14+incompatible h1:fnV+GD28LeqdN6vT2XdGKW8Qe/IfjJDswNVuni6km9o=
github.com/minio/minio-go v6.0.14+incompatible/go.mod h1:7guKYtitv8dktvNUGrhzmNlA5wrAABTQXCoesZdFQO8=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8=
github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612 h1:13pIdM2tpaDi4OVe24fgoIS7ZTqMt0QI+bwQsX5hq+g=
github.com/prometheus/client_model v0.0.0-20170216185247-6f3806018612/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L7EX0km2LYM8HKpNWRiouxjE3XHkyGc=
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be h1:MoyXp/VjXUwM0GyDcdwT7Ubea2gxOSHpPaFo3qV+Y2A=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/sanity-io/litter v1.1.0 h1:BllcKWa3VbZmOZbDCoszYLk7zCsKHz5Beossi8SUcTc=
github.com/sanity-io/litter v1.1.0/go.mod h1:CJ0VCw2q4qKU7LaQr3n7UOSHzgEMgcGco7N/SkZQPjw=
github.com/sirupsen/logrus v1.0.5 h1:8c8b5uO0zS4X6RPl/sd1ENwSkIc0/H2PaHxE3udaE8I=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sgotti/gexpect v0.0.0-20161123102107-0afc6c19f50a h1:u7WP9TGHJIkJoi/dRDhvYPSthMIdUQPDETiZET/Utl8=
github.com/sgotti/gexpect v0.0.0-20161123102107-0afc6c19f50a/go.mod h1:HvB0+YQff1QGS1nct9E3/J8wo8s/EVjq+VXrJSDlQEY=
github.com/sirupsen/logrus v1.0.5/go.mod h1:pMByvHTf9Beacp5x1UXfOR9xyW/9antXMhjMPG0dEzc=
github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac h1:wbW+Bybf9pXxnCFAOWZTqkRjAc7rAIwo2e1ArUhiHxg=
github.com/smartystreets/assertions v0.0.0-20190215210624-980c5ac6f3ac/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c h1:Ho+uVpkel/udgjbwB5Lktg9BtvJSh2DT0Hi6LPSyI2w=
github.com/smartystreets/goconvey v0.0.0-20181108003508-044398e4856c/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s=
github.com/soheilhy/cmux v0.1.4 h1:0HKaf1o97UwFjHH9o5XsHUOF+tqmdA7KEzXLpiyaw0E=
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ=
github.com/spf13/pflag v1.0.1 h1:aCvUg6QPl3ibpQUxyLkrEkCHtPqYJL4x9AuhqVqFis4=
github.com/spf13/pflag v1.0.1/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8 h1:ndzgwNDnKIqyCvHTXaCqh9KlOWKvBry6nuXMJmonVsE=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/ugorji/go v1.1.1 h1:gmervu+jDMvXTbcHQ0pd2wee85nEoE0BsVyEuzkfK8w=
github.com/ugorji/go v1.1.1/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=
github.com/urfave/cli v1.18.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18 h1:MPPkRncZLN9Kh4MEFmbnK4h3BD7AUmskWv2+EeZJCCs=
github.com/xiang90/probing v0.0.0-20160813154853-07dd2e8dfe18/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
go.etcd.io/bbolt v1.3.1-etcd.7 h1:M0l89sIuZ+RkW0rLbUsmxescVzLwLUs+Kvks+0jeHdM=
go.etcd.io/bbolt v1.3.1-etcd.7/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25 h1:CwNwQW5bpjZHQ9KrGCx5vGYoq+LKcm7ZRGiCYYz6uXM=
go.etcd.io/etcd v0.0.0-20181128220305-dedae6eb7c25/go.mod h1:weASp41xM3dk0YHg1s/W8ecdGP5G4teSTMBPpYAaUgA=
go.etcd.io/etcd v3.3.12+incompatible h1:V6PRYRGpU4k5EajJaaj/GL3hqIdzyPnBU8aPUp+35yw=
go.etcd.io/etcd v3.3.12+incompatible/go.mod h1:yaeTdrJi5lOmYerz05bd8+V7KubZs8YSFZfzsF9A6aI=
go.uber.org/atomic v1.3.2 h1:2Oa65PReHzfn29GpvgsYwloV9AVFHPDk8tYxt2c2tr4=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/multierr v1.1.0 h1:HoEmRHQPVSqub6w2z2d2EOVs2fjyFRGyofhKuyDq0QI=
@ -121,40 +99,39 @@ go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/
go.uber.org/zap v1.9.1 h1:XCJQEf3W6eZaVwhRBof6ImoYGJSITeKWsyeh3HFu/5o=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20180608092829-8ac0e0d97ce4/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 h1:mKdxBk7AujPs8kU4m80U72y/zjbZ3UcXC7dClwKbUI0=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25 h1:jsG6UpNLt9iAsb0S2AGW28DveNzzgmbXR+ENoPjUeIU=
golang.org/x/crypto v0.0.0-20190228161510-8dd112bcdc25/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e h1:bRhVy7zSSasaqNksaRZiA5EEI+Ei4I1nO5Jh72wfHlg=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95 h1:fY7Dsw114eJN4boqzVSbpVHO6rTdhq6/GnXeu+PKnzU=
golang.org/x/net v0.0.0-20190301231341-16b79f2e4e95/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 h1:YUO/7uOKsKeq9UokNS62b8FYywz3ker1l1vDZRCRefw=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2 h1:+DCIGbF/swA92ohVg0//6X2IVY3KZs6p9mix0ziNYJM=
golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180608181217-32ee49c4dd80 h1:GL7nK1hkDKrkor0eVOYcMdIsUGErFnaC2gpBOVC+vbI=
golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/genproto v0.0.0-20180608181217-32ee49c4dd80/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.14.0 h1:ArxJuB1NWfPY6r9Gp9gqwplT0Ge7nqv9msgu03lHLmo=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b h1:lohp5blsw53GBXtLyLNaTXPXS9pJ1tiTw61ZHUoE9Qw=
google.golang.org/genproto v0.0.0-20180831171423-11092d34479b/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo=
google.golang.org/grpc v1.19.0 h1:cfg4PD8YEdSFnm7qLV4++93WcmhH2nIUhMjhdCvl3j8=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

389
internal/testutil/utils.go Normal file
View File

@ -0,0 +1,389 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package testutil
import (
"bufio"
"context"
"fmt"
"net"
"net/url"
"os"
"os/exec"
"path/filepath"
"strconv"
"sync"
"testing"
"time"
"github.com/sorintlab/agola/internal/etcd"
"go.etcd.io/etcd/embed"
"go.uber.org/zap"
uuid "github.com/satori/go.uuid"
"github.com/sgotti/gexpect"
)
const (
sleepInterval = 500 * time.Millisecond
etcdTimeout = 5 * time.Second
MinPort = 2048
MaxPort = 16384
)
var curPort = MinPort
var portMutex = sync.Mutex{}
type Process struct {
t *testing.T
uid string
name string
args []string
Cmd *gexpect.ExpectSubprocess
bin string
}
func (p *Process) start() error {
if p.Cmd != nil {
panic(fmt.Errorf("%s: cmd not cleanly stopped", p.uid))
}
cmd := exec.Command(p.bin, p.args...)
pr, pw, err := os.Pipe()
if err != nil {
return err
}
p.Cmd = &gexpect.ExpectSubprocess{Cmd: cmd, Output: pw}
if err := p.Cmd.Start(); err != nil {
return err
}
go func() {
scanner := bufio.NewScanner(pr)
for scanner.Scan() {
p.t.Logf("[%s %s]: %s", p.name, p.uid, scanner.Text())
}
}()
return nil
}
func (p *Process) Start() error {
if err := p.start(); err != nil {
return err
}
p.Cmd.Continue()
return nil
}
func (p *Process) StartExpect() error {
return p.start()
}
func (p *Process) Signal(sig os.Signal) error {
p.t.Logf("signalling %s %s with %s", p.name, p.uid, sig)
if p.Cmd == nil {
panic(fmt.Errorf("p: %s, cmd is empty", p.uid))
}
return p.Cmd.Cmd.Process.Signal(sig)
}
func (p *Process) Kill() {
p.t.Logf("killing %s %s", p.name, p.uid)
if p.Cmd == nil {
panic(fmt.Errorf("p: %s, cmd is empty", p.uid))
}
p.Cmd.Cmd.Process.Signal(os.Kill)
p.Cmd.Wait()
p.Cmd = nil
}
func (p *Process) Stop() {
p.t.Logf("stopping %s %s", p.name, p.uid)
if p.Cmd == nil {
panic(fmt.Errorf("p: %s, cmd is empty", p.uid))
}
p.Cmd.Continue()
p.Cmd.Cmd.Process.Signal(os.Interrupt)
p.Cmd.Wait()
p.Cmd = nil
}
func (p *Process) Wait(timeout time.Duration) error {
timeoutCh := time.NewTimer(timeout).C
endCh := make(chan error)
go func() {
err := p.Cmd.Wait()
endCh <- err
}()
select {
case <-timeoutCh:
return fmt.Errorf("timeout waiting on process")
case <-endCh:
return nil
}
}
type TestEmbeddedEtcd struct {
t *testing.T
*TestEtcd
Etcd *embed.Etcd
Endpoint string
ListenAddress string
Port string
}
func NewTestEmbeddedEtcd(t *testing.T, logger *zap.Logger, dir string, a ...string) (*TestEmbeddedEtcd, error) {
u := uuid.NewV4()
uid := fmt.Sprintf("%x", u[:4])
dataDir := filepath.Join(dir, fmt.Sprintf("etcd%s", uid))
listenAddress, port, err := GetFreePort(true, false)
if err != nil {
return nil, err
}
listenAddress2, port2, err := GetFreePort(true, false)
if err != nil {
return nil, err
}
cfg := embed.NewConfig()
cfg.Name = uid
cfg.Dir = dataDir
cfg.Logger = "zap"
cfg.LogOutputs = []string{"stdout"}
lcurl, _ := url.Parse(fmt.Sprintf("http://%s:%s", listenAddress, port))
lpurl, _ := url.Parse(fmt.Sprintf("http://%s:%s", listenAddress2, port2))
cfg.LCUrls = []url.URL{*lcurl}
cfg.ACUrls = []url.URL{*lcurl}
cfg.LPUrls = []url.URL{*lpurl}
cfg.APUrls = []url.URL{*lpurl}
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
t.Logf("starting embedded etcd server")
embeddedEtcd, err := embed.StartEtcd(cfg)
if err != nil {
return nil, err
}
storeEndpoint := fmt.Sprintf("http://%s:%s", listenAddress, port)
storeConfig := etcd.Config{
Logger: logger,
Endpoints: storeEndpoint,
}
e, err := etcd.New(storeConfig)
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
tectd := &TestEmbeddedEtcd{
t: t,
TestEtcd: &TestEtcd{
e,
t,
},
Etcd: embeddedEtcd,
Endpoint: storeEndpoint,
ListenAddress: listenAddress,
Port: port,
}
return tectd, nil
}
func (te *TestEmbeddedEtcd) Start() error {
<-te.Etcd.Server.ReadyNotify()
return nil
}
func (te *TestEmbeddedEtcd) Stop() error {
te.Etcd.Close()
return nil
}
func (te *TestEmbeddedEtcd) Kill() error {
te.Etcd.Close()
return nil
}
type TestExternalEtcd struct {
t *testing.T
*TestEtcd
Process
Endpoint string
ListenAddress string
Port string
}
func NewTestExternalEtcd(t *testing.T, logger *zap.Logger, dir string, a ...string) (*TestExternalEtcd, error) {
u := uuid.NewV4()
uid := fmt.Sprintf("%x", u[:4])
dataDir := filepath.Join(dir, fmt.Sprintf("etcd%s", uid))
listenAddress, port, err := GetFreePort(true, false)
if err != nil {
return nil, err
}
listenAddress2, port2, err := GetFreePort(true, false)
if err != nil {
return nil, err
}
args := []string{}
args = append(args, fmt.Sprintf("--name=%s", uid))
args = append(args, fmt.Sprintf("--data-dir=%s", dataDir))
args = append(args, fmt.Sprintf("--listen-client-urls=http://%s:%s", listenAddress, port))
args = append(args, fmt.Sprintf("--advertise-client-urls=http://%s:%s", listenAddress, port))
args = append(args, fmt.Sprintf("--listen-peer-urls=http://%s:%s", listenAddress2, port2))
args = append(args, fmt.Sprintf("--initial-advertise-peer-urls=http://%s:%s", listenAddress2, port2))
args = append(args, fmt.Sprintf("--initial-cluster=%s=http://%s:%s", uid, listenAddress2, port2))
args = append(args, a...)
storeEndpoint := fmt.Sprintf("http://%s:%s", listenAddress, port)
storeConfig := etcd.Config{
Logger: logger,
Endpoints: storeEndpoint,
}
e, err := etcd.New(storeConfig)
if err != nil {
return nil, fmt.Errorf("cannot create store: %v", err)
}
bin := os.Getenv("ETCD_BIN")
if bin == "" {
return nil, fmt.Errorf("missing ETCD_BIN env")
}
tectd := &TestExternalEtcd{
t: t,
TestEtcd: &TestEtcd{
e,
t,
},
Process: Process{
t: t,
uid: uid,
name: "etcd",
bin: bin,
args: args,
},
Endpoint: storeEndpoint,
ListenAddress: listenAddress,
Port: port,
}
return tectd, nil
}
type TestEtcd struct {
*etcd.Store
t *testing.T
}
func (te *TestEtcd) Compact() error {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
resp, err := te.Get(ctx, "anykey")
if err != nil && err != etcd.ErrKeyNotFound {
return err
}
_, err = te.Client().Compact(ctx, resp.Header.Revision)
return err
}
func (te *TestEtcd) WaitUp(timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
_, err := te.Get(ctx, "anykey")
if err != nil && err == etcd.ErrKeyNotFound {
return nil
}
if err == nil {
return nil
}
time.Sleep(sleepInterval)
}
return fmt.Errorf("timeout")
}
func (te *TestEtcd) WaitDown(timeout time.Duration) error {
start := time.Now()
for time.Now().Add(-timeout).Before(start) {
ctx, cancel := context.WithTimeout(context.Background(), etcdTimeout)
defer cancel()
_, err := te.Get(ctx, "anykey")
if err != nil && err != etcd.ErrKeyNotFound {
return nil
}
time.Sleep(sleepInterval)
}
return fmt.Errorf("timeout")
}
func testFreeTCPPort(port int) error {
ln, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", port))
if err != nil {
return err
}
ln.Close()
return nil
}
func testFreeUDPPort(port int) error {
ln, err := net.ListenPacket("udp", fmt.Sprintf("localhost:%d", port))
if err != nil {
return err
}
ln.Close()
return nil
}
// Hack to find a free tcp and udp port
func GetFreePort(tcp bool, udp bool) (string, string, error) {
portMutex.Lock()
defer portMutex.Unlock()
if !tcp && !udp {
return "", "", fmt.Errorf("at least one of tcp or udp port shuld be required")
}
localhostIP, err := net.ResolveIPAddr("ip", "localhost")
if err != nil {
return "", "", fmt.Errorf("failed to resolve ip addr: %v", err)
}
for {
curPort++
if curPort > MaxPort {
return "", "", fmt.Errorf("all available ports to test have been exausted")
}
if tcp {
if err := testFreeTCPPort(curPort); err != nil {
continue
}
}
if udp {
if err := testFreeUDPPort(curPort); err != nil {
continue
}
}
return localhostIP.IP.String(), strconv.Itoa(curPort), nil
}
}

374
internal/wal/changes.go Normal file
View File

@ -0,0 +1,374 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"context"
"encoding/json"
"fmt"
"io"
"path"
"sort"
"strings"
"sync"
"time"
"github.com/sorintlab/agola/internal/etcd"
"github.com/pkg/errors"
etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"go.etcd.io/etcd/mvcc/mvccpb"
)
type WalChanges struct {
actions map[string][]*Action
puts map[string]string
deletes map[string]string
pathsOrdered []string
walSeq string
revision int64
changeGroupsRevisions changeGroupsRevisions
initialized bool
sync.Mutex
}
func NewWalChanges() *WalChanges {
return &WalChanges{
actions: make(map[string][]*Action),
puts: make(map[string]string),
deletes: make(map[string]string),
changeGroupsRevisions: make(changeGroupsRevisions),
}
}
func (c *WalChanges) String() string {
return fmt.Sprintf("puts: %s, deletes: %s, walSeq: %s, revision: %d, initialized: %t", c.puts, c.deletes, c.walSeq, c.revision, c.initialized)
}
func (c *WalChanges) curRevision() int64 {
return c.revision
}
func (c *WalChanges) putRevision(revision int64) {
c.revision = revision
}
func (c *WalChanges) curWalSeq() string {
return c.walSeq
}
func (c *WalChanges) getPut(p string) (string, bool) {
walseq, ok := c.puts[p]
return walseq, ok
}
func (c *WalChanges) getDeletesMap() map[string]struct{} {
dmap := map[string]struct{}{}
for p := range c.deletes {
dmap[p] = struct{}{}
}
return dmap
}
func (c *WalChanges) getDelete(p string) bool {
_, ok := c.deletes[p]
return ok
}
func (c *WalChanges) addPut(p, walseq string, revision int64) {
delete(c.deletes, p)
c.puts[p] = walseq
c.walSeq = walseq
c.revision = revision
}
func (c *WalChanges) removePut(p string, revision int64) {
delete(c.puts, p)
c.revision = revision
}
func (c *WalChanges) addDelete(p, walseq string, revision int64) {
delete(c.puts, p)
c.deletes[p] = walseq
c.walSeq = walseq
c.revision = revision
}
func (c *WalChanges) removeDelete(p string, revision int64) {
delete(c.deletes, p)
c.revision = revision
}
func (c *WalChanges) getChangeGroups(cgNames []string) changeGroupsRevisions {
cgr := map[string]int64{}
for _, cgName := range cgNames {
if rev, ok := c.changeGroupsRevisions[cgName]; ok {
cgr[cgName] = rev
} else {
// for non existing changegroups use a changegroup with revision = 0
cgr[cgName] = 0
}
}
return cgr
}
func (c *WalChanges) putChangeGroup(cgName string, cgRev int64) {
c.changeGroupsRevisions[cgName] = cgRev
}
func (c *WalChanges) removeChangeGroup(cgName string) {
delete(c.changeGroupsRevisions, cgName)
}
func (c *WalChanges) updatePathsOrdered() {
c.pathsOrdered = make([]string, len(c.puts))
i := 0
for p := range c.puts {
c.pathsOrdered[i] = p
i++
}
sort.Sort(sort.StringSlice(c.pathsOrdered))
}
func (w *WalManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error {
walDataFilePath := w.storageWalDataFile(walData.WalDataFileID)
walDataFile, err := w.lts.ReadObject(walDataFilePath)
if err != nil {
return errors.Wrapf(err, "failed to read waldata %q", walDataFilePath)
}
defer walDataFile.Close()
dec := json.NewDecoder(walDataFile)
w.changes.Lock()
defer w.changes.Unlock()
for {
var action *Action
err := dec.Decode(&action)
if err == io.EOF {
// all done
break
}
if err != nil {
return errors.Wrapf(err, "failed to decode wal file")
}
w.applyWalChangesAction(ctx, action, walData.WalSequence, revision)
additionalActions, err := w.additionalActionsFunc(action)
if err != nil {
return err
}
for _, action := range additionalActions {
w.applyWalChangesAction(ctx, action, walData.WalSequence, revision)
}
}
w.changes.updatePathsOrdered()
return nil
}
func (w *WalManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) {
switch action.ActionType {
case ActionTypePut:
w.changes.addPut(action.Path, walSequence, revision)
case ActionTypeDelete:
w.changes.addDelete(action.Path, walSequence, revision)
}
if w.changes.actions[walSequence] == nil {
w.changes.actions[walSequence] = []*Action{}
}
w.changes.actions[walSequence] = append(w.changes.actions[walSequence], action)
}
func (w *WalManager) watcherLoop(ctx context.Context) error {
for {
initialized := w.changes.initialized
if !initialized {
if err := w.initializeChanges(ctx); err != nil {
w.log.Errorf("watcher err: %+v", err)
}
} else {
if err := w.watcher(ctx); err != nil {
w.log.Errorf("watcher err: %+v", err)
}
}
select {
case <-ctx.Done():
w.log.Infof("watcher exiting")
return nil
default:
}
time.Sleep(1 * time.Second)
}
}
func (w *WalManager) initializeChanges(ctx context.Context) error {
var revision int64
var continuation *etcd.ListPagedContinuation
for {
listResp, err := w.e.ListPaged(ctx, etcdWalsDir+"/", 0, 10, continuation)
if err != nil {
return err
}
resp := listResp.Resp
continuation = listResp.Continuation
revision = resp.Header.Revision
for _, kv := range resp.Kvs {
var walData *WalData
if err := json.Unmarshal(kv.Value, &walData); err != nil {
return err
}
if err := w.applyWalChanges(ctx, walData, revision); err != nil {
return err
}
}
if !listResp.HasMore {
break
}
}
continuation = nil
// use the same revision
for {
listResp, err := w.e.ListPaged(ctx, etcdChangeGroupsDir+"/", 0, 10, continuation)
if err != nil {
return err
}
resp := listResp.Resp
continuation = listResp.Continuation
for _, kv := range resp.Kvs {
w.changes.Lock()
changeGroup := path.Base(string(kv.Key))
w.changes.putChangeGroup(changeGroup, kv.ModRevision)
w.changes.Unlock()
}
if !listResp.HasMore {
break
}
}
w.changes.Lock()
w.changes.revision = revision
w.changes.initialized = true
w.changes.Unlock()
return nil
}
func (w *WalManager) watcher(ctx context.Context) error {
w.changes.Lock()
revision := w.changes.curRevision()
w.changes.Unlock()
wctx, cancel := context.WithCancel(ctx)
defer cancel()
wch := w.e.Watch(wctx, etcdWalBaseDir+"/", revision+1)
for wresp := range wch {
if wresp.Canceled {
err := wresp.Err()
if err == etcdclientv3rpc.ErrCompacted {
w.log.Errorf("required events already compacted, reinitializing watcher changes")
w.changes.Lock()
w.changes.initialized = false
w.changes.Unlock()
}
return errors.Wrapf(err, "watch error")
}
revision := wresp.Header.Revision
for _, ev := range wresp.Events {
key := string(ev.Kv.Key)
switch {
case strings.HasPrefix(key, etcdWalsDir+"/"):
switch ev.Type {
case mvccpb.PUT:
var walData *WalData
if err := json.Unmarshal(ev.Kv.Value, &walData); err != nil {
return err
}
if walData.WalStatus != WalStatusCommitted {
continue
}
if err := w.applyWalChanges(ctx, walData, revision); err != nil {
return err
}
case mvccpb.DELETE:
walseq := path.Base(string(key))
w.changes.Lock()
putsToDelete := []string{}
deletesToDelete := []string{}
for p, pwalseq := range w.changes.puts {
if pwalseq == walseq {
putsToDelete = append(putsToDelete, p)
}
}
for p, pwalseq := range w.changes.deletes {
if pwalseq == walseq {
deletesToDelete = append(deletesToDelete, p)
}
}
for _, p := range putsToDelete {
w.changes.removePut(p, revision)
}
for _, p := range deletesToDelete {
w.changes.removeDelete(p, revision)
}
delete(w.changes.actions, walseq)
w.changes.updatePathsOrdered()
w.changes.Unlock()
}
case strings.HasPrefix(key, etcdChangeGroupsDir+"/"):
switch ev.Type {
case mvccpb.PUT:
w.changes.Lock()
changeGroup := path.Base(string(ev.Kv.Key))
w.changes.putChangeGroup(changeGroup, ev.Kv.ModRevision)
w.changes.Unlock()
case mvccpb.DELETE:
w.changes.Lock()
changeGroup := path.Base(string(ev.Kv.Key))
w.changes.removeChangeGroup(changeGroup)
w.changes.Unlock()
}
case key == etcdPingKey:
w.changes.Lock()
w.changes.putRevision(wresp.Header.Revision)
w.changes.Unlock()
}
}
}
return nil
}

1280
internal/wal/wal.go Normal file

File diff suppressed because it is too large Load Diff

275
internal/wal/wal_test.go Normal file
View File

@ -0,0 +1,275 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package wal
import (
"context"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
slog "github.com/sorintlab/agola/internal/log"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/testutil"
"github.com/google/go-cmp/cmp"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
var logger = slog.New(level)
var log = logger.Sugar()
func setupEtcd(t *testing.T, dir string) *testutil.TestEmbeddedEtcd {
tetcd, err := testutil.NewTestEmbeddedEtcd(t, logger, dir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := tetcd.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := tetcd.WaitUp(30 * time.Second); err != nil {
t.Fatalf("error waiting on store up: %v", err)
}
return tetcd
}
func shutdownEtcd(tetcd *testutil.TestEmbeddedEtcd) {
if tetcd.Etcd != nil {
tetcd.Kill()
}
}
type noopCheckpointer struct {
}
func (c *noopCheckpointer) Checkpoint(ctx context.Context, action *Action) error {
return nil
}
func TestEtcdReset(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
etcdDir, err := ioutil.TempDir(dir, "etcd")
tetcd := setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)
ctx, cancel := context.WithCancel(context.Background())
ltsDir, err := ioutil.TempDir(dir, "lts")
lts, err := objectstorage.NewPosixStorage(ltsDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
walConfig := &WalManagerConfig{
BasePath: "basepath",
E: tetcd.TestEtcd.Store,
Lts: objectstorage.NewObjStorage(lts, "/"),
EtcdWalsKeepNum: 10,
}
wal, err := NewWalManager(ctx, logger, walConfig)
go wal.Run(ctx)
time.Sleep(1 * time.Second)
actions := []*Action{
{
ActionType: ActionTypePut,
Data: []byte("{}"),
},
}
expectedObjects := []string{}
for i := 0; i < 20; i++ {
objectPath := fmt.Sprintf("object%02d", i)
expectedObjects = append(expectedObjects, objectPath)
actions[0].Path = objectPath
if _, err := wal.WriteWal(ctx, actions, nil); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
// wait for wal to be committed storage
time.Sleep(5 * time.Second)
// Reset etcd
shutdownEtcd(tetcd)
tetcd.WaitDown(10 * time.Second)
os.RemoveAll(etcdDir)
if err := tetcd.Start(); err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer shutdownEtcd(tetcd)
cancel()
ctx = context.Background()
go wal.Run(ctx)
time.Sleep(5 * time.Second)
curObjects := []string{}
doneCh := make(chan struct{})
for object := range wal.List("", "", true, doneCh) {
t.Logf("path: %q", object.Path)
if object.Err != nil {
t.Fatalf("unexpected err: %v", object.Err)
}
curObjects = append(curObjects, object.Path)
}
close(doneCh)
t.Logf("curObjects: %s", curObjects)
if diff := cmp.Diff(expectedObjects, curObjects); diff != "" {
t.Error(diff)
}
}
func TestConcurrentUpdate(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
etcdDir, err := ioutil.TempDir(dir, "etcd")
tetcd := setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)
ctx := context.Background()
ltsDir, err := ioutil.TempDir(dir, "lts")
lts, err := objectstorage.NewPosixStorage(ltsDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
walConfig := &WalManagerConfig{
E: tetcd.TestEtcd.Store,
Lts: objectstorage.NewObjStorage(lts, "/"),
EtcdWalsKeepNum: 10,
}
wal, err := NewWalManager(ctx, logger, walConfig)
actions := []*Action{
{
ActionType: ActionTypePut,
Path: "/object01",
Data: []byte("{}"),
},
}
go wal.Run(ctx)
time.Sleep(1 * time.Second)
cgNames := []string{"changegroup01", "changegroup02"}
cgt := wal.GetChangeGroupsUpdateToken(cgNames)
// populate with a wal
cgt, err = wal.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("err: %v", err)
}
// this must work successfully
oldcgt := cgt
cgt, err = wal.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("err: %v", err)
}
// this must fail since we are using the old cgt
_, err = wal.WriteWal(ctx, actions, oldcgt)
if err != ErrConcurrency {
t.Fatalf("expected err: %v, got %v", ErrConcurrency, err)
}
oldcgt = cgt
// this must work successfully
cgt, err = wal.WriteWal(ctx, actions, cgt)
if err != nil {
t.Fatalf("err: %v", err)
}
// this must fail since we are using the old cgt
_, err = wal.WriteWal(ctx, actions, oldcgt)
if err != ErrConcurrency {
t.Fatalf("expected err: %v, got %v", ErrConcurrency, err)
}
}
func TestWalCleaner(t *testing.T) {
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
etcdDir, err := ioutil.TempDir(dir, "etcd")
tetcd := setupEtcd(t, etcdDir)
defer shutdownEtcd(tetcd)
ctx := context.Background()
ltsDir, err := ioutil.TempDir(dir, "lts")
lts, err := objectstorage.NewPosixStorage(ltsDir)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
walKeepNum := 10
walConfig := &WalManagerConfig{
E: tetcd.TestEtcd.Store,
Lts: objectstorage.NewObjStorage(lts, "/"),
EtcdWalsKeepNum: walKeepNum,
}
wal, err := NewWalManager(ctx, logger, walConfig)
actions := []*Action{
{
ActionType: ActionTypePut,
Path: "/object01",
Data: []byte("{}"),
},
}
go wal.Run(ctx)
time.Sleep(1 * time.Second)
for i := 0; i < 20; i++ {
if _, err := wal.WriteWal(ctx, actions, nil); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
// wait for walCleaner to complete
time.Sleep(5 * time.Second)
walsCount := 0
for range wal.ListEtcdWals(ctx, 0) {
walsCount++
}
if walsCount != walKeepNum {
t.Fatalf("expected %d wals in etcd, got %d wals", walKeepNum, walsCount)
}
}