Merge pull request #67 from sgotti/configstore_maintenance_export_import
configstore: maintenance/export/import
This commit is contained in:
commit
1034b3afdb
@ -16,21 +16,30 @@ package action
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"agola.io/agola/internal/datamanager"
|
"agola.io/agola/internal/datamanager"
|
||||||
|
"agola.io/agola/internal/etcd"
|
||||||
"agola.io/agola/internal/services/configstore/readdb"
|
"agola.io/agola/internal/services/configstore/readdb"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
type ActionHandler struct {
|
type ActionHandler struct {
|
||||||
log *zap.SugaredLogger
|
log *zap.SugaredLogger
|
||||||
readDB *readdb.ReadDB
|
readDB *readdb.ReadDB
|
||||||
dm *datamanager.DataManager
|
dm *datamanager.DataManager
|
||||||
|
e *etcd.Store
|
||||||
|
maintenanceMode bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewActionHandler(logger *zap.Logger, readDB *readdb.ReadDB, dm *datamanager.DataManager) *ActionHandler {
|
func NewActionHandler(logger *zap.Logger, readDB *readdb.ReadDB, dm *datamanager.DataManager, e *etcd.Store) *ActionHandler {
|
||||||
return &ActionHandler{
|
return &ActionHandler{
|
||||||
log: logger.Sugar(),
|
log: logger.Sugar(),
|
||||||
readDB: readDB,
|
readDB: readDB,
|
||||||
dm: dm,
|
dm: dm,
|
||||||
|
e: e,
|
||||||
|
maintenanceMode: false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *ActionHandler) SetMaintenanceMode(maintenanceMode bool) {
|
||||||
|
h.maintenanceMode = maintenanceMode
|
||||||
|
}
|
||||||
|
73
internal/services/configstore/action/maintenance.go
Normal file
73
internal/services/configstore/action/maintenance.go
Normal file
@ -0,0 +1,73 @@
|
|||||||
|
// Copyright 2019 Sorint.lab
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package action
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"agola.io/agola/internal/etcd"
|
||||||
|
"agola.io/agola/internal/services/configstore/common"
|
||||||
|
"agola.io/agola/internal/util"
|
||||||
|
|
||||||
|
errors "golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (h *ActionHandler) MaintenanceMode(ctx context.Context, enable bool) error {
|
||||||
|
resp, err := h.e.Get(ctx, common.EtcdMaintenanceKey, 0)
|
||||||
|
if err != nil && err != etcd.ErrKeyNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if enable && len(resp.Kvs) > 0 {
|
||||||
|
return util.NewErrBadRequest(errors.Errorf("maintenance mode already enabled"))
|
||||||
|
}
|
||||||
|
if !enable && len(resp.Kvs) == 0 {
|
||||||
|
return util.NewErrBadRequest(errors.Errorf("maintenance mode already disabled"))
|
||||||
|
}
|
||||||
|
|
||||||
|
if enable {
|
||||||
|
txResp, err := h.e.AtomicPut(ctx, common.EtcdMaintenanceKey, []byte{}, 0, nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !txResp.Succeeded {
|
||||||
|
return errors.Errorf("failed to create maintenance mode key due to concurrent update")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !enable {
|
||||||
|
txResp, err := h.e.AtomicDelete(ctx, common.EtcdMaintenanceKey, resp.Kvs[0].ModRevision)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if !txResp.Succeeded {
|
||||||
|
return errors.Errorf("failed to delete maintenance mode key due to concurrent update")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ActionHandler) Export(ctx context.Context, w io.Writer) error {
|
||||||
|
return h.dm.Export(ctx, w)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ActionHandler) Import(ctx context.Context, r io.Reader) error {
|
||||||
|
if !h.maintenanceMode {
|
||||||
|
return util.NewErrBadRequest(errors.Errorf("not in maintenance mode"))
|
||||||
|
}
|
||||||
|
return h.dm.Import(ctx, r)
|
||||||
|
}
|
107
internal/services/configstore/api/maintenance.go
Normal file
107
internal/services/configstore/api/maintenance.go
Normal file
@ -0,0 +1,107 @@
|
|||||||
|
// Copyright 2019 Sorint.lab
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"agola.io/agola/internal/etcd"
|
||||||
|
"agola.io/agola/internal/services/configstore/action"
|
||||||
|
|
||||||
|
"go.uber.org/zap"
|
||||||
|
)
|
||||||
|
|
||||||
|
type MaintenanceModeHandler struct {
|
||||||
|
log *zap.SugaredLogger
|
||||||
|
ah *action.ActionHandler
|
||||||
|
e *etcd.Store
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMaintenanceModeHandler(logger *zap.Logger, ah *action.ActionHandler, e *etcd.Store) *MaintenanceModeHandler {
|
||||||
|
return &MaintenanceModeHandler{log: logger.Sugar(), ah: ah, e: e}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *MaintenanceModeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
enable := false
|
||||||
|
switch r.Method {
|
||||||
|
case "PUT":
|
||||||
|
enable = true
|
||||||
|
case "DELETE":
|
||||||
|
enable = false
|
||||||
|
}
|
||||||
|
|
||||||
|
err := h.ah.MaintenanceMode(ctx, enable)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Errorf("err: %+v", err)
|
||||||
|
httpError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := httpResponse(w, http.StatusOK, nil); err != nil {
|
||||||
|
h.log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
type ExportHandler struct {
|
||||||
|
log *zap.SugaredLogger
|
||||||
|
ah *action.ActionHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewExportHandler(logger *zap.Logger, ah *action.ActionHandler) *ExportHandler {
|
||||||
|
return &ExportHandler{log: logger.Sugar(), ah: ah}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ExportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
err := h.ah.Export(ctx, w)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Errorf("err: %+v", err)
|
||||||
|
// since we already answered with a 200 we cannot return another error code
|
||||||
|
// So abort the connection and the client will detect the missing ending chunk
|
||||||
|
// and consider this an error
|
||||||
|
//
|
||||||
|
// this is the way to force close a request without logging the panic
|
||||||
|
panic(http.ErrAbortHandler)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type ImportHandler struct {
|
||||||
|
log *zap.SugaredLogger
|
||||||
|
ah *action.ActionHandler
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewImportHandler(logger *zap.Logger, ah *action.ActionHandler) *ImportHandler {
|
||||||
|
return &ImportHandler{log: logger.Sugar(), ah: ah}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *ImportHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
err := h.ah.Import(ctx, r.Body)
|
||||||
|
if err != nil {
|
||||||
|
h.log.Errorf("err: %+v", err)
|
||||||
|
httpError(w, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := httpResponse(w, http.StatusOK, nil); err != nil {
|
||||||
|
h.log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -21,6 +21,10 @@ import (
|
|||||||
uuid "github.com/satori/go.uuid"
|
uuid "github.com/satori/go.uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
EtcdMaintenanceKey = "maintenance"
|
||||||
|
)
|
||||||
|
|
||||||
type RefType int
|
type RefType int
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -19,6 +19,8 @@ import (
|
|||||||
"crypto/tls"
|
"crypto/tls"
|
||||||
"net/http"
|
"net/http"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
scommon "agola.io/agola/internal/common"
|
scommon "agola.io/agola/internal/common"
|
||||||
"agola.io/agola/internal/datamanager"
|
"agola.io/agola/internal/datamanager"
|
||||||
@ -28,11 +30,14 @@ import (
|
|||||||
"agola.io/agola/internal/services/config"
|
"agola.io/agola/internal/services/config"
|
||||||
action "agola.io/agola/internal/services/configstore/action"
|
action "agola.io/agola/internal/services/configstore/action"
|
||||||
"agola.io/agola/internal/services/configstore/api"
|
"agola.io/agola/internal/services/configstore/api"
|
||||||
|
"agola.io/agola/internal/services/configstore/common"
|
||||||
"agola.io/agola/internal/services/configstore/readdb"
|
"agola.io/agola/internal/services/configstore/readdb"
|
||||||
"agola.io/agola/internal/services/types"
|
"agola.io/agola/internal/services/types"
|
||||||
"agola.io/agola/internal/util"
|
"agola.io/agola/internal/util"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
|
etcdclientv3 "go.etcd.io/etcd/clientv3"
|
||||||
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
"go.uber.org/zap/zapcore"
|
"go.uber.org/zap/zapcore"
|
||||||
)
|
)
|
||||||
@ -41,13 +46,78 @@ var level = zap.NewAtomicLevelAt(zapcore.InfoLevel)
|
|||||||
var logger = slog.New(level)
|
var logger = slog.New(level)
|
||||||
var log = logger.Sugar()
|
var log = logger.Sugar()
|
||||||
|
|
||||||
|
func (s *Configstore) maintenanceModeWatcherLoop(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) {
|
||||||
|
for {
|
||||||
|
log.Debugf("maintenanceModeWatcherLoop")
|
||||||
|
|
||||||
|
// at first watch restart from previous processed revision
|
||||||
|
if err := s.maintenanceModeWatcher(ctx, runCtxCancel, maintenanceModeEnabled); err != nil {
|
||||||
|
log.Errorf("err: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sleepCh := time.NewTimer(1 * time.Second).C
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-sleepCh:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Configstore) maintenanceModeWatcher(ctx context.Context, runCtxCancel context.CancelFunc, maintenanceModeEnabled bool) error {
|
||||||
|
log.Infof("watcher: maintenance mode enabled: %t", maintenanceModeEnabled)
|
||||||
|
resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0)
|
||||||
|
if err != nil && err != etcd.ErrKeyNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(resp.Kvs) > 0 {
|
||||||
|
log.Infof("maintenance mode key is present")
|
||||||
|
if !maintenanceModeEnabled {
|
||||||
|
runCtxCancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
revision := resp.Header.Revision
|
||||||
|
|
||||||
|
wctx := etcdclientv3.WithRequireLeader(ctx)
|
||||||
|
|
||||||
|
// restart from previous processed revision
|
||||||
|
wch := s.e.Watch(wctx, common.EtcdMaintenanceKey, revision)
|
||||||
|
|
||||||
|
for wresp := range wch {
|
||||||
|
if wresp.Canceled {
|
||||||
|
return wresp.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, ev := range wresp.Events {
|
||||||
|
switch ev.Type {
|
||||||
|
case mvccpb.PUT:
|
||||||
|
log.Infof("maintenance mode key set")
|
||||||
|
if !maintenanceModeEnabled {
|
||||||
|
runCtxCancel()
|
||||||
|
}
|
||||||
|
|
||||||
|
case mvccpb.DELETE:
|
||||||
|
log.Infof("maintenance mode key removed")
|
||||||
|
if maintenanceModeEnabled {
|
||||||
|
runCtxCancel()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
type Configstore struct {
|
type Configstore struct {
|
||||||
c *config.Configstore
|
c *config.Configstore
|
||||||
e *etcd.Store
|
e *etcd.Store
|
||||||
dm *datamanager.DataManager
|
dm *datamanager.DataManager
|
||||||
readDB *readdb.ReadDB
|
readDB *readdb.ReadDB
|
||||||
ost *objectstorage.ObjStorage
|
ost *objectstorage.ObjStorage
|
||||||
ah *action.ActionHandler
|
ah *action.ActionHandler
|
||||||
|
maintenanceMode bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewConfigstore(ctx context.Context, c *config.Configstore) (*Configstore, error) {
|
func NewConfigstore(ctx context.Context, c *config.Configstore) (*Configstore, error) {
|
||||||
@ -97,22 +167,15 @@ func NewConfigstore(ctx context.Context, c *config.Configstore) (*Configstore, e
|
|||||||
cs.dm = dm
|
cs.dm = dm
|
||||||
cs.readDB = readDB
|
cs.readDB = readDB
|
||||||
|
|
||||||
ah := action.NewActionHandler(logger, readDB, dm)
|
ah := action.NewActionHandler(logger, readDB, dm, e)
|
||||||
cs.ah = ah
|
cs.ah = ah
|
||||||
|
|
||||||
return cs, nil
|
return cs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Configstore) Run(ctx context.Context) error {
|
func (s *Configstore) setupDefaultRouter() http.Handler {
|
||||||
errCh := make(chan error)
|
maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e)
|
||||||
dmReadyCh := make(chan struct{})
|
exportHandler := api.NewExportHandler(logger, s.ah)
|
||||||
|
|
||||||
go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()
|
|
||||||
|
|
||||||
// wait for dm to be ready
|
|
||||||
<-dmReadyCh
|
|
||||||
|
|
||||||
go func() { errCh <- s.readDB.Run(ctx) }()
|
|
||||||
|
|
||||||
projectGroupHandler := api.NewProjectGroupHandler(logger, s.readDB)
|
projectGroupHandler := api.NewProjectGroupHandler(logger, s.readDB)
|
||||||
projectGroupSubgroupsHandler := api.NewProjectGroupSubgroupsHandler(logger, s.ah, s.readDB)
|
projectGroupSubgroupsHandler := api.NewProjectGroupSubgroupsHandler(logger, s.ah, s.readDB)
|
||||||
@ -227,9 +290,52 @@ func (s *Configstore) Run(ctx context.Context) error {
|
|||||||
apirouter.Handle("/remotesources/{remotesourceref}", updateRemoteSourceHandler).Methods("PUT")
|
apirouter.Handle("/remotesources/{remotesourceref}", updateRemoteSourceHandler).Methods("PUT")
|
||||||
apirouter.Handle("/remotesources/{remotesourceref}", deleteRemoteSourceHandler).Methods("DELETE")
|
apirouter.Handle("/remotesources/{remotesourceref}", deleteRemoteSourceHandler).Methods("DELETE")
|
||||||
|
|
||||||
|
apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE")
|
||||||
|
|
||||||
|
apirouter.Handle("/export", exportHandler).Methods("GET")
|
||||||
|
|
||||||
mainrouter := mux.NewRouter()
|
mainrouter := mux.NewRouter()
|
||||||
mainrouter.PathPrefix("/").Handler(router)
|
mainrouter.PathPrefix("/").Handler(router)
|
||||||
|
|
||||||
|
return mainrouter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Configstore) setupMaintenanceRouter() http.Handler {
|
||||||
|
maintenanceModeHandler := api.NewMaintenanceModeHandler(logger, s.ah, s.e)
|
||||||
|
exportHandler := api.NewExportHandler(logger, s.ah)
|
||||||
|
importHandler := api.NewImportHandler(logger, s.ah)
|
||||||
|
|
||||||
|
router := mux.NewRouter()
|
||||||
|
apirouter := router.PathPrefix("/api/v1alpha").Subrouter().UseEncodedPath()
|
||||||
|
|
||||||
|
apirouter.Handle("/maintenance", maintenanceModeHandler).Methods("PUT", "DELETE")
|
||||||
|
|
||||||
|
apirouter.Handle("/export", exportHandler).Methods("GET")
|
||||||
|
apirouter.Handle("/import", importHandler).Methods("POST")
|
||||||
|
|
||||||
|
mainrouter := mux.NewRouter()
|
||||||
|
mainrouter.PathPrefix("/").Handler(router)
|
||||||
|
|
||||||
|
return mainrouter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Configstore) Run(ctx context.Context) error {
|
||||||
|
for {
|
||||||
|
if err := s.run(ctx); err != nil {
|
||||||
|
log.Errorf("run error: %+v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
sleepCh := time.NewTimer(1 * time.Second).C
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
log.Infof("configstore exiting")
|
||||||
|
return nil
|
||||||
|
case <-sleepCh:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Configstore) run(ctx context.Context) error {
|
||||||
var tlsConfig *tls.Config
|
var tlsConfig *tls.Config
|
||||||
if s.c.Web.TLS {
|
if s.c.Web.TLS {
|
||||||
var err error
|
var err error
|
||||||
@ -240,21 +346,60 @@ func (s *Configstore) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
resp, err := s.e.Get(ctx, common.EtcdMaintenanceKey, 0)
|
||||||
|
if err != nil && err != etcd.ErrKeyNotFound {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
maintenanceMode := false
|
||||||
|
if len(resp.Kvs) > 0 {
|
||||||
|
log.Infof("maintenance mode key is present")
|
||||||
|
maintenanceMode = true
|
||||||
|
}
|
||||||
|
|
||||||
|
s.maintenanceMode = maintenanceMode
|
||||||
|
s.dm.SetMaintenanceMode(maintenanceMode)
|
||||||
|
s.ah.SetMaintenanceMode(maintenanceMode)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
errCh := make(chan error, 100)
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
dmReadyCh := make(chan struct{})
|
||||||
|
|
||||||
|
var mainrouter http.Handler
|
||||||
|
if s.maintenanceMode {
|
||||||
|
mainrouter = s.setupMaintenanceRouter()
|
||||||
|
util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) })
|
||||||
|
|
||||||
|
} else {
|
||||||
|
mainrouter = s.setupDefaultRouter()
|
||||||
|
|
||||||
|
util.GoWait(&wg, func() { s.maintenanceModeWatcherLoop(ctx, cancel, s.maintenanceMode) })
|
||||||
|
|
||||||
|
// TODO(sgotti) wait for all goroutines exiting
|
||||||
|
util.GoWait(&wg, func() { errCh <- s.dm.Run(ctx, dmReadyCh) })
|
||||||
|
|
||||||
|
// wait for dm to be ready
|
||||||
|
<-dmReadyCh
|
||||||
|
|
||||||
|
util.GoWait(&wg, func() { errCh <- s.readDB.Run(ctx) })
|
||||||
|
}
|
||||||
|
|
||||||
httpServer := http.Server{
|
httpServer := http.Server{
|
||||||
Addr: s.c.Web.ListenAddress,
|
Addr: s.c.Web.ListenAddress,
|
||||||
Handler: mainrouter,
|
Handler: mainrouter,
|
||||||
TLSConfig: tlsConfig,
|
TLSConfig: tlsConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
lerrCh := make(chan error)
|
lerrCh := make(chan error, 1)
|
||||||
go func() {
|
util.GoWait(&wg, func() {
|
||||||
lerrCh <- httpServer.ListenAndServe()
|
lerrCh <- httpServer.ListenAndServe()
|
||||||
}()
|
})
|
||||||
|
defer httpServer.Close()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
log.Infof("configstore exiting")
|
log.Infof("configstore run exiting")
|
||||||
httpServer.Close()
|
|
||||||
case err := <-lerrCh:
|
case err := <-lerrCh:
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("http server listen error: %+v", err)
|
log.Errorf("http server listen error: %+v", err)
|
||||||
@ -267,5 +412,9 @@ func (s *Configstore) Run(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
cancel()
|
||||||
|
httpServer.Close()
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
@ -15,6 +15,7 @@
|
|||||||
package configstore
|
package configstore
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
@ -216,7 +217,7 @@ func TestResync(t *testing.T) {
|
|||||||
cancel2()
|
cancel2()
|
||||||
|
|
||||||
// Do some more changes
|
// Do some more changes
|
||||||
for i := 11; i < 20; i++ {
|
for i := 10; i < 20; i++ {
|
||||||
if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil {
|
if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
@ -246,6 +247,10 @@ func TestResync(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
if len(users1) != 20 {
|
||||||
|
t.Logf("users1: %s", util.Dump(users1))
|
||||||
|
t.Fatalf("expected %d users, got %d users", 20, len(users1))
|
||||||
|
}
|
||||||
|
|
||||||
users2, err := getUsers(ctx, cs2)
|
users2, err := getUsers(ctx, cs2)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -275,10 +280,238 @@ func TestResync(t *testing.T) {
|
|||||||
|
|
||||||
time.Sleep(5 * time.Second)
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
users3, err := getUsers(ctx, cs3)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !compareUsers(users1, users3) {
|
||||||
|
t.Logf("len(users1): %d", len(users1))
|
||||||
|
t.Logf("len(users3): %d", len(users3))
|
||||||
|
t.Logf("users1: %s", util.Dump(users1))
|
||||||
|
t.Logf("users3: %s", util.Dump(users3))
|
||||||
|
t.Fatalf("users are different between the two readdbs")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestExportImport(t *testing.T) {
|
||||||
|
dir, err := ioutil.TempDir("", "agola")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
defer os.RemoveAll(dir)
|
||||||
|
|
||||||
|
etcdDir, err := ioutil.TempDir(dir, "etcd")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
tetcd := setupEtcd(t, etcdDir)
|
||||||
|
defer shutdownEtcd(tetcd)
|
||||||
|
|
||||||
|
listenAddress1, port1, err := testutil.GetFreePort(true, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
listenAddress2, port2, err := testutil.GetFreePort(true, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
listenAddress3, port3, err := testutil.GetFreePort(true, false)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
ostDir, err := ioutil.TempDir(dir, "ost")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
csDir1, err := ioutil.TempDir(dir, "cs1")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
csDir2, err := ioutil.TempDir(dir, "cs2")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
csDir3, err := ioutil.TempDir(dir, "cs3")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
baseConfig := config.Configstore{
|
||||||
|
Etcd: config.Etcd{
|
||||||
|
Endpoints: tetcd.Endpoint,
|
||||||
|
},
|
||||||
|
ObjectStorage: config.ObjectStorage{
|
||||||
|
Type: config.ObjectStorageTypePosix,
|
||||||
|
Path: ostDir,
|
||||||
|
},
|
||||||
|
Web: config.Web{},
|
||||||
|
}
|
||||||
|
cs1Config := baseConfig
|
||||||
|
cs1Config.DataDir = csDir1
|
||||||
|
cs1Config.Web.ListenAddress = net.JoinHostPort(listenAddress1, port1)
|
||||||
|
|
||||||
|
cs2Config := baseConfig
|
||||||
|
cs2Config.DataDir = csDir2
|
||||||
|
cs2Config.Web.ListenAddress = net.JoinHostPort(listenAddress2, port2)
|
||||||
|
|
||||||
|
cs3Config := baseConfig
|
||||||
|
cs3Config.DataDir = csDir3
|
||||||
|
cs3Config.Web.ListenAddress = net.JoinHostPort(listenAddress3, port3)
|
||||||
|
|
||||||
|
cs1, err := NewConfigstore(ctx, &cs1Config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
cs2, err := NewConfigstore(ctx, &cs2Config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
cs3, err := NewConfigstore(ctx, &cs3Config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx1 := context.Background()
|
||||||
|
ctx2, cancel2 := context.WithCancel(context.Background())
|
||||||
|
ctx3, cancel3 := context.WithCancel(context.Background())
|
||||||
|
|
||||||
|
t.Logf("starting cs1")
|
||||||
|
go func() { _ = cs1.Run(ctx1) }()
|
||||||
|
t.Logf("starting cs2")
|
||||||
|
go func() { _ = cs2.Run(ctx2) }()
|
||||||
|
t.Logf("starting cs3")
|
||||||
|
go func() { _ = cs3.Run(ctx3) }()
|
||||||
|
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
// stop cs2
|
||||||
|
log.Infof("stopping cs2")
|
||||||
|
cancel2()
|
||||||
|
// stop cs3
|
||||||
|
log.Infof("stopping cs3")
|
||||||
|
cancel3()
|
||||||
|
|
||||||
|
// Do some more changes
|
||||||
|
for i := 10; i < 20; i++ {
|
||||||
|
if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
users1, err := getUsers(ctx, cs1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(users1) != 20 {
|
||||||
|
t.Logf("users1: %s", util.Dump(users1))
|
||||||
|
t.Fatalf("expected %d users, got %d users", 20, len(users1))
|
||||||
|
}
|
||||||
|
|
||||||
|
var export bytes.Buffer
|
||||||
|
if err := cs1.ah.Export(ctx, &export); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cs1.ah.MaintenanceMode(ctx, true); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
if err := cs1.ah.Import(ctx, &export); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := cs1.ah.MaintenanceMode(ctx, false); err != nil {
|
||||||
|
t.Fatalf("unexpected err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
newUsers1, err := getUsers(ctx, cs1)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !compareUsers(users1, newUsers1) {
|
||||||
|
t.Logf("len(users1): %d", len(users1))
|
||||||
|
t.Logf("len(newUsers1): %d", len(newUsers1))
|
||||||
|
t.Logf("users1: %s", util.Dump(users1))
|
||||||
|
t.Logf("newUsers1: %s", util.Dump(newUsers1))
|
||||||
|
t.Fatalf("users are different between the two readdbs")
|
||||||
|
}
|
||||||
|
|
||||||
|
// start cs2
|
||||||
|
// it should do a full resync since we have imported new data and there's now wal in etcd
|
||||||
|
cs2, err = NewConfigstore(ctx, &cs2Config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
log.Infof("starting cs2")
|
||||||
|
ctx2 = context.Background()
|
||||||
|
go func() { _ = cs2.Run(ctx2) }()
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
|
users2, err := getUsers(ctx, cs2)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !compareUsers(users1, users2) {
|
||||||
|
t.Logf("len(users1): %d", len(users1))
|
||||||
|
t.Logf("len(users2): %d", len(users2))
|
||||||
|
t.Logf("users1: %s", util.Dump(users1))
|
||||||
|
t.Logf("users2: %s", util.Dump(users2))
|
||||||
|
t.Fatalf("users are different between the two readdbs")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Do some more changes
|
||||||
|
for i := 20; i < 30; i++ {
|
||||||
|
if _, err := cs1.ah.CreateUser(ctx, &action.CreateUserRequest{UserName: fmt.Sprintf("user%d", i)}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
time.Sleep(200 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
users1, err = getUsers(ctx, cs1)
|
users1, err = getUsers(ctx, cs1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
if len(users1) != 30 {
|
||||||
|
t.Logf("users1: %s", util.Dump(users1))
|
||||||
|
t.Fatalf("expected %d users, got %d users", 30, len(users1))
|
||||||
|
}
|
||||||
|
|
||||||
|
// start cs3
|
||||||
|
// it should do a full resync since we have imported new data and there're some wals with a different epoch
|
||||||
|
cs3, err = NewConfigstore(ctx, &cs3Config)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
log.Infof("starting cs3")
|
||||||
|
ctx3 = context.Background()
|
||||||
|
go func() { _ = cs3.Run(ctx3) }()
|
||||||
|
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
|
||||||
users3, err := getUsers(ctx, cs3)
|
users3, err := getUsers(ctx, cs3)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
25
internal/util/goroutine.go
Normal file
25
internal/util/goroutine.go
Normal file
@ -0,0 +1,25 @@
|
|||||||
|
// Copyright 2019 Sorint.lab
|
||||||
|
//
|
||||||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
// you may not use this file except in compliance with the License.
|
||||||
|
// You may obtain a copy of the License at
|
||||||
|
//
|
||||||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
//
|
||||||
|
// Unless required by applicable law or agreed to in writing, software
|
||||||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
|
||||||
|
// See the License for the specific language governing permissions and
|
||||||
|
// limitations under the License.
|
||||||
|
|
||||||
|
package util
|
||||||
|
|
||||||
|
import "sync"
|
||||||
|
|
||||||
|
func GoWait(wg *sync.WaitGroup, f func()) {
|
||||||
|
wg.Add(1)
|
||||||
|
go func() {
|
||||||
|
f()
|
||||||
|
wg.Done()
|
||||||
|
}()
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user