runservice: split and simplify scheduler and executor naming

Also if they are logically part of the runservice the names runserviceExecutor
and runserviceScheduler are long and quite confusing for an external user

Simplify them separating both the code parts and updating the names:

runserviceScheduler -> runservice
runserviceExecutor -> executor
This commit is contained in:
Simone Gotti 2019-05-07 23:56:10 +02:00
parent 44d5b0f25a
commit 1e34dca95d
30 changed files with 388 additions and 357 deletions

View File

@ -21,11 +21,11 @@ import (
"github.com/sorintlab/agola/cmd" "github.com/sorintlab/agola/cmd"
"github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/config"
"github.com/sorintlab/agola/internal/services/configstore" "github.com/sorintlab/agola/internal/services/configstore"
"github.com/sorintlab/agola/internal/services/executor"
rsexecutor "github.com/sorintlab/agola/internal/services/executor"
"github.com/sorintlab/agola/internal/services/gateway" "github.com/sorintlab/agola/internal/services/gateway"
"github.com/sorintlab/agola/internal/services/gitserver" "github.com/sorintlab/agola/internal/services/gitserver"
"github.com/sorintlab/agola/internal/services/runservice/executor" rsscheduler "github.com/sorintlab/agola/internal/services/runservice"
rsexecutor "github.com/sorintlab/agola/internal/services/runservice/executor"
rsscheduler "github.com/sorintlab/agola/internal/services/runservice/scheduler"
"github.com/sorintlab/agola/internal/services/scheduler" "github.com/sorintlab/agola/internal/services/scheduler"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
@ -139,17 +139,17 @@ func serve(cmd *cobra.Command, args []string) error {
} }
} }
var rssched *rsscheduler.Scheduler var rs *rsscheduler.Runservice
if isComponentEnabled("runservicescheduler") { if isComponentEnabled("runservicescheduler") {
rssched, err = rsscheduler.NewScheduler(ctx, &c.RunServiceScheduler) rs, err = rsscheduler.NewRunservice(ctx, &c.Runservice)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to start run service scheduler") return errors.Wrapf(err, "failed to start run service scheduler")
} }
} }
var rsex *rsexecutor.Executor var ex *rsexecutor.Executor
if isComponentEnabled("runserviceexecutor") { if isComponentEnabled("runserviceexecutor") {
rsex, err = executor.NewExecutor(&c.RunServiceExecutor) ex, err = executor.NewExecutor(&c.Executor)
if err != nil { if err != nil {
return errors.Wrapf(err, "failed to start run service executor") return errors.Wrapf(err, "failed to start run service executor")
} }
@ -189,11 +189,11 @@ func serve(cmd *cobra.Command, args []string) error {
errCh := make(chan error) errCh := make(chan error)
if rssched != nil { if rs != nil {
go func() { errCh <- rssched.Run(ctx) }() go func() { errCh <- rs.Run(ctx) }()
} }
if rsex != nil { if ex != nil {
go func() { errCh <- rsex.Run(ctx) }() go func() { errCh <- ex.Run(ctx) }()
} }
if cs != nil { if cs != nil {
go func() { errCh <- cs.Run(ctx) }() go func() { errCh <- cs.Run(ctx) }()

View File

@ -1,7 +1,7 @@
gateway: gateway:
apiExposedURL: "http://172.17.0.1:8000" apiExposedURL: "http://172.17.0.1:8000"
webExposedURL: "http://172.17.0.1:8080" webExposedURL: "http://172.17.0.1:8080"
runServiceURL: "http://localhost:4000" runserviceURL: "http://localhost:4000"
configstoreURL: "http://localhost:4002" configstoreURL: "http://localhost:4002"
gitServerURL: "http://172.17.0.1:4003" gitServerURL: "http://172.17.0.1:4003"
@ -18,7 +18,7 @@ gateway:
adminToken: "admintoken" adminToken: "admintoken"
scheduler: scheduler:
runServiceURL: "http://localhost:4000" runserviceURL: "http://localhost:4000"
configstore: configstore:
dataDir: /tmp/agola/configstore dataDir: /tmp/agola/configstore
@ -30,9 +30,9 @@ configstore:
web: web:
listenAddress: ":4002" listenAddress: ":4002"
runServiceScheduler: runservice:
#debug: true #debug: true
dataDir: /tmp/agola/runservice/scheduler dataDir: /tmp/agola/runservice
objectStorage: objectStorage:
type: posix type: posix
path: /tmp/agola/runservice/ost path: /tmp/agola/runservice/ost
@ -41,10 +41,10 @@ runServiceScheduler:
web: web:
listenAddress: ":4000" listenAddress: ":4000"
runServiceExecutor: executor:
dataDir: /tmp/agola/runservice/executor dataDir: /tmp/agola/executor
toolboxPath: ./bin/agola-toolbox toolboxPath: ./bin/agola-toolbox
runServiceURL: "http://localhost:4000" runserviceURL: "http://localhost:4000"
web: web:
listenAddress: ":4001" listenAddress: ":4001"
activeTasksLimit: 2 activeTasksLimit: 2

View File

@ -33,12 +33,12 @@ type Config struct {
// Defaults to "agola" // Defaults to "agola"
ID string `yaml:"id"` ID string `yaml:"id"`
Gateway Gateway `yaml:"gateway"` Gateway Gateway `yaml:"gateway"`
Scheduler Scheduler `yaml:"scheduler"` Scheduler Scheduler `yaml:"scheduler"`
RunServiceScheduler RunServiceScheduler `yaml:"runServiceScheduler"` Runservice Runservice `yaml:"runservice"`
RunServiceExecutor RunServiceExecutor `yaml:"runServiceExecutor"` Executor Executor `yaml:"executor"`
Configstore Configstore `yaml:"configstore"` Configstore Configstore `yaml:"configstore"`
GitServer GitServer `yaml:"gitServer"` GitServer GitServer `yaml:"gitServer"`
} }
type Gateway struct { type Gateway struct {
@ -51,7 +51,7 @@ type Gateway struct {
// This is used for generating the redirect_url in oauth2 redirects // This is used for generating the redirect_url in oauth2 redirects
WebExposedURL string `yaml:"webExposedURL"` WebExposedURL string `yaml:"webExposedURL"`
RunServiceURL string `yaml:"runServiceURL"` RunserviceURL string `yaml:"runserviceURL"`
ConfigstoreURL string `yaml:"configstoreURL"` ConfigstoreURL string `yaml:"configstoreURL"`
GitServerURL string `yaml:"gitServerURL"` GitServerURL string `yaml:"gitServerURL"`
@ -67,10 +67,10 @@ type Gateway struct {
type Scheduler struct { type Scheduler struct {
Debug bool `yaml:"debug"` Debug bool `yaml:"debug"`
RunServiceURL string `yaml:"runServiceURL"` RunserviceURL string `yaml:"runserviceURL"`
} }
type RunServiceScheduler struct { type Runservice struct {
Debug bool `yaml:"debug"` Debug bool `yaml:"debug"`
DataDir string `yaml:"dataDir"` DataDir string `yaml:"dataDir"`
@ -81,12 +81,12 @@ type RunServiceScheduler struct {
RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"` RunCacheExpireInterval time.Duration `yaml:"runCacheExpireInterval"`
} }
type RunServiceExecutor struct { type Executor struct {
Debug bool `yaml:"debug"` Debug bool `yaml:"debug"`
DataDir string `yaml:"dataDir"` DataDir string `yaml:"dataDir"`
RunServiceURL string `yaml:"runServiceURL"` RunserviceURL string `yaml:"runserviceURL"`
ToolboxPath string `yaml:"toolboxPath"` ToolboxPath string `yaml:"toolboxPath"`
Web Web `yaml:"web"` Web Web `yaml:"web"`
@ -208,10 +208,10 @@ var defaultConfig = Config{
Duration: 12 * time.Hour, Duration: 12 * time.Hour,
}, },
}, },
RunServiceScheduler: RunServiceScheduler{ Runservice: Runservice{
RunCacheExpireInterval: 7 * 24 * time.Hour, RunCacheExpireInterval: 7 * 24 * time.Hour,
}, },
RunServiceExecutor: RunServiceExecutor{ Executor: Executor{
ActiveTasksLimit: 2, ActiveTasksLimit: 2,
}, },
} }
@ -266,8 +266,8 @@ func Validate(c *Config) error {
if c.Gateway.ConfigstoreURL == "" { if c.Gateway.ConfigstoreURL == "" {
return errors.Errorf("gateway configstoreURL is empty") return errors.Errorf("gateway configstoreURL is empty")
} }
if c.Gateway.RunServiceURL == "" { if c.Gateway.RunserviceURL == "" {
return errors.Errorf("gateway runServiceURL is empty") return errors.Errorf("gateway runserviceURL is empty")
} }
if err := validateWeb(&c.Gateway.Web); err != nil { if err := validateWeb(&c.Gateway.Web); err != nil {
return errors.Wrapf(err, "gateway web configuration error") return errors.Wrapf(err, "gateway web configuration error")
@ -281,37 +281,37 @@ func Validate(c *Config) error {
return errors.Wrapf(err, "configstore web configuration error") return errors.Wrapf(err, "configstore web configuration error")
} }
// Runservice Scheduler // Runservice
if c.RunServiceScheduler.DataDir == "" { if c.Runservice.DataDir == "" {
return errors.Errorf("runservice scheduler dataDir is empty") return errors.Errorf("runservice dataDir is empty")
} }
if err := validateWeb(&c.RunServiceScheduler.Web); err != nil { if err := validateWeb(&c.Runservice.Web); err != nil {
return errors.Wrapf(err, "runservice scheduler web configuration error") return errors.Wrapf(err, "runservice web configuration error")
} }
// Runservice Executor // Executor
if c.RunServiceExecutor.DataDir == "" { if c.Executor.DataDir == "" {
return errors.Errorf("runservice executor dataDir is empty") return errors.Errorf("executor dataDir is empty")
} }
if c.RunServiceExecutor.ToolboxPath == "" { if c.Executor.ToolboxPath == "" {
return errors.Errorf("git server toolboxPath is empty") return errors.Errorf("git server toolboxPath is empty")
} }
if c.RunServiceExecutor.RunServiceURL == "" { if c.Executor.RunserviceURL == "" {
return errors.Errorf("runservice executor runServiceURL is empty") return errors.Errorf("executor runserviceURL is empty")
} }
if c.RunServiceExecutor.Driver.Type == "" { if c.Executor.Driver.Type == "" {
return errors.Errorf("runservice executor driver type is empty") return errors.Errorf("executor driver type is empty")
} }
switch c.RunServiceExecutor.Driver.Type { switch c.Executor.Driver.Type {
case DriverTypeDocker: case DriverTypeDocker:
case DriverTypeK8s: case DriverTypeK8s:
default: default:
return errors.Errorf("runservice executor driver type %q unknown", c.RunServiceExecutor.Driver.Type) return errors.Errorf("executor driver type %q unknown", c.Executor.Driver.Type)
} }
// Scheduler // Scheduler
if c.Scheduler.RunServiceURL == "" { if c.Scheduler.RunserviceURL == "" {
return errors.Errorf("scheduler runServiceURL is empty") return errors.Errorf("scheduler runserviceURL is empty")
} }
// Git server // Git server

View File

@ -29,7 +29,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/sorintlab/agola/internal/common" "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/services/runservice/executor/registry" "github.com/sorintlab/agola/internal/services/executor/registry"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/container"

View File

@ -19,7 +19,7 @@ import (
"io" "io"
"github.com/sorintlab/agola/internal/common" "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/services/runservice/executor/registry" "github.com/sorintlab/agola/internal/services/executor/registry"
) )
const ( const (

View File

@ -35,9 +35,9 @@ import (
"github.com/sorintlab/agola/internal/common" "github.com/sorintlab/agola/internal/common"
slog "github.com/sorintlab/agola/internal/log" slog "github.com/sorintlab/agola/internal/log"
"github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/config"
"github.com/sorintlab/agola/internal/services/runservice/executor/driver" "github.com/sorintlab/agola/internal/services/executor/driver"
"github.com/sorintlab/agola/internal/services/runservice/executor/registry" "github.com/sorintlab/agola/internal/services/executor/registry"
rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" rsapi "github.com/sorintlab/agola/internal/services/runservice/api"
"github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
@ -1227,7 +1227,7 @@ func (e *Executor) saveExecutorID(id string) error {
} }
type Executor struct { type Executor struct {
c *config.RunServiceExecutor c *config.Executor
runserviceClient *rsapi.Client runserviceClient *rsapi.Client
id string id string
runningTasks *runningTasks runningTasks *runningTasks
@ -1236,7 +1236,7 @@ type Executor struct {
dynamic bool dynamic bool
} }
func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) { func NewExecutor(c *config.Executor) (*Executor, error) {
if c.Debug { if c.Debug {
level.SetLevel(zapcore.DebugLevel) level.SetLevel(zapcore.DebugLevel)
} }
@ -1256,7 +1256,7 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
e := &Executor{ e := &Executor{
c: c, c: c,
runserviceClient: rsapi.NewClient(c.RunServiceURL), runserviceClient: rsapi.NewClient(c.RunserviceURL),
runningTasks: &runningTasks{ runningTasks: &runningTasks{
tasks: make(map[string]*runningTask), tasks: make(map[string]*runningTask),
}, },

View File

@ -20,7 +20,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
csapi "github.com/sorintlab/agola/internal/services/configstore/api" csapi "github.com/sorintlab/agola/internal/services/configstore/api"
"github.com/sorintlab/agola/internal/services/gateway/common" "github.com/sorintlab/agola/internal/services/gateway/common"
rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" rsapi "github.com/sorintlab/agola/internal/services/runservice/api"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
"go.uber.org/zap" "go.uber.org/zap"

View File

@ -20,7 +20,7 @@ import (
"net/http" "net/http"
"github.com/sorintlab/agola/internal/services/gateway/common" "github.com/sorintlab/agola/internal/services/gateway/common"
rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" rsapi "github.com/sorintlab/agola/internal/services/runservice/api"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
"github.com/pkg/errors" "github.com/pkg/errors"

View File

@ -29,7 +29,7 @@ import (
"github.com/sorintlab/agola/internal/services/gateway/api" "github.com/sorintlab/agola/internal/services/gateway/api"
"github.com/sorintlab/agola/internal/services/gateway/common" "github.com/sorintlab/agola/internal/services/gateway/common"
"github.com/sorintlab/agola/internal/services/gateway/handlers" "github.com/sorintlab/agola/internal/services/gateway/handlers"
rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" rsapi "github.com/sorintlab/agola/internal/services/runservice/api"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
jwt "github.com/dgrijalva/jwt-go" jwt "github.com/dgrijalva/jwt-go"
@ -122,7 +122,7 @@ func NewGateway(gc *config.Config) (*Gateway, error) {
} }
configstoreClient := csapi.NewClient(c.ConfigstoreURL) configstoreClient := csapi.NewClient(c.ConfigstoreURL)
runserviceClient := rsapi.NewClient(c.RunServiceURL) runserviceClient := rsapi.NewClient(c.RunserviceURL)
ah := action.NewActionHandler(logger, sd, configstoreClient, runserviceClient, gc.ID, c.APIExposedURL, c.WebExposedURL) ah := action.NewActionHandler(logger, sd, configstoreClient, runserviceClient, gc.ID, c.APIExposedURL, c.WebExposedURL)

View File

@ -27,7 +27,7 @@ import (
csapi "github.com/sorintlab/agola/internal/services/configstore/api" csapi "github.com/sorintlab/agola/internal/services/configstore/api"
"github.com/sorintlab/agola/internal/services/gateway/action" "github.com/sorintlab/agola/internal/services/gateway/action"
"github.com/sorintlab/agola/internal/services/gateway/common" "github.com/sorintlab/agola/internal/services/gateway/common"
rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" rsapi "github.com/sorintlab/agola/internal/services/runservice/api"
rstypes "github.com/sorintlab/agola/internal/services/runservice/types" rstypes "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/services/types" "github.com/sorintlab/agola/internal/services/types"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"

View File

@ -26,9 +26,9 @@ import (
"github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/runconfig"
"github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/sequence"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" "github.com/sorintlab/agola/internal/services/runservice/readdb"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/store"
"github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"

View File

@ -27,10 +27,10 @@ import (
"github.com/sorintlab/agola/internal/db" "github.com/sorintlab/agola/internal/db"
"github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/action" "github.com/sorintlab/agola/internal/services/runservice/action"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb" "github.com/sorintlab/agola/internal/services/runservice/readdb"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/store"
"github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"

View File

@ -26,9 +26,9 @@ import (
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/action" "github.com/sorintlab/agola/internal/services/runservice/action"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/store"
"github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/runservice/types"
"go.uber.org/zap" "go.uber.org/zap"
) )

View File

@ -33,8 +33,8 @@ import (
"github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/sequence" "github.com/sorintlab/agola/internal/sequence"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store" "github.com/sorintlab/agola/internal/services/runservice/store"
"github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
"go.uber.org/zap" "go.uber.org/zap"

View File

@ -0,0 +1,270 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package runservice
import (
"context"
"crypto/tls"
"net/http"
"path/filepath"
"time"
scommon "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/config"
"github.com/sorintlab/agola/internal/services/runservice/action"
"github.com/sorintlab/agola/internal/services/runservice/api"
"github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/readdb"
"github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util"
ghandlers "github.com/gorilla/handlers"
"github.com/gorilla/mux"
etcdclientv3 "go.etcd.io/etcd/clientv3"
"go.uber.org/zap/zapcore"
)
// etcdPingerLoop periodically updates a key.
// This is used by watchers to inform the client of the current revision
// this is needed since if other users are updating other unwatched keys on
// etcd we won't be notified, not updating the known revisions
// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress???
func (s *Runservice) etcdPingerLoop(ctx context.Context) {
for {
if err := s.etcdPinger(ctx); err != nil {
log.Errorf("err: %+v", err)
}
select {
case <-ctx.Done():
return
default:
}
time.Sleep(1 * time.Second)
}
}
func (s *Runservice) etcdPinger(ctx context.Context) error {
if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil {
return err
}
return nil
}
type Runservice struct {
c *config.Runservice
e *etcd.Store
ost *objectstorage.ObjStorage
dm *datamanager.DataManager
readDB *readdb.ReadDB
ah *action.ActionHandler
}
func NewRunservice(ctx context.Context, c *config.Runservice) (*Runservice, error) {
if c.Debug {
level.SetLevel(zapcore.DebugLevel)
}
ost, err := scommon.NewObjectStorage(&c.ObjectStorage)
if err != nil {
return nil, err
}
e, err := scommon.NewEtcd(&c.Etcd, logger, "runservice")
if err != nil {
return nil, err
}
s := &Runservice{
c: c,
e: e,
ost: ost,
}
dmConf := &datamanager.DataManagerConfig{
E: e,
OST: ost,
DataTypes: []string{
string(common.DataTypeRun),
string(common.DataTypeRunConfig),
string(common.DataTypeRunCounter),
},
}
dm, err := datamanager.NewDataManager(ctx, logger, dmConf)
if err != nil {
return nil, err
}
s.dm = dm
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm)
if err != nil {
return nil, err
}
s.readDB = readDB
ah := action.NewActionHandler(logger, e, readDB, ost, dm)
s.ah = ah
return s, nil
}
func (s *Runservice) InitEtcd(ctx context.Context) error {
// Create changegroup min revision if it doesn't exists
cmp := []etcdclientv3.Cmp{}
then := []etcdclientv3.Op{}
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0))
then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, ""))
txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...)
if _, err := txn.Commit(); err != nil {
return etcd.FromEtcdError(err)
}
return nil
}
func (s *Runservice) Run(ctx context.Context) error {
errCh := make(chan error)
dmReadyCh := make(chan struct{})
go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()
// wait for dm to be ready
<-dmReadyCh
for {
err := s.InitEtcd(ctx)
if err == nil {
break
}
log.Errorf("failed to initialize etcd: %+v", err)
time.Sleep(1 * time.Second)
}
go s.readDB.Run(ctx)
ch := make(chan *types.ExecutorTask)
// noop coors handler
corsHandler := func(h http.Handler) http.Handler {
return h
}
corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"})
corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"})
corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"})
corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions)
// executor dedicated api, only calls from executor should happen on these handlers
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch)
executorTaskHandler := api.NewExecutorTaskHandler(s.e)
executorTasksHandler := api.NewExecutorTasksHandler(s.e)
archivesHandler := api.NewArchivesHandler(logger, s.ost)
cacheHandler := api.NewCacheHandler(logger, s.ost)
cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)
// api from clients
executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah)
logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm)
runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB)
runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah)
runsHandler := api.NewRunsHandler(logger, s.readDB)
runActionsHandler := api.NewRunActionsHandler(logger, s.ah)
runCreateHandler := api.NewRunCreateHandler(logger, s.ah)
changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB)
router := mux.NewRouter()
apirouter := router.PathPrefix("/api/v1alpha").Subrouter()
// don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method
apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST")
apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE")
apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET")
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET")
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST")
apirouter.Handle("/executor/archives", archivesHandler).Methods("GET")
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD")
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET")
apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST")
apirouter.Handle("/logs", logsHandler).Methods("GET")
apirouter.Handle("/runs/{runid}", runHandler).Methods("GET")
apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT")
apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT")
apirouter.Handle("/runs", runsHandler).Methods("GET")
apirouter.Handle("/runs", runCreateHandler).Methods("POST")
apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET")
mainrouter := mux.NewRouter()
mainrouter.PathPrefix("/").Handler(corsHandler(router))
// Return a bad request when it doesn't match any route
mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
go s.executorTasksCleanerLoop(ctx)
go s.runsSchedulerLoop(ctx)
go s.runTasksUpdaterLoop(ctx)
go s.fetcherLoop(ctx)
go s.finishedRunsArchiverLoop(ctx)
go s.compactChangeGroupsLoop(ctx)
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
go s.executorTaskUpdateHandler(ctx, ch)
go s.etcdPingerLoop(ctx)
var tlsConfig *tls.Config
if s.c.Web.TLS {
var err error
tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false)
if err != nil {
log.Errorf("err: %+v")
return err
}
}
httpServer := http.Server{
Addr: s.c.Web.ListenAddress,
Handler: mainrouter,
TLSConfig: tlsConfig,
}
lerrCh := make(chan error)
go func() {
lerrCh <- httpServer.ListenAndServe()
}()
select {
case <-ctx.Done():
log.Infof("runservice scheduler exiting")
httpServer.Close()
return nil
case err := <-lerrCh:
log.Errorf("http server listen error: %v", err)
return err
case err := <-errCh:
log.Errorf("error: %+v", err)
return err
}
}

View File

@ -12,38 +12,29 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package scheduler package runservice
import ( import (
"bytes" "bytes"
"context" "context"
"crypto/tls"
"encoding/json" "encoding/json"
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
"path/filepath"
"sort" "sort"
"strconv" "strconv"
"time" "time"
scommon "github.com/sorintlab/agola/internal/common"
"github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/etcd"
slog "github.com/sorintlab/agola/internal/log" slog "github.com/sorintlab/agola/internal/log"
"github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/runconfig" "github.com/sorintlab/agola/internal/runconfig"
"github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/action" "github.com/sorintlab/agola/internal/services/runservice/store"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/api"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/readdb"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/store"
"github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
ghandlers "github.com/gorilla/handlers"
"github.com/gorilla/mux"
"github.com/pkg/errors" "github.com/pkg/errors"
etcdclientv3 "go.etcd.io/etcd/clientv3" etcdclientv3 "go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/clientv3/concurrency" "go.etcd.io/etcd/clientv3/concurrency"
@ -65,7 +56,7 @@ func mergeEnv(dest, src map[string]string) {
} }
} }
func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) { func (s *Runservice) runActiveExecutorTasks(ctx context.Context, runID string) ([]*types.ExecutorTask, error) {
// the real source of active tasks is the number of executor tasks in etcd // the real source of active tasks is the number of executor tasks in etcd
// we can't rely on RunTask.Status since it's only updated when receiveing // we can't rely on RunTask.Status since it's only updated when receiveing
// updated from the executor so it could be in a NotStarted state but have an // updated from the executor so it could be in a NotStarted state but have an
@ -84,7 +75,7 @@ func (s *Scheduler) runActiveExecutorTasks(ctx context.Context, runID string) ([
return activeTasks, nil return activeTasks, nil
} }
func (s *Scheduler) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) { func (s *Runservice) runHasActiveExecutorTasks(ctx context.Context, runID string) (bool, error) {
activeTasks, err := s.runActiveExecutorTasks(ctx, runID) activeTasks, err := s.runActiveExecutorTasks(ctx, runID)
if err != nil { if err != nil {
return false, err return false, err
@ -237,7 +228,7 @@ func getTasksToRun(ctx context.Context, r *types.Run, rc *types.RunConfig) ([]*t
return tasksToRun, nil return tasksToRun, nil
} }
func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error { func (s *Runservice) submitRunTasks(ctx context.Context, r *types.Run, rc *types.RunConfig, tasks []*types.RunTask) error {
log.Debugf("tasksToRun: %s", util.Dump(tasks)) log.Debugf("tasksToRun: %s", util.Dump(tasks))
for _, rt := range tasks { for _, rt := range tasks {
@ -278,7 +269,7 @@ func (s *Scheduler) submitRunTasks(ctx context.Context, r *types.Run, rc *types.
// chooseExecutor chooses the executor to schedule the task on. Now it's a very simple/dumb selection // chooseExecutor chooses the executor to schedule the task on. Now it's a very simple/dumb selection
// TODO(sgotti) improve this to use executor statistic, labels (arch type) etc... // TODO(sgotti) improve this to use executor statistic, labels (arch type) etc...
func (s *Scheduler) chooseExecutor(ctx context.Context, rct *types.RunConfigTask) (*types.Executor, error) { func (s *Runservice) chooseExecutor(ctx context.Context, rct *types.RunConfigTask) (*types.Executor, error) {
executors, err := store.GetExecutors(ctx, s.e) executors, err := store.GetExecutors(ctx, s.e)
if err != nil { if err != nil {
return nil, err return nil, err
@ -318,7 +309,7 @@ func (p parentsByLevelName) Less(i, j int) bool {
} }
func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p parentsByLevelName) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask { func (s *Runservice) genExecutorTask(ctx context.Context, r *types.Run, rt *types.RunTask, rc *types.RunConfig, executor *types.Executor) *types.ExecutorTask {
rct := rc.Tasks[rt.ID] rct := rc.Tasks[rt.ID]
environment := map[string]string{} environment := map[string]string{}
@ -382,7 +373,7 @@ func (s *Scheduler) genExecutorTask(ctx context.Context, r *types.Run, rt *types
// sendExecutorTask sends executor task to executor, if this fails the executor // sendExecutorTask sends executor task to executor, if this fails the executor
// will periodically fetch the executortask anyway // will periodically fetch the executortask anyway
func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error { func (s *Runservice) sendExecutorTask(ctx context.Context, et *types.ExecutorTask) error {
executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID) executor, err := store.GetExecutor(ctx, s.e, et.Status.ExecutorID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
@ -408,7 +399,7 @@ func (s *Scheduler) sendExecutorTask(ctx context.Context, et *types.ExecutorTask
return nil return nil
} }
func (s *Scheduler) compactChangeGroupsLoop(ctx context.Context) { func (s *Runservice) compactChangeGroupsLoop(ctx context.Context) {
for { for {
if err := s.compactChangeGroups(ctx); err != nil { if err := s.compactChangeGroups(ctx); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
@ -424,7 +415,7 @@ func (s *Scheduler) compactChangeGroupsLoop(ctx context.Context) {
} }
} }
func (s *Scheduler) compactChangeGroups(ctx context.Context) error { func (s *Runservice) compactChangeGroups(ctx context.Context) error {
resp, err := s.e.Client().Get(ctx, common.EtcdChangeGroupMinRevisionKey) resp, err := s.e.Client().Get(ctx, common.EtcdChangeGroupMinRevisionKey)
if err != nil { if err != nil {
return err return err
@ -470,7 +461,7 @@ func (s *Scheduler) compactChangeGroups(ctx context.Context) error {
return nil return nil
} }
func (s *Scheduler) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error { func (s *Runservice) scheduleRun(ctx context.Context, r *types.Run, rc *types.RunConfig) error {
log.Debugf("r: %s", util.Dump(r)) log.Debugf("r: %s", util.Dump(r))
prevPhase := r.Phase prevPhase := r.Phase
@ -617,7 +608,7 @@ func advanceRun(ctx context.Context, r *types.Run, rc *types.RunConfig, activeEx
return nil return nil
} }
func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error { func (s *Runservice) handleExecutorTaskUpdate(ctx context.Context, et *types.ExecutorTask) error {
r, _, err := store.GetRun(ctx, s.e, et.RunID) r, _, err := store.GetRun(ctx, s.e, et.RunID)
if err != nil { if err != nil {
return err return err
@ -638,7 +629,7 @@ func (s *Scheduler) handleExecutorTaskUpdate(ctx context.Context, et *types.Exec
return s.scheduleRun(ctx, r, rc) return s.scheduleRun(ctx, r, rc)
} }
func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error { func (s *Runservice) updateRunTaskStatus(ctx context.Context, et *types.ExecutorTask, r *types.Run) error {
log.Debugf("et: %s", util.Dump(et)) log.Debugf("et: %s", util.Dump(et))
rt, ok := r.Tasks[et.ID] rt, ok := r.Tasks[et.ID]
@ -715,7 +706,7 @@ func (s *Scheduler) updateRunTaskStatus(ctx context.Context, et *types.ExecutorT
return nil return nil
} }
func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) { func (s *Runservice) executorTaskUpdateHandler(ctx context.Context, c <-chan *types.ExecutorTask) {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
@ -732,7 +723,7 @@ func (s *Scheduler) executorTaskUpdateHandler(ctx context.Context, c <-chan *typ
} }
} }
func (s *Scheduler) executorTasksCleanerLoop(ctx context.Context) { func (s *Runservice) executorTasksCleanerLoop(ctx context.Context) {
for { for {
log.Debugf("executorTasksCleaner") log.Debugf("executorTasksCleaner")
@ -750,7 +741,7 @@ func (s *Scheduler) executorTasksCleanerLoop(ctx context.Context) {
} }
} }
func (s *Scheduler) executorTasksCleaner(ctx context.Context) error { func (s *Runservice) executorTasksCleaner(ctx context.Context) error {
resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0) resp, err := s.e.List(ctx, common.EtcdTasksDir, "", 0)
if err != nil { if err != nil {
return err return err
@ -771,7 +762,7 @@ func (s *Scheduler) executorTasksCleaner(ctx context.Context) error {
return nil return nil
} }
func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error { func (s *Runservice) executorTaskCleaner(ctx context.Context, et *types.ExecutorTask) error {
log.Debugf("et: %s", util.Dump(et)) log.Debugf("et: %s", util.Dump(et))
if et.Status.Phase.IsFinished() { if et.Status.Phase.IsFinished() {
r, _, err := store.GetRun(ctx, s.e, et.RunID) r, _, err := store.GetRun(ctx, s.e, et.RunID)
@ -820,7 +811,7 @@ func (s *Scheduler) executorTaskCleaner(ctx context.Context, et *types.ExecutorT
return nil return nil
} }
func (s *Scheduler) runTasksUpdaterLoop(ctx context.Context) { func (s *Runservice) runTasksUpdaterLoop(ctx context.Context) {
for { for {
log.Debugf("runTasksUpdater") log.Debugf("runTasksUpdater")
@ -832,7 +823,7 @@ func (s *Scheduler) runTasksUpdaterLoop(ctx context.Context) {
} }
} }
func (s *Scheduler) runTasksUpdater(ctx context.Context) error { func (s *Runservice) runTasksUpdater(ctx context.Context) error {
log.Debugf("runTasksUpdater") log.Debugf("runTasksUpdater")
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
@ -870,7 +861,7 @@ func (s *Scheduler) runTasksUpdater(ctx context.Context) error {
return nil return nil
} }
func (s *Scheduler) fileExists(path string) (bool, error) { func (s *Runservice) fileExists(path string) (bool, error) {
_, err := os.Stat(path) _, err := os.Stat(path)
if err != nil && !os.IsNotExist(err) { if err != nil && !os.IsNotExist(err) {
return false, err return false, err
@ -878,7 +869,7 @@ func (s *Scheduler) fileExists(path string) (bool, error) {
return !os.IsNotExist(err), nil return !os.IsNotExist(err), nil
} }
func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error { func (s *Runservice) fetchLog(ctx context.Context, rt *types.RunTask, setup bool, stepnum int) error {
et, err := store.GetExecutorTask(ctx, s.e, rt.ID) et, err := store.GetExecutorTask(ctx, s.e, rt.ID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
@ -944,7 +935,7 @@ func (s *Scheduler) fetchLog(ctx context.Context, rt *types.RunTask, setup bool,
return s.ost.WriteObject(logPath, r.Body, size, false) return s.ost.WriteObject(logPath, r.Body, size, false)
} }
func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error { func (s *Runservice) finishSetupLogPhase(ctx context.Context, runID, runTaskID string) error {
r, _, err := store.GetRun(ctx, s.e, runID) r, _, err := store.GetRun(ctx, s.e, runID)
if err != nil { if err != nil {
return err return err
@ -961,7 +952,7 @@ func (s *Scheduler) finishSetupLogPhase(ctx context.Context, runID, runTaskID st
return nil return nil
} }
func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error { func (s *Runservice) finishStepLogPhase(ctx context.Context, runID, runTaskID string, stepnum int) error {
r, _, err := store.GetRun(ctx, s.e, runID) r, _, err := store.GetRun(ctx, s.e, runID)
if err != nil { if err != nil {
return err return err
@ -981,7 +972,7 @@ func (s *Scheduler) finishStepLogPhase(ctx context.Context, runID, runTaskID str
return nil return nil
} }
func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID string, stepnum int) error { func (s *Runservice) finishArchivePhase(ctx context.Context, runID, runTaskID string, stepnum int) error {
r, _, err := store.GetRun(ctx, s.e, runID) r, _, err := store.GetRun(ctx, s.e, runID)
if err != nil { if err != nil {
return err return err
@ -1011,7 +1002,7 @@ func (s *Scheduler) finishArchivePhase(ctx context.Context, runID, runTaskID str
return nil return nil
} }
func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) { func (s *Runservice) fetchTaskLogs(ctx context.Context, runID string, rt *types.RunTask) {
log.Debugf("fetchTaskLogs") log.Debugf("fetchTaskLogs")
// fetch setup log // fetch setup log
@ -1040,7 +1031,7 @@ func (s *Scheduler) fetchTaskLogs(ctx context.Context, runID string, rt *types.R
} }
} }
func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum int) error { func (s *Runservice) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum int) error {
et, err := store.GetExecutorTask(ctx, s.e, rt.ID) et, err := store.GetExecutorTask(ctx, s.e, rt.ID)
if err != nil && err != etcd.ErrKeyNotFound { if err != nil && err != etcd.ErrKeyNotFound {
return err return err
@ -1095,7 +1086,7 @@ func (s *Scheduler) fetchArchive(ctx context.Context, rt *types.RunTask, stepnum
return s.ost.WriteObject(path, r.Body, size, false) return s.ost.WriteObject(path, r.Body, size, false)
} }
func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) { func (s *Runservice) fetchTaskArchives(ctx context.Context, runID string, rt *types.RunTask) {
log.Debugf("fetchTaskArchives") log.Debugf("fetchTaskArchives")
for i, stepnum := range rt.WorkspaceArchives { for i, stepnum := range rt.WorkspaceArchives {
phase := rt.WorkspaceArchivesPhase[i] phase := rt.WorkspaceArchivesPhase[i]
@ -1112,7 +1103,7 @@ func (s *Scheduler) fetchTaskArchives(ctx context.Context, runID string, rt *typ
} }
} }
func (s *Scheduler) fetcherLoop(ctx context.Context) { func (s *Runservice) fetcherLoop(ctx context.Context) {
for { for {
log.Debugf("fetcher") log.Debugf("fetcher")
@ -1130,7 +1121,7 @@ func (s *Scheduler) fetcherLoop(ctx context.Context) {
} }
} }
func (s *Scheduler) fetcher(ctx context.Context) error { func (s *Runservice) fetcher(ctx context.Context) error {
log.Debugf("fetcher") log.Debugf("fetcher")
runs, err := store.GetRuns(ctx, s.e) runs, err := store.GetRuns(ctx, s.e)
if err != nil { if err != nil {
@ -1155,7 +1146,7 @@ func (s *Scheduler) fetcher(ctx context.Context) error {
} }
func (s *Scheduler) runsSchedulerLoop(ctx context.Context) { func (s *Runservice) runsSchedulerLoop(ctx context.Context) {
for { for {
log.Debugf("runsSchedulerLoop") log.Debugf("runsSchedulerLoop")
@ -1173,7 +1164,7 @@ func (s *Scheduler) runsSchedulerLoop(ctx context.Context) {
} }
} }
func (s *Scheduler) runsScheduler(ctx context.Context) error { func (s *Runservice) runsScheduler(ctx context.Context) error {
log.Debugf("runsScheduler") log.Debugf("runsScheduler")
runs, err := store.GetRuns(ctx, s.e) runs, err := store.GetRuns(ctx, s.e)
if err != nil { if err != nil {
@ -1188,7 +1179,7 @@ func (s *Scheduler) runsScheduler(ctx context.Context) error {
return nil return nil
} }
func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error { func (s *Runservice) runScheduler(ctx context.Context, r *types.Run) error {
log.Debugf("runScheduler") log.Debugf("runScheduler")
rc, err := store.OSTGetRunConfig(s.dm, r.ID) rc, err := store.OSTGetRunConfig(s.dm, r.ID)
if err != nil { if err != nil {
@ -1198,7 +1189,7 @@ func (s *Scheduler) runScheduler(ctx context.Context, r *types.Run) error {
return s.scheduleRun(ctx, r, rc) return s.scheduleRun(ctx, r, rc)
} }
func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) { func (s *Runservice) finishedRunsArchiverLoop(ctx context.Context) {
for { for {
log.Debugf("finished run archiver loop") log.Debugf("finished run archiver loop")
@ -1216,7 +1207,7 @@ func (s *Scheduler) finishedRunsArchiverLoop(ctx context.Context) {
} }
} }
func (s *Scheduler) finishedRunsArchiver(ctx context.Context) error { func (s *Runservice) finishedRunsArchiver(ctx context.Context) error {
log.Debugf("finished run archiver") log.Debugf("finished run archiver")
runs, err := store.GetRuns(ctx, s.e) runs, err := store.GetRuns(ctx, s.e)
if err != nil { if err != nil {
@ -1246,7 +1237,7 @@ func (s *Scheduler) finishedRunsArchiver(ctx context.Context) error {
// finishedRunArchiver archives a run if it's finished and all the fetching // finishedRunArchiver archives a run if it's finished and all the fetching
// phases (logs and archives) are marked as finished // phases (logs and archives) are marked as finished
func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error { func (s *Runservice) finishedRunArchiver(ctx context.Context, r *types.Run) error {
//log.Debugf("r: %s", util.Dump(r)) //log.Debugf("r: %s", util.Dump(r))
if !r.Phase.IsFinished() { if !r.Phase.IsFinished() {
return nil return nil
@ -1289,7 +1280,7 @@ func (s *Scheduler) finishedRunArchiver(ctx context.Context, r *types.Run) error
return nil return nil
} }
func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error { func (s *Runservice) runOSTArchiver(ctx context.Context, r *types.Run) error {
// TODO(sgotti) avoid saving multiple times the run on objectstorage if the // TODO(sgotti) avoid saving multiple times the run on objectstorage if the
// store.DeletedArchivedRun fails // store.DeletedArchivedRun fails
log.Infof("saving run in objectstorage: %s", r.ID) log.Infof("saving run in objectstorage: %s", r.ID)
@ -1312,7 +1303,7 @@ func (s *Scheduler) runOSTArchiver(ctx context.Context, r *types.Run) error {
return nil return nil
} }
func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) { func (s *Runservice) cacheCleanerLoop(ctx context.Context, cacheExpireInterval time.Duration) {
for { for {
if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil { if err := s.cacheCleaner(ctx, cacheExpireInterval); err != nil {
log.Errorf("err: %+v", err) log.Errorf("err: %+v", err)
@ -1328,7 +1319,7 @@ func (s *Scheduler) cacheCleanerLoop(ctx context.Context, cacheExpireInterval ti
} }
} }
func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error { func (s *Runservice) cacheCleaner(ctx context.Context, cacheExpireInterval time.Duration) error {
log.Debugf("cacheCleaner") log.Debugf("cacheCleaner")
session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) session, err := concurrency.NewSession(s.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx))
@ -1364,233 +1355,3 @@ func (s *Scheduler) cacheCleaner(ctx context.Context, cacheExpireInterval time.D
return nil return nil
} }
// etcdPingerLoop periodically updates a key.
// This is used by watchers to inform the client of the current revision
// this is needed since if other users are updating other unwatched keys on
// etcd we won't be notified, not updating the known revisions
// TODO(sgotti) use upcoming etcd 3.4 watch RequestProgress???
func (s *Scheduler) etcdPingerLoop(ctx context.Context) {
for {
if err := s.etcdPinger(ctx); err != nil {
log.Errorf("err: %+v", err)
}
select {
case <-ctx.Done():
return
default:
}
time.Sleep(1 * time.Second)
}
}
func (s *Scheduler) etcdPinger(ctx context.Context) error {
if _, err := s.e.Put(ctx, common.EtcdPingKey, []byte{}, nil); err != nil {
return err
}
return nil
}
type Scheduler struct {
c *config.RunServiceScheduler
e *etcd.Store
ost *objectstorage.ObjStorage
dm *datamanager.DataManager
readDB *readdb.ReadDB
ah *action.ActionHandler
}
func NewScheduler(ctx context.Context, c *config.RunServiceScheduler) (*Scheduler, error) {
if c.Debug {
level.SetLevel(zapcore.DebugLevel)
}
ost, err := scommon.NewObjectStorage(&c.ObjectStorage)
if err != nil {
return nil, err
}
e, err := scommon.NewEtcd(&c.Etcd, logger, "runscheduler")
if err != nil {
return nil, err
}
s := &Scheduler{
c: c,
e: e,
ost: ost,
}
dmConf := &datamanager.DataManagerConfig{
E: e,
OST: ost,
DataTypes: []string{
string(common.DataTypeRun),
string(common.DataTypeRunConfig),
string(common.DataTypeRunCounter),
},
}
dm, err := datamanager.NewDataManager(ctx, logger, dmConf)
if err != nil {
return nil, err
}
s.dm = dm
readDB, err := readdb.NewReadDB(ctx, logger, filepath.Join(c.DataDir, "readdb"), e, ost, dm)
if err != nil {
return nil, err
}
s.readDB = readDB
ah := action.NewActionHandler(logger, e, readDB, ost, dm)
s.ah = ah
return s, nil
}
func (s *Scheduler) InitEtcd(ctx context.Context) error {
// Create changegroup min revision if it doesn't exists
cmp := []etcdclientv3.Cmp{}
then := []etcdclientv3.Op{}
cmp = append(cmp, etcdclientv3.Compare(etcdclientv3.CreateRevision(common.EtcdChangeGroupMinRevisionKey), "=", 0))
then = append(then, etcdclientv3.OpPut(common.EtcdChangeGroupMinRevisionKey, ""))
txn := s.e.Client().Txn(ctx).If(cmp...).Then(then...)
if _, err := txn.Commit(); err != nil {
return etcd.FromEtcdError(err)
}
return nil
}
func (s *Scheduler) Run(ctx context.Context) error {
errCh := make(chan error)
dmReadyCh := make(chan struct{})
go func() { errCh <- s.dm.Run(ctx, dmReadyCh) }()
// wait for dm to be ready
<-dmReadyCh
for {
err := s.InitEtcd(ctx)
if err == nil {
break
}
log.Errorf("failed to initialize etcd: %+v", err)
time.Sleep(1 * time.Second)
}
go s.readDB.Run(ctx)
ch := make(chan *types.ExecutorTask)
// noop coors handler
corsHandler := func(h http.Handler) http.Handler {
return h
}
corsAllowedMethodsOptions := ghandlers.AllowedMethods([]string{"GET", "HEAD", "POST", "PUT", "DELETE"})
corsAllowedHeadersOptions := ghandlers.AllowedHeaders([]string{"Accept", "Accept-Encoding", "Authorization", "Content-Length", "Content-Type", "X-CSRF-Token", "Authorization"})
corsAllowedOriginsOptions := ghandlers.AllowedOrigins([]string{"*"})
corsHandler = ghandlers.CORS(corsAllowedMethodsOptions, corsAllowedHeadersOptions, corsAllowedOriginsOptions)
// executor dedicated api, only calls from executor should happen on these handlers
executorStatusHandler := api.NewExecutorStatusHandler(logger, s.e, s.ah)
executorTaskStatusHandler := api.NewExecutorTaskStatusHandler(s.e, ch)
executorTaskHandler := api.NewExecutorTaskHandler(s.e)
executorTasksHandler := api.NewExecutorTasksHandler(s.e)
archivesHandler := api.NewArchivesHandler(logger, s.ost)
cacheHandler := api.NewCacheHandler(logger, s.ost)
cacheCreateHandler := api.NewCacheCreateHandler(logger, s.ost)
// api from clients
executorDeleteHandler := api.NewExecutorDeleteHandler(logger, s.ah)
logsHandler := api.NewLogsHandler(logger, s.e, s.ost, s.dm)
runHandler := api.NewRunHandler(logger, s.e, s.dm, s.readDB)
runTaskActionsHandler := api.NewRunTaskActionsHandler(logger, s.ah)
runsHandler := api.NewRunsHandler(logger, s.readDB)
runActionsHandler := api.NewRunActionsHandler(logger, s.ah)
runCreateHandler := api.NewRunCreateHandler(logger, s.ah)
changeGroupsUpdateTokensHandler := api.NewChangeGroupsUpdateTokensHandler(logger, s.readDB)
router := mux.NewRouter()
apirouter := router.PathPrefix("/api/v1alpha").Subrouter()
// don't return 404 on a call to an undefined handler but 400 to distinguish between a non existent resource and a wrong method
apirouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
apirouter.Handle("/executor/{executorid}", executorStatusHandler).Methods("POST")
apirouter.Handle("/executor/{executorid}", executorDeleteHandler).Methods("DELETE")
apirouter.Handle("/executor/{executorid}/tasks", executorTasksHandler).Methods("GET")
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskHandler).Methods("GET")
apirouter.Handle("/executor/{executorid}/tasks/{taskid}", executorTaskStatusHandler).Methods("POST")
apirouter.Handle("/executor/archives", archivesHandler).Methods("GET")
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("HEAD")
apirouter.Handle("/executor/caches/{key}", cacheHandler).Methods("GET")
apirouter.Handle("/executor/caches/{key}", cacheCreateHandler).Methods("POST")
apirouter.Handle("/logs", logsHandler).Methods("GET")
apirouter.Handle("/runs/{runid}", runHandler).Methods("GET")
apirouter.Handle("/runs/{runid}/actions", runActionsHandler).Methods("PUT")
apirouter.Handle("/runs/{runid}/tasks/{taskid}/actions", runTaskActionsHandler).Methods("PUT")
apirouter.Handle("/runs", runsHandler).Methods("GET")
apirouter.Handle("/runs", runCreateHandler).Methods("POST")
apirouter.Handle("/changegroups", changeGroupsUpdateTokensHandler).Methods("GET")
mainrouter := mux.NewRouter()
mainrouter.PathPrefix("/").Handler(corsHandler(router))
// Return a bad request when it doesn't match any route
mainrouter.NotFoundHandler = http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusBadRequest) })
go s.executorTasksCleanerLoop(ctx)
go s.runsSchedulerLoop(ctx)
go s.runTasksUpdaterLoop(ctx)
go s.fetcherLoop(ctx)
go s.finishedRunsArchiverLoop(ctx)
go s.compactChangeGroupsLoop(ctx)
go s.cacheCleanerLoop(ctx, s.c.RunCacheExpireInterval)
go s.executorTaskUpdateHandler(ctx, ch)
go s.etcdPingerLoop(ctx)
var tlsConfig *tls.Config
if s.c.Web.TLS {
var err error
tlsConfig, err = util.NewTLSConfig(s.c.Web.TLSCertFile, s.c.Web.TLSKeyFile, "", false)
if err != nil {
log.Errorf("err: %+v")
return err
}
}
httpServer := http.Server{
Addr: s.c.Web.ListenAddress,
Handler: mainrouter,
TLSConfig: tlsConfig,
}
lerrCh := make(chan error)
go func() {
lerrCh <- httpServer.ListenAndServe()
}()
select {
case <-ctx.Done():
log.Infof("runservice scheduler exiting")
httpServer.Close()
return nil
case err := <-lerrCh:
log.Errorf("http server listen error: %v", err)
return err
case err := <-errCh:
log.Errorf("error: %+v", err)
return err
}
}

View File

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
package scheduler package runservice
import ( import (
"context" "context"

View File

@ -25,7 +25,7 @@ import (
"github.com/sorintlab/agola/internal/datamanager" "github.com/sorintlab/agola/internal/datamanager"
"github.com/sorintlab/agola/internal/etcd" "github.com/sorintlab/agola/internal/etcd"
"github.com/sorintlab/agola/internal/objectstorage" "github.com/sorintlab/agola/internal/objectstorage"
"github.com/sorintlab/agola/internal/services/runservice/scheduler/common" "github.com/sorintlab/agola/internal/services/runservice/common"
"github.com/sorintlab/agola/internal/services/runservice/types" "github.com/sorintlab/agola/internal/services/runservice/types"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"

View File

@ -23,7 +23,7 @@ import (
slog "github.com/sorintlab/agola/internal/log" slog "github.com/sorintlab/agola/internal/log"
"github.com/sorintlab/agola/internal/services/config" "github.com/sorintlab/agola/internal/services/config"
"github.com/sorintlab/agola/internal/services/gateway/common" "github.com/sorintlab/agola/internal/services/gateway/common"
rsapi "github.com/sorintlab/agola/internal/services/runservice/scheduler/api" rsapi "github.com/sorintlab/agola/internal/services/runservice/api"
"github.com/sorintlab/agola/internal/util" "github.com/sorintlab/agola/internal/util"
"github.com/pkg/errors" "github.com/pkg/errors"
@ -194,7 +194,7 @@ func NewScheduler(c *config.Scheduler) (*Scheduler, error) {
} }
return &Scheduler{ return &Scheduler{
runserviceClient: rsapi.NewClient(c.RunServiceURL), runserviceClient: rsapi.NewClient(c.RunserviceURL),
}, nil }, nil
} }