diff --git a/internal/services/configstore/action/action.go b/internal/services/configstore/action/action.go index 4779fc1..ae6bd59 100644 --- a/internal/services/configstore/action/action.go +++ b/internal/services/configstore/action/action.go @@ -16,21 +16,30 @@ package action import ( "agola.io/agola/internal/datamanager" + "agola.io/agola/internal/etcd" "agola.io/agola/internal/services/configstore/readdb" "go.uber.org/zap" ) type ActionHandler struct { - log *zap.SugaredLogger - readDB *readdb.ReadDB - dm *datamanager.DataManager + log *zap.SugaredLogger + readDB *readdb.ReadDB + dm *datamanager.DataManager + e *etcd.Store + maintenanceMode bool } -func NewActionHandler(logger *zap.Logger, readDB *readdb.ReadDB, dm *datamanager.DataManager) *ActionHandler { +func NewActionHandler(logger *zap.Logger, readDB *readdb.ReadDB, dm *datamanager.DataManager, e *etcd.Store) *ActionHandler { return &ActionHandler{ - log: logger.Sugar(), - readDB: readDB, - dm: dm, + log: logger.Sugar(), + readDB: readDB, + dm: dm, + e: e, + maintenanceMode: false, } } + +func (h *ActionHandler) SetMaintenanceMode(maintenanceMode bool) { + h.maintenanceMode = maintenanceMode +} diff --git a/internal/services/configstore/action/maintenance.go b/internal/services/configstore/action/maintenance.go new file mode 100644 index 0000000..780243c --- /dev/null +++ b/internal/services/configstore/action/maintenance.go @@ -0,0 +1,73 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package action + +import ( + "context" + "io" + + "agola.io/agola/internal/etcd" + "agola.io/agola/internal/services/configstore/common" + "agola.io/agola/internal/util" + + errors "golang.org/x/xerrors" +) + +func (h *ActionHandler) MaintenanceMode(ctx context.Context, enable bool) error { + resp, err := h.e.Get(ctx, common.EtcdMaintenanceKey, 0) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + + if enable && len(resp.Kvs) > 0 { + return util.NewErrBadRequest(errors.Errorf("maintenance mode already enabled")) + } + if !enable && len(resp.Kvs) == 0 { + return util.NewErrBadRequest(errors.Errorf("maintenance mode already disabled")) + } + + if enable { + txResp, err := h.e.AtomicPut(ctx, common.EtcdMaintenanceKey, []byte{}, 0, nil) + if err != nil { + return err + } + if !txResp.Succeeded { + return errors.Errorf("failed to create maintenance mode key due to concurrent update") + } + } + + if !enable { + txResp, err := h.e.AtomicDelete(ctx, common.EtcdMaintenanceKey, resp.Kvs[0].ModRevision) + if err != nil { + return err + } + if !txResp.Succeeded { + return errors.Errorf("failed to delete maintenance mode key due to concurrent update") + } + } + + return nil +} + +func (h *ActionHandler) Export(ctx context.Context, w io.Writer) error { + return h.dm.Export(ctx, w) +} + +func (h *ActionHandler) Import(ctx context.Context, r io.Reader) error { + if !h.maintenanceMode { + return util.NewErrBadRequest(errors.Errorf("not in maintenance mode")) + } + return h.dm.Import(ctx, r) +} diff --git a/internal/services/configstore/api/maintenance.go b/internal/services/configstore/api/maintenance.go new file mode 100644 index 0000000..3b2e467 --- /dev/null +++ b/internal/services/configstore/api/maintenance.go @@ -0,0 +1,107 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + + "agola.io/agola/internal/etcd" + "agola.io/agola/internal/services/configstore/action" + + "go.uber.org/zap" +) + +type MaintenanceModeHandler struct { + log *zap.SugaredLogger + ah *action.ActionHandler + e *etcd.Store +} + +func NewMaintenanceModeHandler(logger *zap.Logger, ah *action.ActionHandler, e *etcd.Store) *MaintenanceModeHandler { + return &MaintenanceModeHandler{log: logger.Sugar(), ah: ah, e: e} +} + +func (h *MaintenanceModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + enable := false + switch r.Method { + case "PUT": + enable = true + case "DELETE": + enable = false + } + + err := h.ah.MaintenanceMode(ctx, enable) + if err != nil { + h.log.Errorf("err: %+v", err) + httpError(w, err) + return + } + + if err := httpResponse(w, http.StatusOK, nil); err != nil { + h.log.Errorf("err: %+v", err) + } + +} + +type ExportHandler struct { + log *zap.SugaredLogger + ah *action.ActionHandler +} + +func NewExportHandler(logger *zap.Logger, ah *action.ActionHandler) *ExportHandler { + return &ExportHandler{log: logger.Sugar(), ah: ah} +} + +func (h *ExportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + err := h.ah.Export(ctx, w) + if err != nil { + h.log.Errorf("err: %+v", err) + // since we already answered with a 200 we cannot return another error code + // So abort the connection and the client will detect the missing ending chunk + // and consider this an error + // + // this is the way to force close a request without logging the panic + panic(http.ErrAbortHandler) + } +} + +type ImportHandler struct { + log *zap.SugaredLogger + ah *action.ActionHandler +} + +func NewImportHandler(logger *zap.Logger, ah *action.ActionHandler) *ImportHandler { + return &ImportHandler{log: logger.Sugar(), ah: ah} +} + +func (h *ImportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + err := h.ah.Import(ctx, r.Body) + if err != nil { + h.log.Errorf("err: %+v", err) + httpError(w, err) + return + } + + if err := httpResponse(w, http.StatusOK, nil); err != nil { + h.log.Errorf("err: %+v", err) + } + +} diff --git a/internal/services/configstore/common/common.go b/internal/services/configstore/common/common.go index e9d8e85..192417d 100644 --- a/internal/services/configstore/common/common.go +++ b/internal/services/configstore/common/common.go @@ -21,6 +21,10 @@ import ( uuid "github.com/satori/go.uuid" ) +const ( + EtcdMaintenanceKey = "maintenance" +) + type RefType int const ( diff --git a/internal/services/configstore/configstore.go b/internal/services/configstore/configstore.go index b7f2834..780b359 100644 --- a/internal/services/configstore/configstore.go +++ b/internal/services/configstore/configstore.go @@ -19,6 +19,8 @@ import ( "crypto/tls" "net/http" "path/filepath" + "sync" + "time" scommon "agola.io/agola/internal/common" "agola.io/agola/internal/datamanager" @@ -28,11 +30,14 @@ import ( "agola.io/agola/internal/services/config" action "agola.io/agola/internal/services/configstore/action" "agola.io/agola/internal/services/configstore/api" + "agola.io/agola/internal/services/configstore/common" "agola.io/agola/internal/services/configstore/readdb" "agola.io/agola/internal/services/types" "agola.io/agola/internal/util" "github.com/gorilla/mux" + etcdclientv3 "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" "go.uber.org/zap/zapcore" ) @@ -41,13 +46,78 @@ var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) var logger = slog.New(level) var log = logger.Sugar() +func (s *Configstore) maintenanceModeWatcherLoop(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) { + for { + log.Debugf("maintenanceModeWatcherLoop") + + // at first watch restart from previous processed revision + if err := s.maintenanceModeWatcher(ctx, runCtxCancel, maintenanceModeEnabled); err != nil { + log.Errorf("err: %+v", err) + } + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (s *Configstore) maintenanceModeWatcher(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) error { + log.Infof("watcher: maintenance mode enabled: %t", maintenanceModeEnabled) + resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + + if len(resp.Kvs) > 0 { + log.Infof("maintenance mode key is present") + if !maintenanceModeEnabled { + runCtxCancel() + } + } + + revision := resp.Header.Revision + + wctx := etcdclientv3.WithRequireLeader(ctx) + + // restart from previous processed revision + wch := s.e.Watch(wctx, common.EtcdMaintenanceKey, revision) + + for wresp := range wch { + if wresp.Canceled { + return wresp.Err() + } + + for _, ev := range wresp.Events { + switch ev.Type { + case mvccpb.PUT: + log.Infof("maintenance mode key set") + if !maintenanceModeEnabled { + runCtxCancel() + } + + case mvccpb.DELETE: + log.Infof("maintenance mode key removed") + if maintenanceModeEnabled { + runCtxCancel() + } + } + } + } + + return nil +} + type Configstore struct { - c *config.Configstore - e *etcd.Store - dm *datamanager.DataManager - readDB *readdb.ReadDB - ost *objectstorage.ObjStorage - ah *action.ActionHandler + c *config.Configstore + e *etcd.Store + dm *datamanager.DataManager + readDB *readdb.ReadDB + ost *objectstorage.ObjStorage + ah *action.ActionHandler + maintenanceMode bool } func NewConfigstore(ctx context.Context, c *config.Configstore) (*Configstore, error) { @@ -97,22 +167,15 @@ func NewConfigstore(ctx context.Context, c *config.Configstore) (*Configstore, e cs.dm = dm cs.readDB = readDB - ah := action.NewActionHandler(logger, readDB, dm) + ah := action.NewActionHandler(logger, readDB, dm, e) cs.ah = ah return cs, nil } -func (s *Configstore) Run(ctx context.Context) error { - errCh := make(chan error) - dmReadyCh := make(chan struct{}) - - go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() - - // wait for dm to be ready - <-dmReadyCh - - go func() { errCh <- s.readDB.Run(ctx) }() +func (s *Configstore) setupDefaultRouter() http.Handler { + maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e) + exportHandler := api.NewExportHandler(logger, s.ah) projectGroupHandler := api.NewProjectGroupHandler(logger, s.readDB) projectGroupSubgroupsHandler := api.NewProjectGroupSubgroupsHandler(logger, s.ah, s.readDB) @@ -227,9 +290,52 @@ func (s *Configstore) Run(ctx context.Context) error { apirouter.Handle("/remotesources/{remotesourceref}", updateRemoteSourceHandler).Methods("PUT") apirouter.Handle("/remotesources/{remotesourceref}", deleteRemoteSourceHandler).Methods("DELETE") + apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE") + + apirouter.Handle("/export", exportHandler).Methods("GET") + mainrouter := mux.NewRouter() mainrouter.PathPrefix("/").Handler(router) + return mainrouter +} + +func (s *Configstore) setupMaintenanceRouter() http.Handler { + maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e) + exportHandler := api.NewExportHandler(logger, s.ah) + importHandler := api.NewImportHandler(logger, s.ah) + + router := mux.NewRouter() + apirouter := router.PathPrefix("/api/v1alpha").Subrouter().UseEncodedPath() + + apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE") + + apirouter.Handle("/export", exportHandler).Methods("GET") + apirouter.Handle("/import", importHandler).Methods("POST") + + mainrouter := mux.NewRouter() + mainrouter.PathPrefix("/").Handler(router) + + return mainrouter +} + +func (s *Configstore) Run(ctx context.Context) error { + for { + if err := s.run(ctx); err != nil { + log.Errorf("run error: %+v", err) + } + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + log.Infof("configstore exiting") + return nil + case <-sleepCh: + } + } +} + +func (s *Configstore) run(ctx context.Context) error { var tlsConfig *tls.Config if s.c.Web.TLS { var err error @@ -240,21 +346,60 @@ func (s *Configstore) Run(ctx context.Context) error { } } + resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + + maintenanceMode := false + if len(resp.Kvs) > 0 { + log.Infof("maintenance mode key is present") + maintenanceMode = true + } + + s.maintenanceMode = maintenanceMode + s.dm.SetMaintenanceMode(maintenanceMode) + s.ah.SetMaintenanceMode(maintenanceMode) + + ctx, cancel := context.WithCancel(ctx) + errCh := make(chan error, 100) + var wg sync.WaitGroup + dmReadyCh := make(chan struct{}) + + var mainrouter http.Handler + if s.maintenanceMode { + mainrouter = s.setupMaintenanceRouter() + util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) }) + + } else { + mainrouter = s.setupDefaultRouter() + + util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) }) + + // TODO(sgotti) wait for all goroutines exiting + util.GoWait(&wg, func() { errCh <- s.dm.Run(ctx, dmReadyCh) }) + + // wait for dm to be ready + <-dmReadyCh + + util.GoWait(&wg, func() { errCh <- s.readDB.Run(ctx) }) + } + httpServer := http.Server{ Addr: s.c.Web.ListenAddress, Handler: mainrouter, TLSConfig: tlsConfig, } - lerrCh := make(chan error) - go func() { + lerrCh := make(chan error, 1) + util.GoWait(&wg, func() { lerrCh <- httpServer.ListenAndServe() - }() + }) + defer httpServer.Close() select { case <-ctx.Done(): - log.Infof("configstore exiting") - httpServer.Close() + log.Infof("configstore run exiting") case err := <-lerrCh: if err != nil { log.Errorf("http server listen error: %+v", err) @@ -267,5 +412,9 @@ func (s *Configstore) Run(ctx context.Context) error { } } - return nil + cancel() + httpServer.Close() + wg.Wait() + + return err } diff --git a/internal/services/configstore/configstore_test.go b/internal/services/configstore/configstore_test.go index 6a867d6..16cd3da 100644 --- a/internal/services/configstore/configstore_test.go +++ b/internal/services/configstore/configstore_test.go @@ -15,6 +15,7 @@ package configstore import ( + "bytes" "context" "fmt" "io/ioutil" @@ -216,7 +217,7 @@ func TestResync(t *testing.T) { cancel2() // Do some more changes - for i := 11; i < 20; i++ { + for i := 10; i < 20; i++ { if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil { t.Fatalf("err: %v", err) } @@ -246,6 +247,10 @@ func TestResync(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + if len(users1) != 20 { + t.Logf("users1: %s", util.Dump(users1)) + t.Fatalf("expected %d users, got %d users", 20, len(users1)) + } users2, err := getUsers(ctx, cs2) if err != nil { @@ -275,10 +280,238 @@ func TestResync(t *testing.T) { time.Sleep(5 * time.Second) + users3, err := getUsers(ctx, cs3) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !compareUsers(users1, users3) { + t.Logf("len(users1): %d", len(users1)) + t.Logf("len(users3): %d", len(users3)) + t.Logf("users1: %s", util.Dump(users1)) + t.Logf("users3: %s", util.Dump(users3)) + t.Fatalf("users are different between the two readdbs") + } +} + +func TestExportImport(t *testing.T) { + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + etcdDir, err := ioutil.TempDir(dir, "etcd") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + tetcd := setupEtcd(t, etcdDir) + defer shutdownEtcd(tetcd) + + listenAddress1, port1, err := testutil.GetFreePort(true, false) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + listenAddress2, port2, err := testutil.GetFreePort(true, false) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + listenAddress3, port3, err := testutil.GetFreePort(true, false) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ctx := context.Background() + + ostDir, err := ioutil.TempDir(dir, "ost") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + csDir1, err := ioutil.TempDir(dir, "cs1") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + csDir2, err := ioutil.TempDir(dir, "cs2") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + csDir3, err := ioutil.TempDir(dir, "cs3") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + baseConfig := config.Configstore{ + Etcd: config.Etcd{ + Endpoints: tetcd.Endpoint, + }, + ObjectStorage: config.ObjectStorage{ + Type: config.ObjectStorageTypePosix, + Path: ostDir, + }, + Web: config.Web{}, + } + cs1Config := baseConfig + cs1Config.DataDir = csDir1 + cs1Config.Web.ListenAddress = net.JoinHostPort(listenAddress1, port1) + + cs2Config := baseConfig + cs2Config.DataDir = csDir2 + cs2Config.Web.ListenAddress = net.JoinHostPort(listenAddress2, port2) + + cs3Config := baseConfig + cs3Config.DataDir = csDir3 + cs3Config.Web.ListenAddress = net.JoinHostPort(listenAddress3, port3) + + cs1, err := NewConfigstore(ctx, &cs1Config) + if err != nil { + t.Fatalf("err: %v", err) + } + cs2, err := NewConfigstore(ctx, &cs2Config) + if err != nil { + t.Fatalf("err: %v", err) + } + cs3, err := NewConfigstore(ctx, &cs3Config) + if err != nil { + t.Fatalf("err: %v", err) + } + + ctx1 := context.Background() + ctx2, cancel2 := context.WithCancel(context.Background()) + ctx3, cancel3 := context.WithCancel(context.Background()) + + t.Logf("starting cs1") + go func() { _ = cs1.Run(ctx1) }() + t.Logf("starting cs2") + go func() { _ = cs2.Run(ctx2) }() + t.Logf("starting cs3") + go func() { _ = cs3.Run(ctx3) }() + + time.Sleep(1 * time.Second) + + for i := 0; i < 10; i++ { + if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(200 * time.Millisecond) + } + + time.Sleep(5 * time.Second) + + // stop cs2 + log.Infof("stopping cs2") + cancel2() + // stop cs3 + log.Infof("stopping cs3") + cancel3() + + // Do some more changes + for i := 10; i < 20; i++ { + if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(200 * time.Millisecond) + } + + time.Sleep(5 * time.Second) + + users1, err := getUsers(ctx, cs1) + if err != nil { + t.Fatalf("err: %v", err) + } + if len(users1) != 20 { + t.Logf("users1: %s", util.Dump(users1)) + t.Fatalf("expected %d users, got %d users", 20, len(users1)) + } + + var export bytes.Buffer + if err := cs1.ah.Export(ctx, &export); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if err := cs1.ah.MaintenanceMode(ctx, true); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + time.Sleep(5 * time.Second) + + if err := cs1.ah.Import(ctx, &export); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + if err := cs1.ah.MaintenanceMode(ctx, false); err != nil { + t.Fatalf("unexpected err: %v", err) + } + + time.Sleep(5 * time.Second) + + newUsers1, err := getUsers(ctx, cs1) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !compareUsers(users1, newUsers1) { + t.Logf("len(users1): %d", len(users1)) + t.Logf("len(newUsers1): %d", len(newUsers1)) + t.Logf("users1: %s", util.Dump(users1)) + t.Logf("newUsers1: %s", util.Dump(newUsers1)) + t.Fatalf("users are different between the two readdbs") + } + + // start cs2 + // it should do a full resync since we have imported new data and there's now wal in etcd + cs2, err = NewConfigstore(ctx, &cs2Config) + if err != nil { + t.Fatalf("err: %v", err) + } + log.Infof("starting cs2") + ctx2 = context.Background() + go func() { _ = cs2.Run(ctx2) }() + + time.Sleep(5 * time.Second) + + users2, err := getUsers(ctx, cs2) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !compareUsers(users1, users2) { + t.Logf("len(users1): %d", len(users1)) + t.Logf("len(users2): %d", len(users2)) + t.Logf("users1: %s", util.Dump(users1)) + t.Logf("users2: %s", util.Dump(users2)) + t.Fatalf("users are different between the two readdbs") + } + + // Do some more changes + for i := 20; i < 30; i++ { + if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(200 * time.Millisecond) + } + + time.Sleep(5 * time.Second) + users1, err = getUsers(ctx, cs1) if err != nil { t.Fatalf("err: %v", err) } + if len(users1) != 30 { + t.Logf("users1: %s", util.Dump(users1)) + t.Fatalf("expected %d users, got %d users", 30, len(users1)) + } + + // start cs3 + // it should do a full resync since we have imported new data and there're some wals with a different epoch + cs3, err = NewConfigstore(ctx, &cs3Config) + if err != nil { + t.Fatalf("err: %v", err) + } + log.Infof("starting cs3") + ctx3 = context.Background() + go func() { _ = cs3.Run(ctx3) }() + + time.Sleep(5 * time.Second) users3, err := getUsers(ctx, cs3) if err != nil {