diff --git a/internal/services/runservice/action/action.go b/internal/services/runservice/action/action.go index afba39b..407ce57 100644 --- a/internal/services/runservice/action/action.go +++ b/internal/services/runservice/action/action.go @@ -37,23 +37,29 @@ import ( ) type ActionHandler struct { - log *zap.SugaredLogger - e *etcd.Store - readDB *readdb.ReadDB - ost *objectstorage.ObjStorage - dm *datamanager.DataManager + log *zap.SugaredLogger + e *etcd.Store + readDB *readdb.ReadDB + ost *objectstorage.ObjStorage + dm *datamanager.DataManager + maintenanceMode bool } func NewActionHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *ActionHandler { return &ActionHandler{ - log: logger.Sugar(), - e: e, - readDB: readDB, - ost: ost, - dm: dm, + log: logger.Sugar(), + e: e, + readDB: readDB, + ost: ost, + dm: dm, + maintenanceMode: false, } } +func (h *ActionHandler) SetMaintenanceMode(maintenanceMode bool) { + h.maintenanceMode = maintenanceMode +} + type RunChangePhaseRequest struct { RunID string Phase types.RunPhase diff --git a/internal/services/runservice/action/maintenance.go b/internal/services/runservice/action/maintenance.go new file mode 100644 index 0000000..677ac32 --- /dev/null +++ b/internal/services/runservice/action/maintenance.go @@ -0,0 +1,73 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package action + +import ( + "context" + "io" + + "agola.io/agola/internal/etcd" + "agola.io/agola/internal/services/runservice/common" + "agola.io/agola/internal/util" + + errors "golang.org/x/xerrors" +) + +func (h *ActionHandler) MaintenanceMode(ctx context.Context, enable bool) error { + resp, err := h.e.Get(ctx, common.EtcdMaintenanceKey, 0) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + + if enable && len(resp.Kvs) > 0 { + return util.NewErrBadRequest(errors.Errorf("maintenance mode already enabled")) + } + if !enable && len(resp.Kvs) == 0 { + return util.NewErrBadRequest(errors.Errorf("maintenance mode already disabled")) + } + + if enable { + txResp, err := h.e.AtomicPut(ctx, common.EtcdMaintenanceKey, []byte{}, 0, nil) + if err != nil { + return err + } + if !txResp.Succeeded { + return errors.Errorf("failed to create maintenance mode key due to concurrent update") + } + } + + if !enable { + txResp, err := h.e.AtomicDelete(ctx, common.EtcdMaintenanceKey, resp.Kvs[0].ModRevision) + if err != nil { + return err + } + if !txResp.Succeeded { + return errors.Errorf("failed to delete maintenance mode key due to concurrent update") + } + } + + return nil +} + +func (h *ActionHandler) Export(ctx context.Context, w io.Writer) error { + return h.dm.Export(ctx, w) +} + +func (h *ActionHandler) Import(ctx context.Context, r io.Reader) error { + if !h.maintenanceMode { + return util.NewErrBadRequest(errors.Errorf("not in maintenance mode")) + } + return h.dm.Import(ctx, r) +} diff --git a/internal/services/runservice/api/maintenance.go b/internal/services/runservice/api/maintenance.go new file mode 100644 index 0000000..711bf7c --- /dev/null +++ b/internal/services/runservice/api/maintenance.go @@ -0,0 +1,107 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + + "agola.io/agola/internal/etcd" + "agola.io/agola/internal/services/runservice/action" + + "go.uber.org/zap" +) + +type MaintenanceModeHandler struct { + log *zap.SugaredLogger + ah *action.ActionHandler + e *etcd.Store +} + +func NewMaintenanceModeHandler(logger *zap.Logger, ah *action.ActionHandler, e *etcd.Store) *MaintenanceModeHandler { + return &MaintenanceModeHandler{log: logger.Sugar(), ah: ah, e: e} +} + +func (h *MaintenanceModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + enable := false + switch r.Method { + case "PUT": + enable = true + case "DELETE": + enable = false + } + + err := h.ah.MaintenanceMode(ctx, enable) + if err != nil { + h.log.Errorf("err: %+v", err) + httpError(w, err) + return + } + + if err := httpResponse(w, http.StatusOK, nil); err != nil { + h.log.Errorf("err: %+v", err) + } + +} + +type ExportHandler struct { + log *zap.SugaredLogger + ah *action.ActionHandler +} + +func NewExportHandler(logger *zap.Logger, ah *action.ActionHandler) *ExportHandler { + return &ExportHandler{log: logger.Sugar(), ah: ah} +} + +func (h *ExportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + err := h.ah.Export(ctx, w) + if err != nil { + h.log.Errorf("err: %+v", err) + // since we already answered with a 200 we cannot return another error code + // So abort the connection and the client will detect the missing ending chunk + // and consider this an error + // + // this is the way to force close a request without logging the panic + panic(http.ErrAbortHandler) + } +} + +type ImportHandler struct { + log *zap.SugaredLogger + ah *action.ActionHandler +} + +func NewImportHandler(logger *zap.Logger, ah *action.ActionHandler) *ImportHandler { + return &ImportHandler{log: logger.Sugar(), ah: ah} +} + +func (h *ImportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + err := h.ah.Import(ctx, r.Body) + if err != nil { + h.log.Errorf("err: %+v", err) + httpError(w, err) + return + } + + if err := httpResponse(w, http.StatusOK, nil); err != nil { + h.log.Errorf("err: %+v", err) + } + +} diff --git a/internal/services/runservice/common/common.go b/internal/services/runservice/common/common.go index 3acbe80..de966bc 100644 --- a/internal/services/runservice/common/common.go +++ b/internal/services/runservice/common/common.go @@ -53,6 +53,8 @@ var ( EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock") EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner") EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater") + + EtcdMaintenanceKey = "maintenance" ) func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) } diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go index cb4c28d..58ad414 100644 --- a/internal/services/runservice/runservice.go +++ b/internal/services/runservice/runservice.go @@ -19,6 +19,7 @@ import ( "crypto/tls" "net/http" "path/filepath" + "sync" "time" scommon "agola.io/agola/internal/common" @@ -35,6 +36,7 @@ import ( "github.com/gorilla/mux" etcdclientv3 "go.etcd.io/etcd/clientv3" + "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap/zapcore" ) @@ -65,13 +67,78 @@ func (s *Runservice) etcdPinger(ctx context.Context) error { return nil } +func (s *Runservice) maintenanceModeWatcherLoop(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) { + for { + log.Debugf("maintenanceModeWatcherLoop") + + // at first watch restart from previous processed revision + if err := s.maintenanceModeWatcher(ctx, runCtxCancel, maintenanceModeEnabled); err != nil { + log.Errorf("err: %+v", err) + } + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return + case <-sleepCh: + } + } +} + +func (s *Runservice) maintenanceModeWatcher(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) error { + log.Infof("watcher: maintenance mode enabled: %t", maintenanceModeEnabled) + resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + + if len(resp.Kvs) > 0 { + log.Infof("maintenance mode key is present") + if !maintenanceModeEnabled { + runCtxCancel() + } + } + + revision := resp.Header.Revision + + wctx := etcdclientv3.WithRequireLeader(ctx) + + // restart from previous processed revision + wch := s.e.Watch(wctx, common.EtcdMaintenanceKey, revision) + + for wresp := range wch { + if wresp.Canceled { + return wresp.Err() + } + + for _, ev := range wresp.Events { + switch ev.Type { + case mvccpb.PUT: + log.Infof("maintenance mode key set") + if !maintenanceModeEnabled { + runCtxCancel() + } + + case mvccpb.DELETE: + log.Infof("maintenance mode key removed") + if maintenanceModeEnabled { + runCtxCancel() + } + } + } + } + + return nil +} + type Runservice struct { - c *config.Runservice - e *etcd.Store - ost *objectstorage.ObjStorage - dm *datamanager.DataManager - readDB *readdb.ReadDB - ah *action.ActionHandler + c *config.Runservice + e *etcd.Store + ost *objectstorage.ObjStorage + dm *datamanager.DataManager + readDB *readdb.ReadDB + ah *action.ActionHandler + maintenanceMode bool } func NewRunservice(ctx context.Context, c *config.Runservice) (*Runservice, error) { @@ -137,31 +204,13 @@ func (s *Runservice) InitEtcd(ctx context.Context) error { return nil } -func (s *Runservice) Run(ctx context.Context) error { - errCh := make(chan error) - dmReadyCh := make(chan struct{}) - - go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() - - // wait for dm to be ready - <-dmReadyCh - - for { - err := s.InitEtcd(ctx) - if err == nil { - break - } - log.Errorf("failed to initialize etcd: %+v", err) - time.Sleep(1 * time.Second) - } - - go func() { errCh <- s.readDB.Run(ctx) }() - - ch := make(chan *types.ExecutorTask) +func (s *Runservice) setupDefaultRouter(etCh chan *types.ExecutorTask) http.Handler { + maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e) + exportHandler := api.NewExportHandler(logger, s.ah) // executor dedicated api, only calls from executor should happen on these handlers executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) - executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) + executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, etCh) executorTaskHandler := api.NewExecutorTaskHandler(s.e) executorTasksHandler := api.NewExecutorTasksHandler(s.e) archivesHandler := api.NewArchivesHandler(logger, s.ost) @@ -209,23 +258,55 @@ func (s *Runservice) Run(ctx context.Context) error { apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET") + apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE") + + apirouter.Handle("/export", exportHandler).Methods("GET") + mainrouter := mux.NewRouter() mainrouter.PathPrefix("/").Handler(router) // Return a bad request when it doesn't match any route mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) - go s.executorTasksCleanerLoop(ctx) - go s.runsSchedulerLoop(ctx) - go s.runTasksUpdaterLoop(ctx) - go s.fetcherLoop(ctx) - go s.finishedRunsArchiverLoop(ctx) - go s.compactChangeGroupsLoop(ctx) - go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) - go s.executorTaskUpdateHandler(ctx, ch) + return mainrouter +} - go s.etcdPingerLoop(ctx) +func (s *Runservice) setupMaintenanceRouter() http.Handler { + maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e) + exportHandler := api.NewExportHandler(logger, s.ah) + importHandler := api.NewImportHandler(logger, s.ah) + router := mux.NewRouter() + apirouter := router.PathPrefix("/api/v1alpha").Subrouter().UseEncodedPath() + + apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE") + + apirouter.Handle("/export", exportHandler).Methods("GET") + apirouter.Handle("/import", importHandler).Methods("POST") + + mainrouter := mux.NewRouter() + mainrouter.PathPrefix("/").Handler(router) + + return mainrouter +} + +func (s *Runservice) Run(ctx context.Context) error { + for { + if err := s.run(ctx); err != nil { + log.Errorf("run error: %+v", err) + } + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + log.Infof("runservice exiting") + return nil + case <-sleepCh: + } + } +} + +func (s *Runservice) run(ctx context.Context) error { var tlsConfig *tls.Config if s.c.Web.TLS { var err error @@ -236,32 +317,98 @@ func (s *Runservice) Run(ctx context.Context) error { } } + resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0) + if err != nil && err != etcd.ErrKeyNotFound { + return err + } + + maintenanceMode := false + if len(resp.Kvs) > 0 { + log.Infof("maintenance mode key is present") + maintenanceMode = true + } + + s.maintenanceMode = maintenanceMode + s.dm.SetMaintenanceMode(maintenanceMode) + s.ah.SetMaintenanceMode(maintenanceMode) + + ctx, cancel := context.WithCancel(ctx) + errCh := make(chan error, 100) + var wg sync.WaitGroup + dmReadyCh := make(chan struct{}) + + var mainrouter http.Handler + if s.maintenanceMode { + mainrouter = s.setupMaintenanceRouter() + util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) }) + + } else { + ch := make(chan *types.ExecutorTask) + mainrouter = s.setupDefaultRouter(ch) + + util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) }) + + // TODO(sgotti) wait for all goroutines exiting + util.GoWait(&wg, func() { errCh <- s.dm.Run(ctx, dmReadyCh) }) + + // wait for dm to be ready + <-dmReadyCh + + for { + err := s.InitEtcd(ctx) + if err == nil { + break + } + log.Errorf("failed to initialize etcd: %+v", err) + + sleepCh := time.NewTimer(1 * time.Second).C + select { + case <-ctx.Done(): + return nil + case <-sleepCh: + } + } + + util.GoWait(&wg, func() { errCh <- s.readDB.Run(ctx) }) + + util.GoWait(&wg, func() { s.executorTasksCleanerLoop(ctx) }) + util.GoWait(&wg, func() { s.runsSchedulerLoop(ctx) }) + util.GoWait(&wg, func() { s.runTasksUpdaterLoop(ctx) }) + util.GoWait(&wg, func() { s.fetcherLoop(ctx) }) + util.GoWait(&wg, func() { s.finishedRunsArchiverLoop(ctx) }) + util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) }) + util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) }) + util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) }) + util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) }) + } + httpServer := http.Server{ Addr: s.c.Web.ListenAddress, Handler: mainrouter, TLSConfig: tlsConfig, } - lerrCh := make(chan error) - go func() { + lerrCh := make(chan error, 1) + util.GoWait(&wg, func() { lerrCh <- httpServer.ListenAndServe() - }() + }) select { case <-ctx.Done(): - log.Infof("runservice scheduler exiting") - httpServer.Close() - case err := <-lerrCh: + log.Infof("runservice run exiting") + case err = <-lerrCh: if err != nil { log.Errorf("http server listen error: %v", err) - return err } case err := <-errCh: if err != nil { log.Errorf("error: %+v", err) - return err } } - return nil + cancel() + httpServer.Close() + wg.Wait() + + return err }