Merge pull request #68 from sgotti/runservice_maintenance_export_import
runservice: maintenance/export/import
This commit is contained in:
commit
504cd3d6dc
|
@ -37,23 +37,29 @@ import (
|
|||
)
|
||||
|
||||
type ActionHandler struct {
|
||||
log *zap.SugaredLogger
|
||||
e *etcd.Store
|
||||
readDB *readdb.ReadDB
|
||||
ost *objectstorage.ObjStorage
|
||||
dm *datamanager.DataManager
|
||||
log *zap.SugaredLogger
|
||||
e *etcd.Store
|
||||
readDB *readdb.ReadDB
|
||||
ost *objectstorage.ObjStorage
|
||||
dm *datamanager.DataManager
|
||||
maintenanceMode bool
|
||||
}
|
||||
|
||||
func NewActionHandler(logger *zap.Logger, e *etcd.Store, readDB *readdb.ReadDB, ost *objectstorage.ObjStorage, dm *datamanager.DataManager) *ActionHandler {
|
||||
return &ActionHandler{
|
||||
log: logger.Sugar(),
|
||||
e: e,
|
||||
readDB: readDB,
|
||||
ost: ost,
|
||||
dm: dm,
|
||||
log: logger.Sugar(),
|
||||
e: e,
|
||||
readDB: readDB,
|
||||
ost: ost,
|
||||
dm: dm,
|
||||
maintenanceMode: false,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *ActionHandler) SetMaintenanceMode(maintenanceMode bool) {
|
||||
h.maintenanceMode = maintenanceMode
|
||||
}
|
||||
|
||||
type RunChangePhaseRequest struct {
|
||||
RunID string
|
||||
Phase types.RunPhase
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
// Copyright 2019 Sorint.lab
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package action
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"agola.io/agola/internal/etcd"
|
||||
"agola.io/agola/internal/services/runservice/common"
|
||||
"agola.io/agola/internal/util"
|
||||
|
||||
errors "golang.org/x/xerrors"
|
||||
)
|
||||
|
||||
func (h *ActionHandler) MaintenanceMode(ctx context.Context, enable bool) error {
|
||||
resp, err := h.e.Get(ctx, common.EtcdMaintenanceKey, 0)
|
||||
if err != nil && err != etcd.ErrKeyNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
if enable && len(resp.Kvs) > 0 {
|
||||
return util.NewErrBadRequest(errors.Errorf("maintenance mode already enabled"))
|
||||
}
|
||||
if !enable && len(resp.Kvs) == 0 {
|
||||
return util.NewErrBadRequest(errors.Errorf("maintenance mode already disabled"))
|
||||
}
|
||||
|
||||
if enable {
|
||||
txResp, err := h.e.AtomicPut(ctx, common.EtcdMaintenanceKey, []byte{}, 0, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !txResp.Succeeded {
|
||||
return errors.Errorf("failed to create maintenance mode key due to concurrent update")
|
||||
}
|
||||
}
|
||||
|
||||
if !enable {
|
||||
txResp, err := h.e.AtomicDelete(ctx, common.EtcdMaintenanceKey, resp.Kvs[0].ModRevision)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !txResp.Succeeded {
|
||||
return errors.Errorf("failed to delete maintenance mode key due to concurrent update")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *ActionHandler) Export(ctx context.Context, w io.Writer) error {
|
||||
return h.dm.Export(ctx, w)
|
||||
}
|
||||
|
||||
func (h *ActionHandler) Import(ctx context.Context, r io.Reader) error {
|
||||
if !h.maintenanceMode {
|
||||
return util.NewErrBadRequest(errors.Errorf("not in maintenance mode"))
|
||||
}
|
||||
return h.dm.Import(ctx, r)
|
||||
}
|
|
@ -0,0 +1,107 @@
|
|||
// Copyright 2019 Sorint.lab
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
|
||||
"agola.io/agola/internal/etcd"
|
||||
"agola.io/agola/internal/services/runservice/action"
|
||||
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
type MaintenanceModeHandler struct {
|
||||
log *zap.SugaredLogger
|
||||
ah *action.ActionHandler
|
||||
e *etcd.Store
|
||||
}
|
||||
|
||||
func NewMaintenanceModeHandler(logger *zap.Logger, ah *action.ActionHandler, e *etcd.Store) *MaintenanceModeHandler {
|
||||
return &MaintenanceModeHandler{log: logger.Sugar(), ah: ah, e: e}
|
||||
}
|
||||
|
||||
func (h *MaintenanceModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
enable := false
|
||||
switch r.Method {
|
||||
case "PUT":
|
||||
enable = true
|
||||
case "DELETE":
|
||||
enable = false
|
||||
}
|
||||
|
||||
err := h.ah.MaintenanceMode(ctx, enable)
|
||||
if err != nil {
|
||||
h.log.Errorf("err: %+v", err)
|
||||
httpError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := httpResponse(w, http.StatusOK, nil); err != nil {
|
||||
h.log.Errorf("err: %+v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
type ExportHandler struct {
|
||||
log *zap.SugaredLogger
|
||||
ah *action.ActionHandler
|
||||
}
|
||||
|
||||
func NewExportHandler(logger *zap.Logger, ah *action.ActionHandler) *ExportHandler {
|
||||
return &ExportHandler{log: logger.Sugar(), ah: ah}
|
||||
}
|
||||
|
||||
func (h *ExportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
err := h.ah.Export(ctx, w)
|
||||
if err != nil {
|
||||
h.log.Errorf("err: %+v", err)
|
||||
// since we already answered with a 200 we cannot return another error code
|
||||
// So abort the connection and the client will detect the missing ending chunk
|
||||
// and consider this an error
|
||||
//
|
||||
// this is the way to force close a request without logging the panic
|
||||
panic(http.ErrAbortHandler)
|
||||
}
|
||||
}
|
||||
|
||||
type ImportHandler struct {
|
||||
log *zap.SugaredLogger
|
||||
ah *action.ActionHandler
|
||||
}
|
||||
|
||||
func NewImportHandler(logger *zap.Logger, ah *action.ActionHandler) *ImportHandler {
|
||||
return &ImportHandler{log: logger.Sugar(), ah: ah}
|
||||
}
|
||||
|
||||
func (h *ImportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
ctx := r.Context()
|
||||
|
||||
err := h.ah.Import(ctx, r.Body)
|
||||
if err != nil {
|
||||
h.log.Errorf("err: %+v", err)
|
||||
httpError(w, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := httpResponse(w, http.StatusOK, nil); err != nil {
|
||||
h.log.Errorf("err: %+v", err)
|
||||
}
|
||||
|
||||
}
|
|
@ -53,6 +53,8 @@ var (
|
|||
EtcdCompactChangeGroupsLockKey = path.Join(EtcdSchedulerBaseDir, "compactchangegroupslock")
|
||||
EtcdCacheCleanerLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "cachecleaner")
|
||||
EtcdTaskUpdaterLockKey = path.Join(EtcdSchedulerBaseDir, "locks", "taskupdater")
|
||||
|
||||
EtcdMaintenanceKey = "maintenance"
|
||||
)
|
||||
|
||||
func EtcdRunKey(runID string) string { return path.Join(EtcdRunsDir, runID) }
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"crypto/tls"
|
||||
"net/http"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
scommon "agola.io/agola/internal/common"
|
||||
|
@ -35,6 +36,7 @@ import (
|
|||
|
||||
"github.com/gorilla/mux"
|
||||
etcdclientv3 "go.etcd.io/etcd/clientv3"
|
||||
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||
"go.uber.org/zap/zapcore"
|
||||
)
|
||||
|
||||
|
@ -65,13 +67,78 @@ func (s *Runservice) etcdPinger(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Runservice) maintenanceModeWatcherLoop(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) {
|
||||
for {
|
||||
log.Debugf("maintenanceModeWatcherLoop")
|
||||
|
||||
// at first watch restart from previous processed revision
|
||||
if err := s.maintenanceModeWatcher(ctx, runCtxCancel, maintenanceModeEnabled); err != nil {
|
||||
log.Errorf("err: %+v", err)
|
||||
}
|
||||
|
||||
sleepCh := time.NewTimer(1 * time.Second).C
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-sleepCh:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Runservice) maintenanceModeWatcher(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) error {
|
||||
log.Infof("watcher: maintenance mode enabled: %t", maintenanceModeEnabled)
|
||||
resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0)
|
||||
if err != nil && err != etcd.ErrKeyNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
if len(resp.Kvs) > 0 {
|
||||
log.Infof("maintenance mode key is present")
|
||||
if !maintenanceModeEnabled {
|
||||
runCtxCancel()
|
||||
}
|
||||
}
|
||||
|
||||
revision := resp.Header.Revision
|
||||
|
||||
wctx := etcdclientv3.WithRequireLeader(ctx)
|
||||
|
||||
// restart from previous processed revision
|
||||
wch := s.e.Watch(wctx, common.EtcdMaintenanceKey, revision)
|
||||
|
||||
for wresp := range wch {
|
||||
if wresp.Canceled {
|
||||
return wresp.Err()
|
||||
}
|
||||
|
||||
for _, ev := range wresp.Events {
|
||||
switch ev.Type {
|
||||
case mvccpb.PUT:
|
||||
log.Infof("maintenance mode key set")
|
||||
if !maintenanceModeEnabled {
|
||||
runCtxCancel()
|
||||
}
|
||||
|
||||
case mvccpb.DELETE:
|
||||
log.Infof("maintenance mode key removed")
|
||||
if maintenanceModeEnabled {
|
||||
runCtxCancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type Runservice struct {
|
||||
c *config.Runservice
|
||||
e *etcd.Store
|
||||
ost *objectstorage.ObjStorage
|
||||
dm *datamanager.DataManager
|
||||
readDB *readdb.ReadDB
|
||||
ah *action.ActionHandler
|
||||
c *config.Runservice
|
||||
e *etcd.Store
|
||||
ost *objectstorage.ObjStorage
|
||||
dm *datamanager.DataManager
|
||||
readDB *readdb.ReadDB
|
||||
ah *action.ActionHandler
|
||||
maintenanceMode bool
|
||||
}
|
||||
|
||||
func NewRunservice(ctx context.Context, c *config.Runservice) (*Runservice, error) {
|
||||
|
@ -137,31 +204,13 @@ func (s *Runservice) InitEtcd(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Runservice) Run(ctx context.Context) error {
|
||||
errCh := make(chan error)
|
||||
dmReadyCh := make(chan struct{})
|
||||
|
||||
go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()
|
||||
|
||||
// wait for dm to be ready
|
||||
<-dmReadyCh
|
||||
|
||||
for {
|
||||
err := s.InitEtcd(ctx)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed to initialize etcd: %+v", err)
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
go func() { errCh <- s.readDB.Run(ctx) }()
|
||||
|
||||
ch := make(chan *types.ExecutorTask)
|
||||
func (s *Runservice) setupDefaultRouter(etCh chan *types.ExecutorTask) http.Handler {
|
||||
maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e)
|
||||
exportHandler := api.NewExportHandler(logger, s.ah)
|
||||
|
||||
// executor dedicated api, only calls from executor should happen on these handlers
|
||||
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
|
||||
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch)
|
||||
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, etCh)
|
||||
executorTaskHandler := api.NewExecutorTaskHandler(s.e)
|
||||
executorTasksHandler := api.NewExecutorTasksHandler(s.e)
|
||||
archivesHandler := api.NewArchivesHandler(logger, s.ost)
|
||||
|
@ -209,23 +258,55 @@ func (s *Runservice) Run(ctx context.Context) error {
|
|||
|
||||
apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET")
|
||||
|
||||
apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE")
|
||||
|
||||
apirouter.Handle("/export", exportHandler).Methods("GET")
|
||||
|
||||
mainrouter := mux.NewRouter()
|
||||
mainrouter.PathPrefix("/").Handler(router)
|
||||
|
||||
// Return a bad request when it doesn't match any route
|
||||
mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
|
||||
|
||||
go s.executorTasksCleanerLoop(ctx)
|
||||
go s.runsSchedulerLoop(ctx)
|
||||
go s.runTasksUpdaterLoop(ctx)
|
||||
go s.fetcherLoop(ctx)
|
||||
go s.finishedRunsArchiverLoop(ctx)
|
||||
go s.compactChangeGroupsLoop(ctx)
|
||||
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
|
||||
go s.executorTaskUpdateHandler(ctx, ch)
|
||||
return mainrouter
|
||||
}
|
||||
|
||||
go s.etcdPingerLoop(ctx)
|
||||
func (s *Runservice) setupMaintenanceRouter() http.Handler {
|
||||
maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e)
|
||||
exportHandler := api.NewExportHandler(logger, s.ah)
|
||||
importHandler := api.NewImportHandler(logger, s.ah)
|
||||
|
||||
router := mux.NewRouter()
|
||||
apirouter := router.PathPrefix("/api/v1alpha").Subrouter().UseEncodedPath()
|
||||
|
||||
apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE")
|
||||
|
||||
apirouter.Handle("/export", exportHandler).Methods("GET")
|
||||
apirouter.Handle("/import", importHandler).Methods("POST")
|
||||
|
||||
mainrouter := mux.NewRouter()
|
||||
mainrouter.PathPrefix("/").Handler(router)
|
||||
|
||||
return mainrouter
|
||||
}
|
||||
|
||||
func (s *Runservice) Run(ctx context.Context) error {
|
||||
for {
|
||||
if err := s.run(ctx); err != nil {
|
||||
log.Errorf("run error: %+v", err)
|
||||
}
|
||||
|
||||
sleepCh := time.NewTimer(1 * time.Second).C
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Infof("runservice exiting")
|
||||
return nil
|
||||
case <-sleepCh:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Runservice) run(ctx context.Context) error {
|
||||
var tlsConfig *tls.Config
|
||||
if s.c.Web.TLS {
|
||||
var err error
|
||||
|
@ -236,32 +317,98 @@ func (s *Runservice) Run(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0)
|
||||
if err != nil && err != etcd.ErrKeyNotFound {
|
||||
return err
|
||||
}
|
||||
|
||||
maintenanceMode := false
|
||||
if len(resp.Kvs) > 0 {
|
||||
log.Infof("maintenance mode key is present")
|
||||
maintenanceMode = true
|
||||
}
|
||||
|
||||
s.maintenanceMode = maintenanceMode
|
||||
s.dm.SetMaintenanceMode(maintenanceMode)
|
||||
s.ah.SetMaintenanceMode(maintenanceMode)
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
errCh := make(chan error, 100)
|
||||
var wg sync.WaitGroup
|
||||
dmReadyCh := make(chan struct{})
|
||||
|
||||
var mainrouter http.Handler
|
||||
if s.maintenanceMode {
|
||||
mainrouter = s.setupMaintenanceRouter()
|
||||
util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) })
|
||||
|
||||
} else {
|
||||
ch := make(chan *types.ExecutorTask)
|
||||
mainrouter = s.setupDefaultRouter(ch)
|
||||
|
||||
util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) })
|
||||
|
||||
// TODO(sgotti) wait for all goroutines exiting
|
||||
util.GoWait(&wg, func() { errCh <- s.dm.Run(ctx, dmReadyCh) })
|
||||
|
||||
// wait for dm to be ready
|
||||
<-dmReadyCh
|
||||
|
||||
for {
|
||||
err := s.InitEtcd(ctx)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
log.Errorf("failed to initialize etcd: %+v", err)
|
||||
|
||||
sleepCh := time.NewTimer(1 * time.Second).C
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return nil
|
||||
case <-sleepCh:
|
||||
}
|
||||
}
|
||||
|
||||
util.GoWait(&wg, func() { errCh <- s.readDB.Run(ctx) })
|
||||
|
||||
util.GoWait(&wg, func() { s.executorTasksCleanerLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.runsSchedulerLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.runTasksUpdaterLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.fetcherLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.finishedRunsArchiverLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.compactChangeGroupsLoop(ctx) })
|
||||
util.GoWait(&wg, func() { s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) })
|
||||
util.GoWait(&wg, func() { s.executorTaskUpdateHandler(ctx, ch) })
|
||||
util.GoWait(&wg, func() { s.etcdPingerLoop(ctx) })
|
||||
}
|
||||
|
||||
httpServer := http.Server{
|
||||
Addr: s.c.Web.ListenAddress,
|
||||
Handler: mainrouter,
|
||||
TLSConfig: tlsConfig,
|
||||
}
|
||||
|
||||
lerrCh := make(chan error)
|
||||
go func() {
|
||||
lerrCh := make(chan error, 1)
|
||||
util.GoWait(&wg, func() {
|
||||
lerrCh <- httpServer.ListenAndServe()
|
||||
}()
|
||||
})
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Infof("runservice scheduler exiting")
|
||||
httpServer.Close()
|
||||
case err := <-lerrCh:
|
||||
log.Infof("runservice run exiting")
|
||||
case err = <-lerrCh:
|
||||
if err != nil {
|
||||
log.Errorf("http server listen error: %v", err)
|
||||
return err
|
||||
}
|
||||
case err := <-errCh:
|
||||
if err != nil {
|
||||
log.Errorf("error: %+v", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
cancel()
|
||||
httpServer.Close()
|
||||
wg.Wait()
|
||||
|
||||
return err
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue