From 1e34dca95da3ebfb60cc901d8fae7140f7bb9da2 Mon Sep 17 00:00:00 2001 From: Simone Gotti Date: Tue, 7 May 2019 23:56:10 +0200 Subject: [PATCH] runservice: split and simplify scheduler and executor naming Also if they are logically part of the runservice the names runserviceExecutor and runserviceScheduler are long and quite confusing for an external user Simplify them separating both the code parts and updating the names: runserviceScheduler -> runservice runserviceExecutor -> executor --- cmd/agola/cmd/serve.go | 22 +- examples/config.yml | 14 +- internal/services/config/config.go | 64 ++-- .../services/{runservice => }/executor/api.go | 0 .../executor/driver/docker.go | 2 +- .../executor/driver/docker_test.go | 0 .../executor/driver/driver.go | 2 +- .../{runservice => }/executor/driver/k8s.go | 0 .../executor/driver/k8s_test.go | 0 .../executor/driver/k8slease.go | 0 .../{runservice => }/executor/executor.go | 12 +- .../executor/registry/registry.go | 0 internal/services/gateway/action/action.go | 2 +- internal/services/gateway/action/run.go | 2 +- internal/services/gateway/gateway.go | 4 +- internal/services/gateway/webhook.go | 2 +- .../{scheduler => }/action/action.go | 6 +- .../{scheduler => }/action/action_test.go | 0 .../runservice/{scheduler => }/api/api.go | 8 +- .../runservice/{scheduler => }/api/client.go | 0 .../{scheduler => }/api/executor.go | 6 +- .../{scheduler => }/common/common.go | 0 .../{scheduler => }/common/events.go | 0 .../{scheduler => }/readdb/create.go | 0 .../{scheduler => }/readdb/readdb.go | 4 +- internal/services/runservice/runservice.go | 270 +++++++++++++++ .../runservice/{scheduler => }/scheduler.go | 317 +++--------------- .../{scheduler => }/scheduler_test.go | 2 +- .../runservice/{scheduler => }/store/store.go | 2 +- internal/services/scheduler/scheduler.go | 4 +- 30 files changed, 388 insertions(+), 357 deletions(-) rename internal/services/{runservice => }/executor/api.go (100%) rename internal/services/{runservice => }/executor/driver/docker.go (99%) rename internal/services/{runservice => }/executor/driver/docker_test.go (100%) rename internal/services/{runservice => }/executor/driver/driver.go (97%) rename internal/services/{runservice => }/executor/driver/k8s.go (100%) rename internal/services/{runservice => }/executor/driver/k8s_test.go (100%) rename internal/services/{runservice => }/executor/driver/k8slease.go (100%) rename internal/services/{runservice => }/executor/executor.go (98%) rename internal/services/{runservice => }/executor/registry/registry.go (100%) rename internal/services/runservice/{scheduler => }/action/action.go (98%) rename internal/services/runservice/{scheduler => }/action/action_test.go (100%) rename internal/services/runservice/{scheduler => }/api/api.go (98%) rename internal/services/runservice/{scheduler => }/api/client.go (100%) rename internal/services/runservice/{scheduler => }/api/executor.go (97%) rename internal/services/runservice/{scheduler => }/common/common.go (100%) rename internal/services/runservice/{scheduler => }/common/events.go (100%) rename internal/services/runservice/{scheduler => }/readdb/create.go (100%) rename internal/services/runservice/{scheduler => }/readdb/readdb.go (99%) create mode 100644 internal/services/runservice/runservice.go rename internal/services/runservice/{scheduler => }/scheduler.go (74%) rename internal/services/runservice/{scheduler => }/scheduler_test.go (99%) rename internal/services/runservice/{scheduler => }/store/store.go (99%) diff --git a/cmd/agola/cmd/serve.go b/cmd/agola/cmd/serve.go index 7565c7f..c7c0592 100644 --- a/cmd/agola/cmd/serve.go +++ b/cmd/agola/cmd/serve.go @@ -21,11 +21,11 @@ import ( "github.com/sorintlab/agola/cmd" "github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/configstore" + "github.com/sorintlab/agola/internal/services/executor" + rsexecutor "github.com/sorintlab/agola/internal/services/executor" "github.com/sorintlab/agola/internal/services/gateway" "github.com/sorintlab/agola/internal/services/gitserver" - "github.com/sorintlab/agola/internal/services/runservice/executor" - rsexecutor "github.com/sorintlab/agola/internal/services/runservice/executor" - rsscheduler "github.com/sorintlab/agola/internal/services/runservice/scheduler" + rsscheduler "github.com/sorintlab/agola/internal/services/runservice" "github.com/sorintlab/agola/internal/services/scheduler" "github.com/sorintlab/agola/internal/util" @@ -139,17 +139,17 @@ func serve(cmd *cobra.Command, args []string) error { } } - var rssched *rsscheduler.Scheduler + var rs *rsscheduler.Runservice if isComponentEnabled("runservicescheduler") { - rssched, err = rsscheduler.NewScheduler(ctx, &c.RunServiceScheduler) + rs, err = rsscheduler.NewRunservice(ctx, &c.Runservice) if err != nil { return errors.Wrapf(err, "failed to start run service scheduler") } } - var rsex *rsexecutor.Executor + var ex *rsexecutor.Executor if isComponentEnabled("runserviceexecutor") { - rsex, err = executor.NewExecutor(&c.RunServiceExecutor) + ex, err = executor.NewExecutor(&c.Executor) if err != nil { return errors.Wrapf(err, "failed to start run service executor") } @@ -189,11 +189,11 @@ func serve(cmd *cobra.Command, args []string) error { errCh := make(chan error) - if rssched != nil { - go func() { errCh <- rssched.Run(ctx) }() + if rs != nil { + go func() { errCh <- rs.Run(ctx) }() } - if rsex != nil { - go func() { errCh <- rsex.Run(ctx) }() + if ex != nil { + go func() { errCh <- ex.Run(ctx) }() } if cs != nil { go func() { errCh <- cs.Run(ctx) }() diff --git a/examples/config.yml b/examples/config.yml index 57f1d93..1356e1e 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -1,7 +1,7 @@ gateway: apiExposedURL: "http://172.17.0.1:8000" webExposedURL: "http://172.17.0.1:8080" - runServiceURL: "http://localhost:4000" + runserviceURL: "http://localhost:4000" configstoreURL: "http://localhost:4002" gitServerURL: "http://172.17.0.1:4003" @@ -18,7 +18,7 @@ gateway: adminToken: "admintoken" scheduler: - runServiceURL: "http://localhost:4000" + runserviceURL: "http://localhost:4000" configstore: dataDir: /tmp/agola/configstore @@ -30,9 +30,9 @@ configstore: web: listenAddress: ":4002" -runServiceScheduler: +runservice: #debug: true - dataDir: /tmp/agola/runservice/scheduler + dataDir: /tmp/agola/runservice objectStorage: type: posix path: /tmp/agola/runservice/ost @@ -41,10 +41,10 @@ runServiceScheduler: web: listenAddress: ":4000" -runServiceExecutor: - dataDir: /tmp/agola/runservice/executor +executor: + dataDir: /tmp/agola/executor toolboxPath: ./bin/agola-toolbox - runServiceURL: "http://localhost:4000" + runserviceURL: "http://localhost:4000" web: listenAddress: ":4001" activeTasksLimit: 2 diff --git a/internal/services/config/config.go b/internal/services/config/config.go index ffc7c1c..dffb16d 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -33,12 +33,12 @@ type Config struct { // Defaults to "agola" ID string `yaml:"id"` - Gateway Gateway `yaml:"gateway"` - Scheduler Scheduler `yaml:"scheduler"` - RunServiceScheduler RunServiceScheduler `yaml:"runServiceScheduler"` - RunServiceExecutor RunServiceExecutor `yaml:"runServiceExecutor"` - Configstore Configstore `yaml:"configstore"` - GitServer GitServer `yaml:"gitServer"` + Gateway Gateway `yaml:"gateway"` + Scheduler Scheduler `yaml:"scheduler"` + Runservice Runservice `yaml:"runservice"` + Executor Executor `yaml:"executor"` + Configstore Configstore `yaml:"configstore"` + GitServer GitServer `yaml:"gitServer"` } type Gateway struct { @@ -51,7 +51,7 @@ type Gateway struct { // This is used for generating the redirect_url in oauth2 redirects WebExposedURL string `yaml:"webExposedURL"` - RunServiceURL string `yaml:"runServiceURL"` + RunserviceURL string `yaml:"runserviceURL"` ConfigstoreURL string `yaml:"configstoreURL"` GitServerURL string `yaml:"gitServerURL"` @@ -67,10 +67,10 @@ type Gateway struct { type Scheduler struct { Debug bool `yaml:"debug"` - RunServiceURL string `yaml:"runServiceURL"` + RunserviceURL string `yaml:"runserviceURL"` } -type RunServiceScheduler struct { +type Runservice struct { Debug bool `yaml:"debug"` DataDir string `yaml:"dataDir"` @@ -81,12 +81,12 @@ type RunServiceScheduler struct { RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` } -type RunServiceExecutor struct { +type Executor struct { Debug bool `yaml:"debug"` DataDir string `yaml:"dataDir"` - RunServiceURL string `yaml:"runServiceURL"` + RunserviceURL string `yaml:"runserviceURL"` ToolboxPath string `yaml:"toolboxPath"` Web Web `yaml:"web"` @@ -208,10 +208,10 @@ var defaultConfig = Config{ Duration: 12 * time.Hour, }, }, - RunServiceScheduler: RunServiceScheduler{ + Runservice: Runservice{ RunCacheExpireInterval: 7 * 24 * time.Hour, }, - RunServiceExecutor: RunServiceExecutor{ + Executor: Executor{ ActiveTasksLimit: 2, }, } @@ -266,8 +266,8 @@ func Validate(c *Config) error { if c.Gateway.ConfigstoreURL == "" { return errors.Errorf("gateway configstoreURL is empty") } - if c.Gateway.RunServiceURL == "" { - return errors.Errorf("gateway runServiceURL is empty") + if c.Gateway.RunserviceURL == "" { + return errors.Errorf("gateway runserviceURL is empty") } if err := validateWeb(&c.Gateway.Web); err != nil { return errors.Wrapf(err, "gateway web configuration error") @@ -281,37 +281,37 @@ func Validate(c *Config) error { return errors.Wrapf(err, "configstore web configuration error") } - // Runservice Scheduler - if c.RunServiceScheduler.DataDir == "" { - return errors.Errorf("runservice scheduler dataDir is empty") + // Runservice + if c.Runservice.DataDir == "" { + return errors.Errorf("runservice dataDir is empty") } - if err := validateWeb(&c.RunServiceScheduler.Web); err != nil { - return errors.Wrapf(err, "runservice scheduler web configuration error") + if err := validateWeb(&c.Runservice.Web); err != nil { + return errors.Wrapf(err, "runservice web configuration error") } - // Runservice Executor - if c.RunServiceExecutor.DataDir == "" { - return errors.Errorf("runservice executor dataDir is empty") + // Executor + if c.Executor.DataDir == "" { + return errors.Errorf("executor dataDir is empty") } - if c.RunServiceExecutor.ToolboxPath == "" { + if c.Executor.ToolboxPath == "" { return errors.Errorf("git server toolboxPath is empty") } - if c.RunServiceExecutor.RunServiceURL == "" { - return errors.Errorf("runservice executor runServiceURL is empty") + if c.Executor.RunserviceURL == "" { + return errors.Errorf("executor runserviceURL is empty") } - if c.RunServiceExecutor.Driver.Type == "" { - return errors.Errorf("runservice executor driver type is empty") + if c.Executor.Driver.Type == "" { + return errors.Errorf("executor driver type is empty") } - switch c.RunServiceExecutor.Driver.Type { + switch c.Executor.Driver.Type { case DriverTypeDocker: case DriverTypeK8s: default: - return errors.Errorf("runservice executor driver type %q unknown", c.RunServiceExecutor.Driver.Type) + return errors.Errorf("executor driver type %q unknown", c.Executor.Driver.Type) } // Scheduler - if c.Scheduler.RunServiceURL == "" { - return errors.Errorf("scheduler runServiceURL is empty") + if c.Scheduler.RunserviceURL == "" { + return errors.Errorf("scheduler runserviceURL is empty") } // Git server diff --git a/internal/services/runservice/executor/api.go b/internal/services/executor/api.go similarity index 100% rename from internal/services/runservice/executor/api.go rename to internal/services/executor/api.go diff --git a/internal/services/runservice/executor/driver/docker.go b/internal/services/executor/driver/docker.go similarity index 99% rename from internal/services/runservice/executor/driver/docker.go rename to internal/services/executor/driver/docker.go index 8e9d803..4c73cbd 100644 --- a/internal/services/runservice/executor/driver/docker.go +++ b/internal/services/executor/driver/docker.go @@ -29,7 +29,7 @@ import ( "github.com/pkg/errors" "github.com/sorintlab/agola/internal/common" - "github.com/sorintlab/agola/internal/services/runservice/executor/registry" + "github.com/sorintlab/agola/internal/services/executor/registry" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/container" diff --git a/internal/services/runservice/executor/driver/docker_test.go b/internal/services/executor/driver/docker_test.go similarity index 100% rename from internal/services/runservice/executor/driver/docker_test.go rename to internal/services/executor/driver/docker_test.go diff --git a/internal/services/runservice/executor/driver/driver.go b/internal/services/executor/driver/driver.go similarity index 97% rename from internal/services/runservice/executor/driver/driver.go rename to internal/services/executor/driver/driver.go index 3d1fe60..d8279da 100644 --- a/internal/services/runservice/executor/driver/driver.go +++ b/internal/services/executor/driver/driver.go @@ -19,7 +19,7 @@ import ( "io" "github.com/sorintlab/agola/internal/common" - "github.com/sorintlab/agola/internal/services/runservice/executor/registry" + "github.com/sorintlab/agola/internal/services/executor/registry" ) const ( diff --git a/internal/services/runservice/executor/driver/k8s.go b/internal/services/executor/driver/k8s.go similarity index 100% rename from internal/services/runservice/executor/driver/k8s.go rename to internal/services/executor/driver/k8s.go diff --git a/internal/services/runservice/executor/driver/k8s_test.go b/internal/services/executor/driver/k8s_test.go similarity index 100% rename from internal/services/runservice/executor/driver/k8s_test.go rename to internal/services/executor/driver/k8s_test.go diff --git a/internal/services/runservice/executor/driver/k8slease.go b/internal/services/executor/driver/k8slease.go similarity index 100% rename from internal/services/runservice/executor/driver/k8slease.go rename to internal/services/executor/driver/k8slease.go diff --git a/internal/services/runservice/executor/executor.go b/internal/services/executor/executor.go similarity index 98% rename from internal/services/runservice/executor/executor.go rename to internal/services/executor/executor.go index dcecc8a..be456d7 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/executor/executor.go @@ -35,9 +35,9 @@ import ( "github.com/sorintlab/agola/internal/common" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/services/config" - "github.com/sorintlab/agola/internal/services/runservice/executor/driver" - "github.com/sorintlab/agola/internal/services/runservice/executor/registry" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + "github.com/sorintlab/agola/internal/services/executor/driver" + "github.com/sorintlab/agola/internal/services/executor/registry" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" @@ -1227,7 +1227,7 @@ func (e *Executor) saveExecutorID(id string) error { } type Executor struct { - c *config.RunServiceExecutor + c *config.Executor runserviceClient *rsapi.Client id string runningTasks *runningTasks @@ -1236,7 +1236,7 @@ type Executor struct { dynamic bool } -func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) { +func NewExecutor(c *config.Executor) (*Executor, error) { if c.Debug { level.SetLevel(zapcore.DebugLevel) } @@ -1256,7 +1256,7 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) { e := &Executor{ c: c, - runserviceClient: rsapi.NewClient(c.RunServiceURL), + runserviceClient: rsapi.NewClient(c.RunserviceURL), runningTasks: &runningTasks{ tasks: make(map[string]*runningTask), }, diff --git a/internal/services/runservice/executor/registry/registry.go b/internal/services/executor/registry/registry.go similarity index 100% rename from internal/services/runservice/executor/registry/registry.go rename to internal/services/executor/registry/registry.go diff --git a/internal/services/gateway/action/action.go b/internal/services/gateway/action/action.go index 744af48..9967cb9 100644 --- a/internal/services/gateway/action/action.go +++ b/internal/services/gateway/action/action.go @@ -20,7 +20,7 @@ import ( "github.com/pkg/errors" csapi "github.com/sorintlab/agola/internal/services/configstore/api" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" "go.uber.org/zap" diff --git a/internal/services/gateway/action/run.go b/internal/services/gateway/action/run.go index a439032..116975c 100644 --- a/internal/services/gateway/action/run.go +++ b/internal/services/gateway/action/run.go @@ -20,7 +20,7 @@ import ( "net/http" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" "github.com/pkg/errors" diff --git a/internal/services/gateway/gateway.go b/internal/services/gateway/gateway.go index 75f63cd..efa3241 100644 --- a/internal/services/gateway/gateway.go +++ b/internal/services/gateway/gateway.go @@ -29,7 +29,7 @@ import ( "github.com/sorintlab/agola/internal/services/gateway/api" "github.com/sorintlab/agola/internal/services/gateway/common" "github.com/sorintlab/agola/internal/services/gateway/handlers" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" jwt "github.com/dgrijalva/jwt-go" @@ -122,7 +122,7 @@ func NewGateway(gc *config.Config) (*Gateway, error) { } configstoreClient := csapi.NewClient(c.ConfigstoreURL) - runserviceClient := rsapi.NewClient(c.RunServiceURL) + runserviceClient := rsapi.NewClient(c.RunserviceURL) ah := action.NewActionHandler(logger, sd, configstoreClient, runserviceClient, gc.ID, c.APIExposedURL, c.WebExposedURL) diff --git a/internal/services/gateway/webhook.go b/internal/services/gateway/webhook.go index bec1240..6e392e5 100644 --- a/internal/services/gateway/webhook.go +++ b/internal/services/gateway/webhook.go @@ -27,7 +27,7 @@ import ( csapi "github.com/sorintlab/agola/internal/services/configstore/api" "github.com/sorintlab/agola/internal/services/gateway/action" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" rstypes "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/runservice/scheduler/action/action.go b/internal/services/runservice/action/action.go similarity index 98% rename from internal/services/runservice/scheduler/action/action.go rename to internal/services/runservice/action/action.go index 95749ba..466de78 100644 --- a/internal/services/runservice/scheduler/action/action.go +++ b/internal/services/runservice/action/action.go @@ -26,9 +26,9 @@ import ( "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/sequence" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/readdb" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/runservice/scheduler/action/action_test.go b/internal/services/runservice/action/action_test.go similarity index 100% rename from internal/services/runservice/scheduler/action/action_test.go rename to internal/services/runservice/action/action_test.go diff --git a/internal/services/runservice/scheduler/api/api.go b/internal/services/runservice/api/api.go similarity index 98% rename from internal/services/runservice/scheduler/api/api.go rename to internal/services/runservice/api/api.go index 1fa2d60..951fd88 100644 --- a/internal/services/runservice/scheduler/api/api.go +++ b/internal/services/runservice/api/api.go @@ -27,10 +27,10 @@ import ( "github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/action" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/readdb" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/runservice/scheduler/api/client.go b/internal/services/runservice/api/client.go similarity index 100% rename from internal/services/runservice/scheduler/api/client.go rename to internal/services/runservice/api/client.go diff --git a/internal/services/runservice/scheduler/api/executor.go b/internal/services/runservice/api/executor.go similarity index 97% rename from internal/services/runservice/scheduler/api/executor.go rename to internal/services/runservice/api/executor.go index 8cecdec..bceb6ba 100644 --- a/internal/services/runservice/scheduler/api/executor.go +++ b/internal/services/runservice/api/executor.go @@ -26,9 +26,9 @@ import ( "github.com/gorilla/mux" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/action" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "go.uber.org/zap" ) diff --git a/internal/services/runservice/scheduler/common/common.go b/internal/services/runservice/common/common.go similarity index 100% rename from internal/services/runservice/scheduler/common/common.go rename to internal/services/runservice/common/common.go diff --git a/internal/services/runservice/scheduler/common/events.go b/internal/services/runservice/common/events.go similarity index 100% rename from internal/services/runservice/scheduler/common/events.go rename to internal/services/runservice/common/events.go diff --git a/internal/services/runservice/scheduler/readdb/create.go b/internal/services/runservice/readdb/create.go similarity index 100% rename from internal/services/runservice/scheduler/readdb/create.go rename to internal/services/runservice/readdb/create.go diff --git a/internal/services/runservice/scheduler/readdb/readdb.go b/internal/services/runservice/readdb/readdb.go similarity index 99% rename from internal/services/runservice/scheduler/readdb/readdb.go rename to internal/services/runservice/readdb/readdb.go index 1f7c6e3..1f2beda 100644 --- a/internal/services/runservice/scheduler/readdb/readdb.go +++ b/internal/services/runservice/readdb/readdb.go @@ -33,8 +33,8 @@ import ( "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/sequence" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" "go.uber.org/zap" diff --git a/internal/services/runservice/runservice.go b/internal/services/runservice/runservice.go new file mode 100644 index 0000000..4dbf53a --- /dev/null +++ b/internal/services/runservice/runservice.go @@ -0,0 +1,270 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package runservice + +import ( + "context" + "crypto/tls" + "net/http" + "path/filepath" + "time" + + scommon "github.com/sorintlab/agola/internal/common" + "github.com/sorintlab/agola/internal/datamanager" + "github.com/sorintlab/agola/internal/etcd" + "github.com/sorintlab/agola/internal/objectstorage" + "github.com/sorintlab/agola/internal/services/config" + "github.com/sorintlab/agola/internal/services/runservice/action" + "github.com/sorintlab/agola/internal/services/runservice/api" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/readdb" + "github.com/sorintlab/agola/internal/services/runservice/types" + "github.com/sorintlab/agola/internal/util" + + ghandlers "github.com/gorilla/handlers" + "github.com/gorilla/mux" + etcdclientv3 "go.etcd.io/etcd/clientv3" + "go.uber.org/zap/zapcore" +) + +// etcdPingerLoop periodically updates a key. +// This is used by watchers to inform the client of the current revision +// this is needed since if other users are updating other unwatched keys on +// etcd we won't be notified, not updating the known revisions +// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? +func (s *Runservice) etcdPingerLoop(ctx context.Context) { + for { + if err := s.etcdPinger(ctx); err != nil { + log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(1 * time.Second) + } +} + +func (s *Runservice) etcdPinger(ctx context.Context) error { + if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil { + return err + } + return nil +} + +type Runservice struct { + c *config.Runservice + e *etcd.Store + ost *objectstorage.ObjStorage + dm *datamanager.DataManager + readDB *readdb.ReadDB + ah *action.ActionHandler +} + +func NewRunservice(ctx context.Context, c *config.Runservice) (*Runservice, error) { + if c.Debug { + level.SetLevel(zapcore.DebugLevel) + } + + ost, err := scommon.NewObjectStorage(&c.ObjectStorage) + if err != nil { + return nil, err + } + e, err := scommon.NewEtcd(&c.Etcd, logger, "runservice") + if err != nil { + return nil, err + } + + s := &Runservice{ + c: c, + e: e, + ost: ost, + } + + dmConf := &datamanager.DataManagerConfig{ + E: e, + OST: ost, + DataTypes: []string{ + string(common.DataTypeRun), + string(common.DataTypeRunConfig), + string(common.DataTypeRunCounter), + }, + } + dm, err := datamanager.NewDataManager(ctx, logger, dmConf) + if err != nil { + return nil, err + } + s.dm = dm + + readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm) + if err != nil { + return nil, err + } + s.readDB = readDB + + ah := action.NewActionHandler(logger, e, readDB, ost, dm) + s.ah = ah + + return s, nil +} + +func (s *Runservice) InitEtcd(ctx context.Context) error { + // Create changegroup min revision if it doesn't exists + cmp := []etcdclientv3.Cmp{} + then := []etcdclientv3.Op{} + + cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0)) + then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, "")) + txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...) + if _, err := txn.Commit(); err != nil { + return etcd.FromEtcdError(err) + } + + return nil +} + +func (s *Runservice) Run(ctx context.Context) error { + errCh := make(chan error) + dmReadyCh := make(chan struct{}) + + go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() + + // wait for dm to be ready + <-dmReadyCh + + for { + err := s.InitEtcd(ctx) + if err == nil { + break + } + log.Errorf("failed to initialize etcd: %+v", err) + time.Sleep(1 * time.Second) + } + + go s.readDB.Run(ctx) + + ch := make(chan *types.ExecutorTask) + + // noop coors handler + corsHandler := func(h http.Handler) http.Handler { + return h + } + + corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"}) + corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"}) + corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"}) + corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) + + // executor dedicated api, only calls from executor should happen on these handlers + executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) + executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) + executorTaskHandler := api.NewExecutorTaskHandler(s.e) + executorTasksHandler := api.NewExecutorTasksHandler(s.e) + archivesHandler := api.NewArchivesHandler(logger, s.ost) + cacheHandler := api.NewCacheHandler(logger, s.ost) + cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) + + // api from clients + executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah) + + logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm) + + runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB) + runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah) + runsHandler := api.NewRunsHandler(logger, s.readDB) + runActionsHandler := api.NewRunActionsHandler(logger, s.ah) + runCreateHandler := api.NewRunCreateHandler(logger, s.ah) + changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB) + + router := mux.NewRouter() + apirouter := router.PathPrefix("/api/v1alpha").Subrouter() + + // don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method + apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) + + apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST") + apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE") + apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET") + apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET") + apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST") + apirouter.Handle("/executor/archives", archivesHandler).Methods("GET") + apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD") + apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET") + apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST") + + apirouter.Handle("/logs", logsHandler).Methods("GET") + + apirouter.Handle("/runs/{runid}", runHandler).Methods("GET") + apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT") + apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT") + apirouter.Handle("/runs", runsHandler).Methods("GET") + apirouter.Handle("/runs", runCreateHandler).Methods("POST") + + apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET") + + mainrouter := mux.NewRouter() + mainrouter.PathPrefix("/").Handler(corsHandler(router)) + + // Return a bad request when it doesn't match any route + mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) + + go s.executorTasksCleanerLoop(ctx) + go s.runsSchedulerLoop(ctx) + go s.runTasksUpdaterLoop(ctx) + go s.fetcherLoop(ctx) + go s.finishedRunsArchiverLoop(ctx) + go s.compactChangeGroupsLoop(ctx) + go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) + go s.executorTaskUpdateHandler(ctx, ch) + + go s.etcdPingerLoop(ctx) + + var tlsConfig *tls.Config + if s.c.Web.TLS { + var err error + tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false) + if err != nil { + log.Errorf("err: %+v") + return err + } + } + + httpServer := http.Server{ + Addr: s.c.Web.ListenAddress, + Handler: mainrouter, + TLSConfig: tlsConfig, + } + + lerrCh := make(chan error) + go func() { + lerrCh <- httpServer.ListenAndServe() + }() + + select { + case <-ctx.Done(): + log.Infof("runservice scheduler exiting") + httpServer.Close() + return nil + case err := <-lerrCh: + log.Errorf("http server listen error: %v", err) + return err + case err := <-errCh: + log.Errorf("error: %+v", err) + return err + } +} diff --git a/internal/services/runservice/scheduler/scheduler.go b/internal/services/runservice/scheduler.go similarity index 74% rename from internal/services/runservice/scheduler/scheduler.go rename to internal/services/runservice/scheduler.go index d75abf3..4d9480b 100644 --- a/internal/services/runservice/scheduler/scheduler.go +++ b/internal/services/runservice/scheduler.go @@ -12,38 +12,29 @@ // See the License for the specific language governing permissions and // limitations under the License. -package scheduler +package runservice import ( "bytes" "context" - "crypto/tls" "encoding/json" "fmt" "net/http" "os" - "path/filepath" "sort" "strconv" "time" - scommon "github.com/sorintlab/agola/internal/common" "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/runconfig" - "github.com/sorintlab/agola/internal/services/config" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/action" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/store" + "github.com/sorintlab/agola/internal/services/runservice/common" + "github.com/sorintlab/agola/internal/services/runservice/store" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" - ghandlers "github.com/gorilla/handlers" - "github.com/gorilla/mux" "github.com/pkg/errors" etcdclientv3 "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/clientv3/concurrency" @@ -65,7 +56,7 @@ func mergeEnv(dest, src map[string]string) { } } -func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { +func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { // the real source of active tasks is the number of executor tasks in etcd // we can't rely on RunTask.Status since it's only updated when receiveing // updated from the executor so it could be in a NotStarted state but have an @@ -84,7 +75,7 @@ func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([ return activeTasks, nil } -func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) { +func (s *Runservice) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) { activeTasks, err := s.runActiveExecutorTasks(ctx, runID) if err != nil { return false, err @@ -237,7 +228,7 @@ func getTasksToRun(ctx context.Context, r *types.Run, rc *types.RunConfig) ([]*t return tasksToRun, nil } -func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error { +func (s *Runservice) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error { log.Debugf("tasksToRun: %s", util.Dump(tasks)) for _, rt := range tasks { @@ -278,7 +269,7 @@ func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types. // chooseExecutor chooses the executor to schedule the task on. Now it's a very simple/dumb selection // TODO(sgotti) improve this to use executor statistic, labels (arch type) etc... -func (s *Scheduler) chooseExecutor(ctx context.Context, rct *types.RunConfigTask) (*types.Executor, error) { +func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTask) (*types.Executor, error) { executors, err := store.GetExecutors(ctx, s.e) if err != nil { return nil, err @@ -318,7 +309,7 @@ func (p parentsByLevelName) Less(i, j int) bool { } func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { +func (s *Runservice) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { rct := rc.Tasks[rt.ID] environment := map[string]string{} @@ -382,7 +373,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types // sendExecutorTask sends executor task to executor, if this fails the executor // will periodically fetch the executortask anyway -func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { +func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) if err != nil && err != etcd.ErrKeyNotFound { return err @@ -408,7 +399,7 @@ func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask return nil } -func (s *Scheduler) compactChangeGroupsLoop(ctx context.Context) { +func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) { for { if err := s.compactChangeGroups(ctx); err != nil { log.Errorf("err: %+v", err) @@ -424,7 +415,7 @@ func (s *Scheduler) compactChangeGroupsLoop(ctx context.Context) { } } -func (s *Scheduler) compactChangeGroups(ctx context.Context) error { +func (s *Runservice) compactChangeGroups(ctx context.Context) error { resp, err := s.e.Client().Get(ctx, common.EtcdChangeGroupMinRevisionKey) if err != nil { return err @@ -470,7 +461,7 @@ func (s *Scheduler) compactChangeGroups(ctx context.Context) error { return nil } -func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { +func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { log.Debugf("r: %s", util.Dump(r)) prevPhase := r.Phase @@ -617,7 +608,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx return nil } -func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { +func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { r, _, err := store.GetRun(ctx, s.e, et.RunID) if err != nil { return err @@ -638,7 +629,7 @@ func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.Exec return s.scheduleRun(ctx, r, rc) } -func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error { +func (s *Runservice) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error { log.Debugf("et: %s", util.Dump(et)) rt, ok := r.Tasks[et.ID] @@ -715,7 +706,7 @@ func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorT return nil } -func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) { +func (s *Runservice) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) { for { select { case <-ctx.Done(): @@ -732,7 +723,7 @@ func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *typ } } -func (s *Scheduler) executorTasksCleanerLoop(ctx context.Context) { +func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) { for { log.Debugf("executorTasksCleaner") @@ -750,7 +741,7 @@ func (s *Scheduler) executorTasksCleanerLoop(ctx context.Context) { } } -func (s *Scheduler) executorTasksCleaner(ctx context.Context) error { +func (s *Runservice) executorTasksCleaner(ctx context.Context) error { resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) if err != nil { return err @@ -771,7 +762,7 @@ func (s *Scheduler) executorTasksCleaner(ctx context.Context) error { return nil } -func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error { +func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error { log.Debugf("et: %s", util.Dump(et)) if et.Status.Phase.IsFinished() { r, _, err := store.GetRun(ctx, s.e, et.RunID) @@ -820,7 +811,7 @@ func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorT return nil } -func (s *Scheduler) runTasksUpdaterLoop(ctx context.Context) { +func (s *Runservice) runTasksUpdaterLoop(ctx context.Context) { for { log.Debugf("runTasksUpdater") @@ -832,7 +823,7 @@ func (s *Scheduler) runTasksUpdaterLoop(ctx context.Context) { } } -func (s *Scheduler) runTasksUpdater(ctx context.Context) error { +func (s *Runservice) runTasksUpdater(ctx context.Context) error { log.Debugf("runTasksUpdater") session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) @@ -870,7 +861,7 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error { return nil } -func (s *Scheduler) fileExists(path string) (bool, error) { +func (s *Runservice) fileExists(path string) (bool, error) { _, err := os.Stat(path) if err != nil && !os.IsNotExist(err) { return false, err @@ -878,7 +869,7 @@ func (s *Scheduler) fileExists(path string) (bool, error) { return !os.IsNotExist(err), nil } -func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { +func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { et, err := store.GetExecutorTask(ctx, s.e, rt.ID) if err != nil && err != etcd.ErrKeyNotFound { return err @@ -944,7 +935,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, return s.ost.WriteObject(logPath, r.Body, size, false) } -func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { +func (s *Runservice) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { r, _, err := store.GetRun(ctx, s.e, runID) if err != nil { return err @@ -961,7 +952,7 @@ func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID st return nil } -func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { +func (s *Runservice) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { r, _, err := store.GetRun(ctx, s.e, runID) if err != nil { return err @@ -981,7 +972,7 @@ func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID str return nil } -func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID string, stepnum int) error { +func (s *Runservice) finishArchivePhase(ctx context.Context, runID, runTaskID string, stepnum int) error { r, _, err := store.GetRun(ctx, s.e, runID) if err != nil { return err @@ -1011,7 +1002,7 @@ func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID str return nil } -func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) { +func (s *Runservice) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) { log.Debugf("fetchTaskLogs") // fetch setup log @@ -1040,7 +1031,7 @@ func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.R } } -func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum int) error { +func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum int) error { et, err := store.GetExecutorTask(ctx, s.e, rt.ID) if err != nil && err != etcd.ErrKeyNotFound { return err @@ -1095,7 +1086,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum return s.ost.WriteObject(path, r.Body, size, false) } -func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { +func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { log.Debugf("fetchTaskArchives") for i, stepnum := range rt.WorkspaceArchives { phase := rt.WorkspaceArchivesPhase[i] @@ -1112,7 +1103,7 @@ func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *typ } } -func (s *Scheduler) fetcherLoop(ctx context.Context) { +func (s *Runservice) fetcherLoop(ctx context.Context) { for { log.Debugf("fetcher") @@ -1130,7 +1121,7 @@ func (s *Scheduler) fetcherLoop(ctx context.Context) { } } -func (s *Scheduler) fetcher(ctx context.Context) error { +func (s *Runservice) fetcher(ctx context.Context) error { log.Debugf("fetcher") runs, err := store.GetRuns(ctx, s.e) if err != nil { @@ -1155,7 +1146,7 @@ func (s *Scheduler) fetcher(ctx context.Context) error { } -func (s *Scheduler) runsSchedulerLoop(ctx context.Context) { +func (s *Runservice) runsSchedulerLoop(ctx context.Context) { for { log.Debugf("runsSchedulerLoop") @@ -1173,7 +1164,7 @@ func (s *Scheduler) runsSchedulerLoop(ctx context.Context) { } } -func (s *Scheduler) runsScheduler(ctx context.Context) error { +func (s *Runservice) runsScheduler(ctx context.Context) error { log.Debugf("runsScheduler") runs, err := store.GetRuns(ctx, s.e) if err != nil { @@ -1188,7 +1179,7 @@ func (s *Scheduler) runsScheduler(ctx context.Context) error { return nil } -func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { +func (s *Runservice) runScheduler(ctx context.Context, r *types.Run) error { log.Debugf("runScheduler") rc, err := store.OSTGetRunConfig(s.dm, r.ID) if err != nil { @@ -1198,7 +1189,7 @@ func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { return s.scheduleRun(ctx, r, rc) } -func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) { +func (s *Runservice) finishedRunsArchiverLoop(ctx context.Context) { for { log.Debugf("finished run archiver loop") @@ -1216,7 +1207,7 @@ func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) { } } -func (s *Scheduler) finishedRunsArchiver(ctx context.Context) error { +func (s *Runservice) finishedRunsArchiver(ctx context.Context) error { log.Debugf("finished run archiver") runs, err := store.GetRuns(ctx, s.e) if err != nil { @@ -1246,7 +1237,7 @@ func (s *Scheduler) finishedRunsArchiver(ctx context.Context) error { // finishedRunArchiver archives a run if it's finished and all the fetching // phases (logs and archives) are marked as finished -func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error { +func (s *Runservice) finishedRunArchiver(ctx context.Context, r *types.Run) error { //log.Debugf("r: %s", util.Dump(r)) if !r.Phase.IsFinished() { return nil @@ -1289,7 +1280,7 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error return nil } -func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { +func (s *Runservice) runOSTArchiver(ctx context.Context, r *types.Run) error { // TODO(sgotti) avoid saving multiple times the run on objectstorage if the // store.DeletedArchivedRun fails log.Infof("saving run in objectstorage: %s", r.ID) @@ -1312,7 +1303,7 @@ func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { return nil } -func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) { +func (s *Runservice) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) { for { if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil { log.Errorf("err: %+v", err) @@ -1328,7 +1319,7 @@ func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval ti } } -func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error { +func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error { log.Debugf("cacheCleaner") session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) @@ -1364,233 +1355,3 @@ func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.D return nil } - -// etcdPingerLoop periodically updates a key. -// This is used by watchers to inform the client of the current revision -// this is needed since if other users are updating other unwatched keys on -// etcd we won't be notified, not updating the known revisions -// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress??? -func (s *Scheduler) etcdPingerLoop(ctx context.Context) { - for { - if err := s.etcdPinger(ctx); err != nil { - log.Errorf("err: %+v", err) - } - - select { - case <-ctx.Done(): - return - default: - } - - time.Sleep(1 * time.Second) - } -} - -func (s *Scheduler) etcdPinger(ctx context.Context) error { - if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil { - return err - } - return nil -} - -type Scheduler struct { - c *config.RunServiceScheduler - e *etcd.Store - ost *objectstorage.ObjStorage - dm *datamanager.DataManager - readDB *readdb.ReadDB - ah *action.ActionHandler -} - -func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Scheduler, error) { - if c.Debug { - level.SetLevel(zapcore.DebugLevel) - } - - ost, err := scommon.NewObjectStorage(&c.ObjectStorage) - if err != nil { - return nil, err - } - e, err := scommon.NewEtcd(&c.Etcd, logger, "runscheduler") - if err != nil { - return nil, err - } - - s := &Scheduler{ - c: c, - e: e, - ost: ost, - } - - dmConf := &datamanager.DataManagerConfig{ - E: e, - OST: ost, - DataTypes: []string{ - string(common.DataTypeRun), - string(common.DataTypeRunConfig), - string(common.DataTypeRunCounter), - }, - } - dm, err := datamanager.NewDataManager(ctx, logger, dmConf) - if err != nil { - return nil, err - } - s.dm = dm - - readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm) - if err != nil { - return nil, err - } - s.readDB = readDB - - ah := action.NewActionHandler(logger, e, readDB, ost, dm) - s.ah = ah - - return s, nil -} - -func (s *Scheduler) InitEtcd(ctx context.Context) error { - // Create changegroup min revision if it doesn't exists - cmp := []etcdclientv3.Cmp{} - then := []etcdclientv3.Op{} - - cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0)) - then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, "")) - txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...) - if _, err := txn.Commit(); err != nil { - return etcd.FromEtcdError(err) - } - - return nil -} - -func (s *Scheduler) Run(ctx context.Context) error { - errCh := make(chan error) - dmReadyCh := make(chan struct{}) - - go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }() - - // wait for dm to be ready - <-dmReadyCh - - for { - err := s.InitEtcd(ctx) - if err == nil { - break - } - log.Errorf("failed to initialize etcd: %+v", err) - time.Sleep(1 * time.Second) - } - - go s.readDB.Run(ctx) - - ch := make(chan *types.ExecutorTask) - - // noop coors handler - corsHandler := func(h http.Handler) http.Handler { - return h - } - - corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"}) - corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"}) - corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"}) - corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions) - - // executor dedicated api, only calls from executor should happen on these handlers - executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah) - executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch) - executorTaskHandler := api.NewExecutorTaskHandler(s.e) - executorTasksHandler := api.NewExecutorTasksHandler(s.e) - archivesHandler := api.NewArchivesHandler(logger, s.ost) - cacheHandler := api.NewCacheHandler(logger, s.ost) - cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost) - - // api from clients - executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah) - - logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm) - - runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB) - runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah) - runsHandler := api.NewRunsHandler(logger, s.readDB) - runActionsHandler := api.NewRunActionsHandler(logger, s.ah) - runCreateHandler := api.NewRunCreateHandler(logger, s.ah) - changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB) - - router := mux.NewRouter() - apirouter := router.PathPrefix("/api/v1alpha").Subrouter() - - // don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method - apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) - - apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST") - apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE") - apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET") - apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET") - apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST") - apirouter.Handle("/executor/archives", archivesHandler).Methods("GET") - apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD") - apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET") - apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST") - - apirouter.Handle("/logs", logsHandler).Methods("GET") - - apirouter.Handle("/runs/{runid}", runHandler).Methods("GET") - apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT") - apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT") - apirouter.Handle("/runs", runsHandler).Methods("GET") - apirouter.Handle("/runs", runCreateHandler).Methods("POST") - - apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET") - - mainrouter := mux.NewRouter() - mainrouter.PathPrefix("/").Handler(corsHandler(router)) - - // Return a bad request when it doesn't match any route - mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) }) - - go s.executorTasksCleanerLoop(ctx) - go s.runsSchedulerLoop(ctx) - go s.runTasksUpdaterLoop(ctx) - go s.fetcherLoop(ctx) - go s.finishedRunsArchiverLoop(ctx) - go s.compactChangeGroupsLoop(ctx) - go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval) - go s.executorTaskUpdateHandler(ctx, ch) - - go s.etcdPingerLoop(ctx) - - var tlsConfig *tls.Config - if s.c.Web.TLS { - var err error - tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false) - if err != nil { - log.Errorf("err: %+v") - return err - } - } - - httpServer := http.Server{ - Addr: s.c.Web.ListenAddress, - Handler: mainrouter, - TLSConfig: tlsConfig, - } - - lerrCh := make(chan error) - go func() { - lerrCh <- httpServer.ListenAndServe() - }() - - select { - case <-ctx.Done(): - log.Infof("runservice scheduler exiting") - httpServer.Close() - return nil - case err := <-lerrCh: - log.Errorf("http server listen error: %v", err) - return err - case err := <-errCh: - log.Errorf("error: %+v", err) - return err - } -} diff --git a/internal/services/runservice/scheduler/scheduler_test.go b/internal/services/runservice/scheduler_test.go similarity index 99% rename from internal/services/runservice/scheduler/scheduler_test.go rename to internal/services/runservice/scheduler_test.go index cf6e7d0..e69f1a2 100644 --- a/internal/services/runservice/scheduler/scheduler_test.go +++ b/internal/services/runservice/scheduler_test.go @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package scheduler +package runservice import ( "context" diff --git a/internal/services/runservice/scheduler/store/store.go b/internal/services/runservice/store/store.go similarity index 99% rename from internal/services/runservice/scheduler/store/store.go rename to internal/services/runservice/store/store.go index 3778ea2..65b1bd3 100644 --- a/internal/services/runservice/scheduler/store/store.go +++ b/internal/services/runservice/store/store.go @@ -25,7 +25,7 @@ import ( "github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/objectstorage" - "github.com/sorintlab/agola/internal/services/runservice/scheduler/common" + "github.com/sorintlab/agola/internal/services/runservice/common" "github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/util" diff --git a/internal/services/scheduler/scheduler.go b/internal/services/scheduler/scheduler.go index 8334ea1..9e1d17c 100644 --- a/internal/services/scheduler/scheduler.go +++ b/internal/services/scheduler/scheduler.go @@ -23,7 +23,7 @@ import ( slog "github.com/sorintlab/agola/internal/log" "github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/gateway/common" - rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" "github.com/sorintlab/agola/internal/util" "github.com/pkg/errors" @@ -194,7 +194,7 @@ func NewScheduler(c *config.Scheduler) (*Scheduler, error) { } return &Scheduler{ - runserviceClient: rsapi.NewClient(c.RunServiceURL), + runserviceClient: rsapi.NewClient(c.RunserviceURL), }, nil }