diff --git a/cmd/agola/cmd/serve.go b/cmd/agola/cmd/serve.go index 7565c7f..c7c0592 100644 --- a/cmd/agola/cmd/serve.go +++ b/cmd/agola/cmd/serve.go @@ -21,11 +21,11 @@ import ( "github.com/sorintlab/agola/cmd" "github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/configstore" + "github.com/sorintlab/agola/internal/services/executor" + rsexecutor "github.com/sorintlab/agola/internal/services/executor" "github.com/sorintlab/agola/internal/services/gateway" "github.com/sorintlab/agola/internal/services/gitserver" - "github.com/sorintlab/agola/internal/services/runservice/executor" - rsexecutor "github.com/sorintlab/agola/internal/services/runservice/executor" - rsscheduler "github.com/sorintlab/agola/internal/services/runservice/scheduler" + rsscheduler "github.com/sorintlab/agola/internal/services/runservice" "github.com/sorintlab/agola/internal/services/scheduler" "github.com/sorintlab/agola/internal/util" @@ -139,17 +139,17 @@ func serve(cmd *cobra.Command, args []string) error { } } - var rssched *rsscheduler.Scheduler + var rs *rsscheduler.Runservice if isComponentEnabled("runservicescheduler") { - rssched, err = rsscheduler.NewScheduler(ctx, &c.RunServiceScheduler) + rs, err = rsscheduler.NewRunservice(ctx, &c.Runservice) if err != nil { return errors.Wrapf(err, "failed to start run service scheduler") } } - var rsex *rsexecutor.Executor + var ex *rsexecutor.Executor if isComponentEnabled("runserviceexecutor") { - rsex, err = executor.NewExecutor(&c.RunServiceExecutor) + ex, err = executor.NewExecutor(&c.Executor) if err != nil { return errors.Wrapf(err, "failed to start run service executor") } @@ -189,11 +189,11 @@ func serve(cmd *cobra.Command, args []string) error { errCh := make(chan error) - if rssched != nil { - go func() { errCh <- rssched.Run(ctx) }() + if rs != nil { + go func() { errCh <- rs.Run(ctx) }() } - if rsex != nil { - go func() { errCh <- rsex.Run(ctx) }() + if ex != nil { + go func() { errCh <- ex.Run(ctx) }() } if cs != nil { go func() { errCh <- cs.Run(ctx) }() diff --git a/examples/config.yml b/examples/config.yml index 57f1d93..1356e1e 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -1,7 +1,7 @@ gateway: apiExposedURL: "http://172.17.0.1:8000" webExposedURL: "http://172.17.0.1:8080" - runServiceURL: "http://localhost:4000" + runserviceURL: "http://localhost:4000" configstoreURL: "http://localhost:4002" gitServerURL: "http://172.17.0.1:4003" @@ -18,7 +18,7 @@ gateway: adminToken: "admintoken" scheduler: - runServiceURL: "http://localhost:4000" + runserviceURL: "http://localhost:4000" configstore: dataDir: /tmp/agola/configstore @@ -30,9 +30,9 @@ configstore: web: listenAddress: ":4002" -runServiceScheduler: +runservice: #debug: true - dataDir: /tmp/agola/runservice/scheduler + dataDir: /tmp/agola/runservice objectStorage: type: posix path: /tmp/agola/runservice/ost @@ -41,10 +41,10 @@ runServiceScheduler: web: listenAddress: ":4000" -runServiceExecutor: - dataDir: /tmp/agola/runservice/executor +executor: + dataDir: /tmp/agola/executor toolboxPath: ./bin/agola-toolbox - runServiceURL: "http://localhost:4000" + runserviceURL: "http://localhost:4000" web: listenAddress: ":4001" activeTasksLimit: 2 diff --git a/internal/services/config/config.go b/internal/services/config/config.go index ffc7c1c..dffb16d 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -33,12 +33,12 @@ type Config struct { // Defaults to "agola" ID string `yaml:"id"` - Gateway Gateway `yaml:"gateway"` - Scheduler Scheduler `yaml:"scheduler"` - RunServiceScheduler RunServiceScheduler `yaml:"runServiceScheduler"` - RunServiceExecutor RunServiceExecutor `yaml:"runServiceExecutor"` - Configstore Configstore `yaml:"configstore"` - GitServer GitServer `yaml:"gitServer"` + Gateway Gateway `yaml:"gateway"` + Scheduler Scheduler `yaml:"scheduler"` + Runservice Runservice `yaml:"runservice"` + Executor Executor `yaml:"executor"` + Configstore Configstore `yaml:"configstore"` + GitServer GitServer `yaml:"gitServer"` } type Gateway struct { @@ -51,7 +51,7 @@ type Gateway struct { // This is used for generating the redirect_url in oauth2 redirects WebExposedURL string `yaml:"webExposedURL"` - RunServiceURL string `yaml:"runServiceURL"` + RunserviceURL string `yaml:"runserviceURL"` ConfigstoreURL string `yaml:"configstoreURL"` GitServerURL string `yaml:"gitServerURL"` @@ -67,10 +67,10 @@ type Gateway struct { type Scheduler struct { Debug bool `yaml:"debug"` - RunServiceURL string `yaml:"runServiceURL"` + RunserviceURL string `yaml:"runserviceURL"` } -type RunServiceScheduler struct { +type Runservice struct { Debug bool `yaml:"debug"` DataDir string `yaml:"dataDir"` @@ -81,12 +81,12 @@ type RunServiceScheduler struct { RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` } -type RunServiceExecutor struct { +type Executor struct { Debug bool `yaml:"debug"` DataDir string `yaml:"dataDir"` - RunServiceURL string `yaml:"runServiceURL"` + RunserviceURL string `yaml:"runserviceURL"` ToolboxPath string `yaml:"toolboxPath"` Web Web `yaml:"web"` @@ -208,10 +208,10 @@ var defaultConfig = Config{ Duration: 12 * time.Hour, }, }, - RunServiceScheduler: RunServiceScheduler{ + Runservice: Runservice{ RunCacheExpireInterval: 7 * 24 * time.Hour, }, - RunServiceExecutor: RunServiceExecutor{ + Executor: Executor{ ActiveTasksLimit: 2, }, } @@ -266,8 +266,8 @@ func Validate(c *Config) error { if c.Gateway.ConfigstoreURL == "" { return errors.Errorf("gateway configstoreURL is empty") } - if c.Gateway.RunServiceURL == "" { - return errors.Errorf("gateway runServiceURL is empty") + if c.Gateway.RunserviceURL == "" { + return errors.Errorf("gateway runserviceURL is empty") } if err := validateWeb(&c.Gateway.Web); err != nil { return errors.Wrapf(err, "gateway web configuration error") @@ -281,37 +281,37 @@ func Validate(c *Config) error { return errors.Wrapf(err, "configstore web configuration error") } - // Runservice Scheduler - if c.RunServiceScheduler.DataDir == "" { - return errors.Errorf("runservice scheduler dataDir is empty") + // Runservice + if c.Runservice.DataDir == "" { + return errors.Errorf("runservice dataDir is empty") } - if err := validateWeb(&c.RunServiceScheduler.Web); err != nil { - return errors.Wrapf(err, "runservice scheduler web configuration error") + if err := validateWeb(&c.Runservice.Web); err != nil { + return errors.Wrapf(err, "runservice web configuration error") } - // Runservice Executor - if c.RunServiceExecutor.DataDir == "" { - return errors.Errorf("runservice executor dataDir is empty") + // Executor + if c.Executor.DataDir == "" { + return errors.Errorf("executor dataDir is empty") } - if c.RunServiceExecutor.ToolboxPath == "" { + if c.Executor.ToolboxPath == "" { return errors.Errorf("git server toolboxPath is empty") } - if c.RunServiceExecutor.RunServiceURL == "" { - return errors.Errorf("runservice executor runServiceURL is empty") + if c.Executor.RunserviceURL == "" { + return errors.Errorf("executor runserviceURL is empty") } - if c.RunServiceExecutor.Driver.Type == "" { - return errors.Errorf("runservice executor driver type is empty") + if c.Executor.Driver.Type == "" { + return errors.Errorf("executor driver type is empty") } - switch c.RunServiceExecutor.Driver.Type { + switch c.Executor.Driver.Type { case DriverTypeDocker: case DriverTypeK8s: default: - return errors.Errorf("runservice executor driver type %q unknown", c.RunServiceExecutor.Driver.Type) + return errors.Errorf("executor driver type %q unknown", c.Executor.Driver.Type) } // Scheduler - if c.Scheduler.RunServiceURL == "" { - return errors.Errorf("scheduler runServiceURL is empty") + if c.Scheduler.RunserviceURL == "" { + return errors.Errorf("scheduler runserviceURL is empty") } // Git server diff --git a/internal/services/runservice/executor/api.go b/internal/services/executor/api.go similarity index 100% rename from internal/services/runservice/executor/api.go rename to internal/services/executor/api.go diff --git a/internal/services/runservice/executor/driver/docker.go b/internal/services/executor/driver/docker.go similarity index 99% rename from internal/services/runservice/executor/driver/docker.go rename to internal/services/executor/driver/docker.go index 8e9d803..4c73cbd 100644 --- a/internal/services/runservice/executor/driver/docker.go +++ b/internal/services/executor/driver/docker.go @@ -29,7 +29,7 @@ import ( "github.com/pkg/errors" "github.com/sorintlab/agola/internal/common" - "github.com/sorintlab/agola/internal/services/runservice/executor/registry" + "github.com/sorintlab/agola/internal/services/executor/registry" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" diff --git a/internal/services/runservice/executor/driver/docker_test.go b/internal/services/executor/driver/docker_test.go similarity index 100% rename from internal/services/runservice/executor/driver/docker_test.go rename to internal/services/executor/driver/docker_test.go diff --git a/internal/services/runservice/executor/driver/driver.go b/internal/services/executor/driver/driver.go similarity index 97% rename from internal/services/runservice/executor/driver/driver.go rename to internal/services/executor/driver/driver.go index 3d1fe60..d8279da 100644 --- a/internal/services/runservice/executor/driver/driver.go +++ b/internal/services/executor/driver/driver.go @@ -19,7 +19,7 @@ import ( "io" "github.com/sorintlab/agola/internal/common" - "github.com/sorintlab/agola/internal/services/runservice/executor/registry" + "github.com/sorintlab/agola/internal/services/executor/registry" ) const ( diff --git a/internal/services/runservice/executor/driver/k8s.go b/internal/services/executor/driver/k8s.go similarity index 100% rename from internal/services/runservice/executor/driver/k8s.go rename to internal/services/executor/driver/k8s.go diff --git a/internal/services/runservice/executor/driver/k8s_test.go b/internal/services/executor/driver/k8s_test.go similarity index 100% rename from internal/services/runservice/executor/driver/k8s_test.go rename to internal/services/executor/driver/k8s_test.go diff --git a/internal/services/runservice/executor/driver/k8slease.go b/internal/services/executor/driver/k8slease.go similarity index 100% rename from internal/services/runservice/executor/driver/k8slease.go rename to internal/services/executor/driver/k8slease.go diff --git a/internal/services/runservice/executor/executor.go b/internal/services/executor/executor.go similarity index 98% rename from internal/services/runservice/executor/executor.go rename to internal/services/executor/executor.go index dcecc8a..be456d7 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/executor/executor.go @@ -35,9 +35,9 @@ import ( "github.com/sorintlab/agola/internal/common" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/services/config" - "github.com/sorintlab/agola/internal/services/runservice/executor/driver" - "github.com/sorintlab/agola/internal/services/runservice/executor/registry" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + "github.com/sorintlab/agola/internal/services/executor/driver" + "github.com/sorintlab/agola/internal/services/executor/registry" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" @@ -1227,7 +1227,7 @@ func (e *Executor) saveExecutorID(id string) error { } type Executor struct { - c *config.RunServiceExecutor + c *config.Executor runserviceClient *rsapi.Client id string runningTasks *runningTasks @@ -1236,7 +1236,7 @@ type Executor struct { dynamic bool } -func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) { +func NewExecutor(c *config.Executor) (*Executor, error) { if c.Debug { level.SetLevel(zapcore.DebugLevel) } @@ -1256,7 +1256,7 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) { e := &Executor{ c: c, - runserviceClient: rsapi.NewClient(c.RunServiceURL), + runserviceClient: rsapi.NewClient(c.RunserviceURL), runningTasks: &runningTasks{ tasks: make(map[string]*runningTask), }, diff --git a/internal/services/runservice/executor/registry/registry.go b/internal/services/executor/registry/registry.go similarity index 100% rename from internal/services/runservice/executor/registry/registry.go rename to internal/services/executor/registry/registry.go diff --git a/internal/services/gateway/action/action.go b/internal/services/gateway/action/action.go index 744af48..9967cb9 100644 --- a/internal/services/gateway/action/action.go +++ b/internal/services/gateway/action/action.go @@ -20,7 +20,7 @@ import ( "github.com/pkg/errors" csapi "github.com/sorintlab/agola/internal/services/configstore/api" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" "go.uber.org/zap" diff --git a/internal/services/gateway/action/run.go b/internal/services/gateway/action/run.go index a439032..116975c 100644 --- a/internal/services/gateway/action/run.go +++ b/internal/services/gateway/action/run.go @@ -20,7 +20,7 @@ import ( "net/http" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" "github.com/pkg/errors" diff --git a/internal/services/gateway/gateway.go b/internal/services/gateway/gateway.go index 75f63cd..efa3241 100644 --- a/internal/services/gateway/gateway.go +++ b/internal/services/gateway/gateway.go @@ -29,7 +29,7 @@ import ( "github.com/sorintlab/agola/internal/services/gateway/api" "github.com/sorintlab/agola/internal/services/gateway/common" "github.com/sorintlab/agola/internal/services/gateway/handlers" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" jwt "github.com/dgrijalva/jwt-go" @@ -122,7 +122,7 @@ func NewGateway(gc *config.Config) (*Gateway, error) { } configstoreClient := csapi.NewClient(c.ConfigstoreURL) - runserviceClient := rsapi.NewClient(c.RunServiceURL) + runserviceClient := rsapi.NewClient(c.RunserviceURL) ah := action.NewActionHandler(logger, sd, configstoreClient, runserviceClient, gc.ID, c.APIExposedURL, c.WebExposedURL) diff --git a/internal/services/gateway/webhook.go b/internal/services/gateway/webhook.go index bec1240..6e392e5 100644 --- a/internal/services/gateway/webhook.go +++ b/internal/services/gateway/webhook.go @@ -27,7 +27,7 @@ import ( csapi "github.com/sorintlab/agola/internal/services/configstore/api" "github.com/sorintlab/agola/internal/services/gateway/action" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" rstypes "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/runservice/scheduler/action/action.go b/internal/services/runservice/action/action.go similarity index 98% rename from internal/services/runservice/scheduler/action/action.go rename to internal/services/runservice/action/action.go index 95749ba..466de78 100644 --- a/internal/services/runservice/scheduler/action/action.go +++ b/internal/services/runservice/action/action.go @@ -26,9 +26,9 @@ import ( "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/sequence" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/readdb" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/runservice/scheduler/action/action_test.go b/internal/services/runservice/action/action_test.go similarity index 100% rename from internal/services/runservice/scheduler/action/action_test.go rename to internal/services/runservice/action/action_test.go diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/api/api.go similarity index 98% rename from internal/services/runservice/scheduler/api/api.go rename to internal/services/runservice/api/api.go index 1fa2d60..951fd88 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/api/api.go @@ -27,10 +27,10 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/action" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/readdb" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/api/client.go similarity index 100% rename from internal/services/runservice/scheduler/api/client.go rename to internal/services/runservice/api/client.go diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/api/executor.go similarity index 97% rename from internal/services/runservice/scheduler/api/executor.go rename to internal/services/runservice/api/executor.go index 8cecdec..bceb6ba 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -26,9 +26,9 @@ import ( "github.com/gorilla/mux" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/action" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "go.uber.org/zap" ) diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/common/common.go similarity index 100% rename from internal/services/runservice/scheduler/common/common.go rename to internal/services/runservice/common/common.go diff --git a/internal/services/runservice/scheduler/common/events.go b/internal/services/runservice/common/events.go similarity index 100% rename from internal/services/runservice/scheduler/common/events.go rename to internal/services/runservice/common/events.go diff --git a/internal/services/runservice/scheduler/readdb/create.go b/internal/services/runservice/readdb/create.go similarity index 100% rename from internal/services/runservice/scheduler/readdb/create.go rename to internal/services/runservice/readdb/create.go diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go similarity index 99% rename from internal/services/runservice/scheduler/readdb/readdb.go rename to internal/services/runservice/readdb/readdb.go index 1f7c6e3..1f2beda 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -33,8 +33,8 @@ import ( "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/sequence" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" "go.uber.org/zap" diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go new file mode 100644 index 0000000..4dbf53a --- /dev/null +++ b/internal/services/runservice/runservice.go @@ -0,0 +1,270 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package runservice + +import ( + "context" + "crypto/tls" + "net/http" + "path/filepath" + "time" + + scommon "github.com/sorintlab/agola/internal/common" + "github.com/sorintlab/agola/internal/datamanager" + "github.com/sorintlab/agola/internal/etcd" + "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/services/config" + "github.com/sorintlab/agola/internal/services/runservice/action" + "github.com/sorintlab/agola/internal/services/runservice/api" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/readdb" + "github.com/sorintlab/agola/internal/services/runservice/types" + "github.com/sorintlab/agola/internal/util" + + ghandlers "github.com/gorilla/handlers" + "github.com/gorilla/mux" + etcdclientv3 "go.etcd.io/etcd/clientv3" + "go.uber.org/zap/zapcore" +) + +// etcdPingerLoop periodically updates a key. +// This is used by watchers to inform the client of the current revision +// this is needed since if other users are updating other unwatched keys on +// etcd we won't be notified, not updating the known revisions +// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? +func (s *Runservice) etcdPingerLoop(ctx context.Context) { + for { + if err := s.etcdPinger(ctx); err != nil { + log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(1 * time.Second) + } +} + +func (s *Runservice) etcdPinger(ctx context.Context) error { + if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil { + return err + } + return nil +} + +type Runservice struct { + c *config.Runservice + e *etcd.Store + ost *objectstorage.ObjStorage + dm *datamanager.DataManager + readDB *readdb.ReadDB + ah *action.ActionHandler +} + +func NewRunservice(ctx context.Context, c *config.Runservice) (*Runservice, error) { + if c.Debug { + level.SetLevel(zapcore.DebugLevel) + } + + ost, err := scommon.NewObjectStorage(&c.ObjectStorage) + if err != nil { + return nil, err + } + e, err := scommon.NewEtcd(&c.Etcd, logger, "runservice") + if err != nil { + return nil, err + } + + s := &Runservice{ + c: c, + e: e, + ost: ost, + } + + dmConf := &datamanager.DataManagerConfig{ + E: e, + OST: ost, + DataTypes: []string{ + string(common.DataTypeRun), + string(common.DataTypeRunConfig), + string(common.DataTypeRunCounter), + }, + } + dm, err := datamanager.NewDataManager(ctx, logger, dmConf) + if err != nil { + return nil, err + } + s.dm = dm + + readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm) + if err != nil { + return nil, err + } + s.readDB = readDB + + ah := action.NewActionHandler(logger, e, readDB, ost, dm) + s.ah = ah + + return s, nil +} + +func (s *Runservice) InitEtcd(ctx context.Context) error { + // Create changegroup min revision if it doesn't exists + cmp := []etcdclientv3.Cmp{} + then := []etcdclientv3.Op{} + + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0)) + then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, "")) + txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...) + if _, err := txn.Commit(); err != nil { + return etcd.FromEtcdError(err) + } + + return nil +} + +func (s *Runservice) Run(ctx context.Context) error { + errCh := make(chan error) + dmReadyCh := make(chan struct{}) + + go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() + + // wait for dm to be ready + <-dmReadyCh + + for { + err := s.InitEtcd(ctx) + if err == nil { + break + } + log.Errorf("failed to initialize etcd: %+v", err) + time.Sleep(1 * time.Second) + } + + go s.readDB.Run(ctx) + + ch := make(chan *types.ExecutorTask) + + // noop coors handler + corsHandler := func(h http.Handler) http.Handler { + return h + } + + corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"}) + corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"}) + corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"}) + corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) + + // executor dedicated api, only calls from executor should happen on these handlers + executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) + executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) + executorTaskHandler := api.NewExecutorTaskHandler(s.e) + executorTasksHandler := api.NewExecutorTasksHandler(s.e) + archivesHandler := api.NewArchivesHandler(logger, s.ost) + cacheHandler := api.NewCacheHandler(logger, s.ost) + cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) + + // api from clients + executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah) + + logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm) + + runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB) + runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah) + runsHandler := api.NewRunsHandler(logger, s.readDB) + runActionsHandler := api.NewRunActionsHandler(logger, s.ah) + runCreateHandler := api.NewRunCreateHandler(logger, s.ah) + changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB) + + router := mux.NewRouter() + apirouter := router.PathPrefix("/api/v1alpha").Subrouter() + + // don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method + apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) + + apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST") + apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE") + apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET") + apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET") + apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST") + apirouter.Handle("/executor/archives", archivesHandler).Methods("GET") + apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD") + apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET") + apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST") + + apirouter.Handle("/logs", logsHandler).Methods("GET") + + apirouter.Handle("/runs/{runid}", runHandler).Methods("GET") + apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT") + apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT") + apirouter.Handle("/runs", runsHandler).Methods("GET") + apirouter.Handle("/runs", runCreateHandler).Methods("POST") + + apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET") + + mainrouter := mux.NewRouter() + mainrouter.PathPrefix("/").Handler(corsHandler(router)) + + // Return a bad request when it doesn't match any route + mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) + + go s.executorTasksCleanerLoop(ctx) + go s.runsSchedulerLoop(ctx) + go s.runTasksUpdaterLoop(ctx) + go s.fetcherLoop(ctx) + go s.finishedRunsArchiverLoop(ctx) + go s.compactChangeGroupsLoop(ctx) + go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) + go s.executorTaskUpdateHandler(ctx, ch) + + go s.etcdPingerLoop(ctx) + + var tlsConfig *tls.Config + if s.c.Web.TLS { + var err error + tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false) + if err != nil { + log.Errorf("err: %+v") + return err + } + } + + httpServer := http.Server{ + Addr: s.c.Web.ListenAddress, + Handler: mainrouter, + TLSConfig: tlsConfig, + } + + lerrCh := make(chan error) + go func() { + lerrCh <- httpServer.ListenAndServe() + }() + + select { + case <-ctx.Done(): + log.Infof("runservice scheduler exiting") + httpServer.Close() + return nil + case err := <-lerrCh: + log.Errorf("http server listen error: %v", err) + return err + case err := <-errCh: + log.Errorf("error: %+v", err) + return err + } +} diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler.go similarity index 74% rename from internal/services/runservice/scheduler/scheduler.go rename to internal/services/runservice/scheduler.go index d75abf3..4d9480b 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -12,38 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. -package scheduler +package runservice import ( "bytes" "context" - "crypto/tls" "encoding/json" "fmt" "net/http" "os" - "path/filepath" "sort" "strconv" "time" - scommon "github.com/sorintlab/agola/internal/common" "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/runconfig" - "github.com/sorintlab/agola/internal/services/config" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - ghandlers "github.com/gorilla/handlers" - "github.com/gorilla/mux" "github.com/pkg/errors" etcdclientv3 "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" @@ -65,7 +56,7 @@ func mergeEnv(dest, src map[string]string) { } } -func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { +func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { // the real source of active tasks is the number of executor tasks in etcd // we can't rely on RunTask.Status since it's only updated when receiveing // updated from the executor so it could be in a NotStarted state but have an @@ -84,7 +75,7 @@ func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([ return activeTasks, nil } -func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) { +func (s *Runservice) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) { activeTasks, err := s.runActiveExecutorTasks(ctx, runID) if err != nil { return false, err @@ -237,7 +228,7 @@ func getTasksToRun(ctx context.Context, r *types.Run, rc *types.RunConfig) ([]*t return tasksToRun, nil } -func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error { +func (s *Runservice) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error { log.Debugf("tasksToRun: %s", util.Dump(tasks)) for _, rt := range tasks { @@ -278,7 +269,7 @@ func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types. // chooseExecutor chooses the executor to schedule the task on. Now it's a very simple/dumb selection // TODO(sgotti) improve this to use executor statistic, labels (arch type) etc... -func (s *Scheduler) chooseExecutor(ctx context.Context, rct *types.RunConfigTask) (*types.Executor, error) { +func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTask) (*types.Executor, error) { executors, err := store.GetExecutors(ctx, s.e) if err != nil { return nil, err @@ -318,7 +309,7 @@ func (p parentsByLevelName) Less(i, j int) bool { } func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { +func (s *Runservice) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { rct := rc.Tasks[rt.ID] environment := map[string]string{} @@ -382,7 +373,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types // sendExecutorTask sends executor task to executor, if this fails the executor // will periodically fetch the executortask anyway -func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { +func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { return err @@ -408,7 +399,7 @@ func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask return nil } -func (s *Scheduler) compactChangeGroupsLoop(ctx context.Context) { +func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) { for { if err := s.compactChangeGroups(ctx); err != nil { log.Errorf("err: %+v", err) @@ -424,7 +415,7 @@ func (s *Scheduler) compactChangeGroupsLoop(ctx context.Context) { } } -func (s *Scheduler) compactChangeGroups(ctx context.Context) error { +func (s *Runservice) compactChangeGroups(ctx context.Context) error { resp, err := s.e.Client().Get(ctx, common.EtcdChangeGroupMinRevisionKey) if err != nil { return err @@ -470,7 +461,7 @@ func (s *Scheduler) compactChangeGroups(ctx context.Context) error { return nil } -func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { +func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { log.Debugf("r: %s", util.Dump(r)) prevPhase := r.Phase @@ -617,7 +608,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx return nil } -func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { +func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { r, _, err := store.GetRun(ctx, s.e, et.RunID) if err != nil { return err @@ -638,7 +629,7 @@ func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.Exec return s.scheduleRun(ctx, r, rc) } -func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error { +func (s *Runservice) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error { log.Debugf("et: %s", util.Dump(et)) rt, ok := r.Tasks[et.ID] @@ -715,7 +706,7 @@ func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorT return nil } -func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) { +func (s *Runservice) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) { for { select { case <-ctx.Done(): @@ -732,7 +723,7 @@ func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *typ } } -func (s *Scheduler) executorTasksCleanerLoop(ctx context.Context) { +func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) { for { log.Debugf("executorTasksCleaner") @@ -750,7 +741,7 @@ func (s *Scheduler) executorTasksCleanerLoop(ctx context.Context) { } } -func (s *Scheduler) executorTasksCleaner(ctx context.Context) error { +func (s *Runservice) executorTasksCleaner(ctx context.Context) error { resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return err @@ -771,7 +762,7 @@ func (s *Scheduler) executorTasksCleaner(ctx context.Context) error { return nil } -func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error { +func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error { log.Debugf("et: %s", util.Dump(et)) if et.Status.Phase.IsFinished() { r, _, err := store.GetRun(ctx, s.e, et.RunID) @@ -820,7 +811,7 @@ func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorT return nil } -func (s *Scheduler) runTasksUpdaterLoop(ctx context.Context) { +func (s *Runservice) runTasksUpdaterLoop(ctx context.Context) { for { log.Debugf("runTasksUpdater") @@ -832,7 +823,7 @@ func (s *Scheduler) runTasksUpdaterLoop(ctx context.Context) { } } -func (s *Scheduler) runTasksUpdater(ctx context.Context) error { +func (s *Runservice) runTasksUpdater(ctx context.Context) error { log.Debugf("runTasksUpdater") session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) @@ -870,7 +861,7 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error { return nil } -func (s *Scheduler) fileExists(path string) (bool, error) { +func (s *Runservice) fileExists(path string) (bool, error) { _, err := os.Stat(path) if err != nil && !os.IsNotExist(err) { return false, err @@ -878,7 +869,7 @@ func (s *Scheduler) fileExists(path string) (bool, error) { return !os.IsNotExist(err), nil } -func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { +func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { et, err := store.GetExecutorTask(ctx, s.e, rt.ID) if err != nil && err != etcd.ErrKeyNotFound { return err @@ -944,7 +935,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, return s.ost.WriteObject(logPath, r.Body, size, false) } -func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { +func (s *Runservice) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { r, _, err := store.GetRun(ctx, s.e, runID) if err != nil { return err @@ -961,7 +952,7 @@ func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID st return nil } -func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { +func (s *Runservice) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { r, _, err := store.GetRun(ctx, s.e, runID) if err != nil { return err @@ -981,7 +972,7 @@ func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID str return nil } -func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID string, stepnum int) error { +func (s *Runservice) finishArchivePhase(ctx context.Context, runID, runTaskID string, stepnum int) error { r, _, err := store.GetRun(ctx, s.e, runID) if err != nil { return err @@ -1011,7 +1002,7 @@ func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID str return nil } -func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) { +func (s *Runservice) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) { log.Debugf("fetchTaskLogs") // fetch setup log @@ -1040,7 +1031,7 @@ func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.R } } -func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum int) error { +func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum int) error { et, err := store.GetExecutorTask(ctx, s.e, rt.ID) if err != nil && err != etcd.ErrKeyNotFound { return err @@ -1095,7 +1086,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return s.ost.WriteObject(path, r.Body, size, false) } -func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { +func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { log.Debugf("fetchTaskArchives") for i, stepnum := range rt.WorkspaceArchives { phase := rt.WorkspaceArchivesPhase[i] @@ -1112,7 +1103,7 @@ func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *typ } } -func (s *Scheduler) fetcherLoop(ctx context.Context) { +func (s *Runservice) fetcherLoop(ctx context.Context) { for { log.Debugf("fetcher") @@ -1130,7 +1121,7 @@ func (s *Scheduler) fetcherLoop(ctx context.Context) { } } -func (s *Scheduler) fetcher(ctx context.Context) error { +func (s *Runservice) fetcher(ctx context.Context) error { log.Debugf("fetcher") runs, err := store.GetRuns(ctx, s.e) if err != nil { @@ -1155,7 +1146,7 @@ func (s *Scheduler) fetcher(ctx context.Context) error { } -func (s *Scheduler) runsSchedulerLoop(ctx context.Context) { +func (s *Runservice) runsSchedulerLoop(ctx context.Context) { for { log.Debugf("runsSchedulerLoop") @@ -1173,7 +1164,7 @@ func (s *Scheduler) runsSchedulerLoop(ctx context.Context) { } } -func (s *Scheduler) runsScheduler(ctx context.Context) error { +func (s *Runservice) runsScheduler(ctx context.Context) error { log.Debugf("runsScheduler") runs, err := store.GetRuns(ctx, s.e) if err != nil { @@ -1188,7 +1179,7 @@ func (s *Scheduler) runsScheduler(ctx context.Context) error { return nil } -func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { +func (s *Runservice) runScheduler(ctx context.Context, r *types.Run) error { log.Debugf("runScheduler") rc, err := store.OSTGetRunConfig(s.dm, r.ID) if err != nil { @@ -1198,7 +1189,7 @@ func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { return s.scheduleRun(ctx, r, rc) } -func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) { +func (s *Runservice) finishedRunsArchiverLoop(ctx context.Context) { for { log.Debugf("finished run archiver loop") @@ -1216,7 +1207,7 @@ func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) { } } -func (s *Scheduler) finishedRunsArchiver(ctx context.Context) error { +func (s *Runservice) finishedRunsArchiver(ctx context.Context) error { log.Debugf("finished run archiver") runs, err := store.GetRuns(ctx, s.e) if err != nil { @@ -1246,7 +1237,7 @@ func (s *Scheduler) finishedRunsArchiver(ctx context.Context) error { // finishedRunArchiver archives a run if it's finished and all the fetching // phases (logs and archives) are marked as finished -func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error { +func (s *Runservice) finishedRunArchiver(ctx context.Context, r *types.Run) error { //log.Debugf("r: %s", util.Dump(r)) if !r.Phase.IsFinished() { return nil @@ -1289,7 +1280,7 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error return nil } -func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { +func (s *Runservice) runOSTArchiver(ctx context.Context, r *types.Run) error { // TODO(sgotti) avoid saving multiple times the run on objectstorage if the // store.DeletedArchivedRun fails log.Infof("saving run in objectstorage: %s", r.ID) @@ -1312,7 +1303,7 @@ func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { return nil } -func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) { +func (s *Runservice) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) { for { if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil { log.Errorf("err: %+v", err) @@ -1328,7 +1319,7 @@ func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval ti } } -func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error { +func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error { log.Debugf("cacheCleaner") session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) @@ -1364,233 +1355,3 @@ func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.D return nil } - -// etcdPingerLoop periodically updates a key. -// This is used by watchers to inform the client of the current revision -// this is needed since if other users are updating other unwatched keys on -// etcd we won't be notified, not updating the known revisions -// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? -func (s *Scheduler) etcdPingerLoop(ctx context.Context) { - for { - if err := s.etcdPinger(ctx); err != nil { - log.Errorf("err: %+v", err) - } - - select { - case <-ctx.Done(): - return - default: - } - - time.Sleep(1 * time.Second) - } -} - -func (s *Scheduler) etcdPinger(ctx context.Context) error { - if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil { - return err - } - return nil -} - -type Scheduler struct { - c *config.RunServiceScheduler - e *etcd.Store - ost *objectstorage.ObjStorage - dm *datamanager.DataManager - readDB *readdb.ReadDB - ah *action.ActionHandler -} - -func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Scheduler, error) { - if c.Debug { - level.SetLevel(zapcore.DebugLevel) - } - - ost, err := scommon.NewObjectStorage(&c.ObjectStorage) - if err != nil { - return nil, err - } - e, err := scommon.NewEtcd(&c.Etcd, logger, "runscheduler") - if err != nil { - return nil, err - } - - s := &Scheduler{ - c: c, - e: e, - ost: ost, - } - - dmConf := &datamanager.DataManagerConfig{ - E: e, - OST: ost, - DataTypes: []string{ - string(common.DataTypeRun), - string(common.DataTypeRunConfig), - string(common.DataTypeRunCounter), - }, - } - dm, err := datamanager.NewDataManager(ctx, logger, dmConf) - if err != nil { - return nil, err - } - s.dm = dm - - readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm) - if err != nil { - return nil, err - } - s.readDB = readDB - - ah := action.NewActionHandler(logger, e, readDB, ost, dm) - s.ah = ah - - return s, nil -} - -func (s *Scheduler) InitEtcd(ctx context.Context) error { - // Create changegroup min revision if it doesn't exists - cmp := []etcdclientv3.Cmp{} - then := []etcdclientv3.Op{} - - cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0)) - then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, "")) - txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...) - if _, err := txn.Commit(); err != nil { - return etcd.FromEtcdError(err) - } - - return nil -} - -func (s *Scheduler) Run(ctx context.Context) error { - errCh := make(chan error) - dmReadyCh := make(chan struct{}) - - go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() - - // wait for dm to be ready - <-dmReadyCh - - for { - err := s.InitEtcd(ctx) - if err == nil { - break - } - log.Errorf("failed to initialize etcd: %+v", err) - time.Sleep(1 * time.Second) - } - - go s.readDB.Run(ctx) - - ch := make(chan *types.ExecutorTask) - - // noop coors handler - corsHandler := func(h http.Handler) http.Handler { - return h - } - - corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"}) - corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"}) - corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"}) - corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) - - // executor dedicated api, only calls from executor should happen on these handlers - executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) - executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) - executorTaskHandler := api.NewExecutorTaskHandler(s.e) - executorTasksHandler := api.NewExecutorTasksHandler(s.e) - archivesHandler := api.NewArchivesHandler(logger, s.ost) - cacheHandler := api.NewCacheHandler(logger, s.ost) - cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) - - // api from clients - executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah) - - logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm) - - runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB) - runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah) - runsHandler := api.NewRunsHandler(logger, s.readDB) - runActionsHandler := api.NewRunActionsHandler(logger, s.ah) - runCreateHandler := api.NewRunCreateHandler(logger, s.ah) - changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB) - - router := mux.NewRouter() - apirouter := router.PathPrefix("/api/v1alpha").Subrouter() - - // don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method - apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) - - apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST") - apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE") - apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET") - apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET") - apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST") - apirouter.Handle("/executor/archives", archivesHandler).Methods("GET") - apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD") - apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET") - apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST") - - apirouter.Handle("/logs", logsHandler).Methods("GET") - - apirouter.Handle("/runs/{runid}", runHandler).Methods("GET") - apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT") - apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT") - apirouter.Handle("/runs", runsHandler).Methods("GET") - apirouter.Handle("/runs", runCreateHandler).Methods("POST") - - apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET") - - mainrouter := mux.NewRouter() - mainrouter.PathPrefix("/").Handler(corsHandler(router)) - - // Return a bad request when it doesn't match any route - mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) - - go s.executorTasksCleanerLoop(ctx) - go s.runsSchedulerLoop(ctx) - go s.runTasksUpdaterLoop(ctx) - go s.fetcherLoop(ctx) - go s.finishedRunsArchiverLoop(ctx) - go s.compactChangeGroupsLoop(ctx) - go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) - go s.executorTaskUpdateHandler(ctx, ch) - - go s.etcdPingerLoop(ctx) - - var tlsConfig *tls.Config - if s.c.Web.TLS { - var err error - tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false) - if err != nil { - log.Errorf("err: %+v") - return err - } - } - - httpServer := http.Server{ - Addr: s.c.Web.ListenAddress, - Handler: mainrouter, - TLSConfig: tlsConfig, - } - - lerrCh := make(chan error) - go func() { - lerrCh <- httpServer.ListenAndServe() - }() - - select { - case <-ctx.Done(): - log.Infof("runservice scheduler exiting") - httpServer.Close() - return nil - case err := <-lerrCh: - log.Errorf("http server listen error: %v", err) - return err - case err := <-errCh: - log.Errorf("error: %+v", err) - return err - } -} diff --git a/internal/services/runservice/scheduler/scheduler_test.go b/internal/services/runservice/scheduler_test.go similarity index 99% rename from internal/services/runservice/scheduler/scheduler_test.go rename to internal/services/runservice/scheduler_test.go index cf6e7d0..e69f1a2 100644 --- a/internal/services/runservice/scheduler/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package scheduler +package runservice import ( "context" diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/store/store.go similarity index 99% rename from internal/services/runservice/scheduler/store/store.go rename to internal/services/runservice/store/store.go index 3778ea2..65b1bd3 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/store/store.go @@ -25,7 +25,7 @@ import ( "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" + "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go index 8334ea1..9e1d17c 100644 --- a/internal/services/scheduler/scheduler.go +++ b/internal/services/scheduler/scheduler.go @@ -23,7 +23,7 @@ import ( slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" "github.com/pkg/errors" @@ -194,7 +194,7 @@ func NewScheduler(c *config.Scheduler) (*Scheduler, error) { } return &Scheduler{ - runserviceClient: rsapi.NewClient(c.RunServiceURL), + runserviceClient: rsapi.NewClient(c.RunserviceURL), }, nil }