runservice executor: update for executor groups

* Add the concept of executor groups and sibling executors (sketched below,
after the commit metadata).
* Add the concept of a dynamic executor: an executor in an executor group that
doesn't need to be manually deleted from the scheduler, since its sibling
executors will take care of cleaning up its pods.
* Remove external label visibility from pods.
* Add functions to return the sibling executors and the executor group.
* Delete the pods of disappeared sibling executors.
Author: Simone Gotti 2019-04-24 12:37:55 +02:00
parent 4da4f48f98
commit a0d69f4bc3
5 changed files with 137 additions and 66 deletions
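
At a glance, the change makes drivers group-aware: every driver reports the
executor group it belongs to and the executors it can currently see, and the
executor uses this to report its siblings and to clean up pods left behind by
vanished ones. A minimal sketch of the added surface (the interface name below
is illustrative; the real methods are added to the Driver interface in this
diff):

package driver

import "context"

// Sketch of the group-related methods this commit adds to the Driver
// interface. The Docker driver derives both from its single executor id;
// a driver whose executors share resources (e.g. a k8s namespace) could
// return a shared group and all of its members instead.
type groupAwareDriver interface {
	// ExecutorGroup returns the executor group this executor belongs to.
	ExecutorGroup(ctx context.Context) (string, error)
	// GetExecutors returns the ids of the executors currently active in
	// the group (the sibling executors).
	GetExecutors(ctx context.Context) ([]string, error)
}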

View File

@@ -43,9 +43,10 @@ type DockerDriver struct {
client *client.Client
initVolumeHostDir string
toolboxPath string
executorID string
}
func NewDockerDriver(logger *zap.Logger, initVolumeHostDir, toolboxPath string) (*DockerDriver, error) {
func NewDockerDriver(logger *zap.Logger, executorID, initVolumeHostDir, toolboxPath string) (*DockerDriver, error) {
cli, err := client.NewEnvClient()
if err != nil {
return nil, err
@@ -55,6 +56,7 @@ func NewDockerDriver(logger *zap.Logger, initVolumeHostDir, toolboxPath string)
client: cli,
initVolumeHostDir: initVolumeHostDir,
toolboxPath: toolboxPath,
executorID: executorID,
}, nil
}
@@ -149,12 +151,9 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.
io.Copy(out, reader)
labels := map[string]string{}
// prepend the podLabelPrefix to the labels' keys
for k, v := range podConfig.Labels {
labels[labelPrefix+k] = v
}
labels[agolaLabelKey] = agolaLabelValue
labels[podIDKey] = podConfig.ID
labels[taskIDKey] = podConfig.TaskID
containerLabels := map[string]string{}
for k, v := range labels {
@@ -204,15 +203,21 @@ func (d *DockerDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.
id: podConfig.ID,
client: d.client,
containers: containers,
executorID: d.executorID,
}, nil
}
func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]string, all bool) ([]Pod, error) {
func (d *DockerDriver) ExecutorGroup(ctx context.Context) (string, error) {
// use the same group as the executor id
return d.executorID, nil
}
func (d *DockerDriver) GetExecutors(ctx context.Context) ([]string, error) {
return []string{d.executorID}, nil
}
func (d *DockerDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) {
args := filters.NewArgs()
// search label adding the podLabelPrefix
for k, v := range labels {
args.Add("label", fmt.Sprintf("%s%s=%s", labelPrefix, k, v))
}
containers, err := d.client.ContainerList(ctx,
types.ContainerListOptions{
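
For the Docker driver, the executor group and the sibling list both collapse
to the executor's own id, and GetPods replaces the old label-filtered
GetPodsByLabels. A hypothetical usage sketch (describeDriver is not part of
this commit; fmt and context are already imported by this file):

// describeDriver shows the new calls on a driver built with
// NewDockerDriver(logger, "executorid01", dir, toolboxPath).
func describeDriver(ctx context.Context, d *DockerDriver) error {
	group, err := d.ExecutorGroup(ctx) // "executorid01": group == executor id
	if err != nil {
		return err
	}
	executors, err := d.GetExecutors(ctx) // ["executorid01"]: single-member group
	if err != nil {
		return err
	}
	pods, err := d.GetPods(ctx, true) // every agola pod, running or not
	if err != nil {
		return err
	}
	fmt.Printf("group %q, executors %v, %d pods\n", group, executors, len(pods))
	return nil
}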
@@ -235,6 +240,7 @@ func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]st
id: podID,
client: d.client,
containers: []types.Container{container},
executorID: d.executorID,
}
podsMap[podID] = pod
@@ -268,9 +274,10 @@ func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]st
// add labels from the container with index 0
if cIndex == 0 {
podLabels := map[string]string{}
// keep only labels starting with our prefix
for labelName, labelValue := range container.Labels {
if strings.HasPrefix(labelName, labelPrefix) {
podLabels[strings.TrimPrefix(labelName, labelPrefix)] = labelValue
podLabels[labelName] = labelValue
}
}
pod.labels = podLabels
@@ -286,9 +293,10 @@ func (d *DockerDriver) GetPodsByLabels(ctx context.Context, labels map[string]st
func podLabelsFromContainer(containerLabels map[string]string) map[string]string {
labels := map[string]string{}
// keep only labels starting with our prefix
for k, v := range containerLabels {
if strings.HasPrefix(k, labelPrefix) {
labels[strings.TrimPrefix(k, labelPrefix)] = v
labels[k] = v
}
}
return labels
@@ -321,14 +329,19 @@ type DockerPod struct {
client *client.Client
labels map[string]string
containers []types.Container
executorID string
}
func (dp *DockerPod) ID() string {
return dp.id
}
func (dp *DockerPod) Labels() map[string]string {
return dp.labels
func (dp *DockerPod) ExecutorID() string {
return dp.executorID
}
func (dp *DockerPod) TaskID() string {
return dp.labels[taskIDKey]
}
func (dp *DockerPod) Stop(ctx context.Context) error {

View File

@@ -26,6 +26,7 @@ import (
"testing"
"unicode"
uuid "github.com/satori/go.uuid"
slog "github.com/sorintlab/agola/internal/log"
"github.com/google/go-cmp/cmp"
@@ -72,7 +73,7 @@ func parseEnvs(r io.Reader) (map[string]string, error) {
return envs, nil
}
func TestPod(t *testing.T) {
func TestDockerPod(t *testing.T) {
if os.Getenv("SKIP_DOCKER_TESTS") == "1" {
t.Skip("skipping since env var SKIP_DOCKER_TESTS is 1")
}
@@ -87,7 +88,7 @@ func TestPod(t *testing.T) {
}
defer os.RemoveAll(dir)
d, err := NewDockerDriver(logger, dir, toolboxPath)
d, err := NewDockerDriver(logger, "executorid01", dir, toolboxPath)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@@ -96,6 +97,8 @@ func TestPod(t *testing.T) {
t.Run("create a pod with one container", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
@@ -107,12 +110,13 @@ func TestPod(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
})
t.Run("execute a command inside a pod", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
@@ -124,6 +128,7 @@ func TestPod(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
ce, err := pod.Exec(ctx, &ExecConfig{
Cmd: []string{"ls"},
@@ -138,10 +143,8 @@ func TestPod(t *testing.T) {
t.Fatalf("unexpected err: %v", err)
}
if code != 0 {
t.Fatalf("unexpected exito code: %d", code)
t.Fatalf("unexpected exit code: %d", code)
}
defer pod.Remove(ctx)
})
t.Run("test pod environment", func(t *testing.T) {
@@ -151,6 +154,8 @@ func TestPod(t *testing.T) {
}
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
@@ -163,6 +168,7 @@ func TestPod(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
var buf bytes.Buffer
ce, err := pod.Exec(ctx, &ExecConfig{
@@ -197,12 +203,12 @@ func TestPod(t *testing.T) {
}
}
}
defer pod.Remove(ctx)
})
t.Run("test get pods by label", func(t *testing.T) {
t.Run("test get pods", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
@@ -214,8 +220,9 @@ func TestPod(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
pods, err := d.GetPodsByLabels(ctx, map[string]string{}, true)
pods, err := d.GetPods(ctx, true)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
@@ -239,8 +246,6 @@ func TestPod(t *testing.T) {
if !ok {
t.Fatalf("pod with id %q not found", pod.ID())
}
defer pod.Remove(ctx)
})
}

View File

@@ -44,15 +44,19 @@ const (
type Driver interface {
Setup(ctx context.Context) error
NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error)
GetPodsByLabels(ctx context.Context, labels map[string]string, all bool) ([]Pod, error)
GetPods(ctx context.Context, all bool) ([]Pod, error)
GetPodByID(ctx context.Context, containerID string) (Pod, error)
ExecutorGroup(ctx context.Context) (string, error)
GetExecutors(ctx context.Context) ([]string, error)
}
type Pod interface {
// ID returns the pod id
ID() string
// Labels returns the pod labels
Labels() map[string]string
// ExecutorID returns the pod owner executor id
ExecutorID() string
// TaskID returns the pod task id
TaskID() string
// Stop stops the pod
Stop(ctx context.Context) error
// Remove removes the pod
@@ -68,8 +72,8 @@ type ContainerExec interface {
type PodConfig struct {
ID string
TaskID string
Containers []*ContainerConfig
Labels map[string]string
// The container dir where the init volume will be mounted
InitVolumeDir string
DockerConfig *registry.DockerConfig
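
Since PodConfig drops the free-form Labels map in favor of an explicit
TaskID, pod creation now passes the task id directly. A hedged sketch
(startTaskPod, the busybox image and the Image field are illustrative
assumptions not shown in this diff; uuid is github.com/satori/go.uuid as in
the tests above):

// startTaskPod is an illustrative helper, not part of this commit.
func startTaskPod(ctx context.Context, d Driver, taskID string, out io.Writer) (Pod, error) {
	return d.NewPod(ctx, &PodConfig{
		// random pod id, decoupled from the task id so a failed task can
		// be restarted without clashing with leftover pods
		ID:     uuid.NewV4().String(),
		TaskID: taskID, // replaces the old Labels map entry
		Containers: []*ContainerConfig{
			{
				Image: "busybox", // assumed field, not shown in this diff
				Cmd:   []string{"cat"},
			},
		},
		InitVolumeDir: "/mnt/agola", // toolboxContainerDir in the executor
	}, out)
}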

View File

@@ -56,8 +56,6 @@ var log = logger.Sugar()
const (
defaultShell = "/bin/sh -e"
taskIDLabel = "taskid"
toolboxContainerDir = "/mnt/agola"
)
@@ -66,7 +64,7 @@ var (
)
func (e *Executor) getAllPods(ctx context.Context, all bool) ([]driver.Pod, error) {
return e.driver.GetPodsByLabels(ctx, createAllLabels(), all)
return e.driver.GetPods(ctx, all)
}
func (e *Executor) createFile(ctx context.Context, pod driver.Pod, command, user string, outf io.Writer) (string, error) {
@@ -160,7 +158,7 @@ func (e *Executor) doRunStep(ctx context.Context, s *types.RunStep, t *types.Exe
workingDir, err = e.expandDir(ctx, t, pod, outf, workingDir)
if err != nil {
outf.WriteString(fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", workingDir, err))
outf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", workingDir, err))
return -1, err
}
@@ -210,7 +208,7 @@ func (e *Executor) doSaveToWorkspaceStep(ctx context.Context, s *types.SaveToWor
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
if err != nil {
logf.WriteString(fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
logf.WriteString(fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
return -1, err
}
@@ -332,7 +330,7 @@ func (e *Executor) template(ctx context.Context, t *types.ExecutorTask, pod driv
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
if err != nil {
io.WriteString(logf, fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
return "", err
}
@@ -378,7 +376,7 @@ func (e *Executor) unarchive(ctx context.Context, t *types.ExecutorTask, source
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
if err != nil {
io.WriteString(logf, fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
return err
}
@@ -501,7 +499,7 @@ func (e *Executor) doSaveCacheStep(ctx context.Context, s *types.SaveCacheStep,
workingDir, err := e.expandDir(ctx, t, pod, logf, t.WorkingDir)
if err != nil {
io.WriteString(logf, fmt.Sprintf("Failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
io.WriteString(logf, fmt.Sprintf("failed to expand working dir %q. Error: %s\n", t.WorkingDir, err))
return -1, err
}
@@ -663,16 +661,45 @@ func (e *Executor) sendExecutorStatus(ctx context.Context) error {
arch := runtime.GOARCH
labels["arch"] = arch
executorGroup, err := e.driver.ExecutorGroup(ctx)
if err != nil {
return err
}
// report all the executors that are active OR that have some owned pods not yet removed
activeExecutors, err := e.driver.GetExecutors(ctx)
if err != nil {
return err
}
pods, err := e.driver.GetPods(ctx, true)
if err != nil {
return err
}
executorsMap := map[string]struct{}{}
for _, executorID := range activeExecutors {
executorsMap[executorID] = struct{}{}
}
for _, pod := range pods {
executorsMap[pod.ExecutorID()] = struct{}{}
}
siblingsExecutors := []string{}
for executorID := range executorsMap {
siblingsExecutors = append(siblingsExecutors, executorID)
}
executor := &types.Executor{
ID: e.id,
ListenURL: e.listenURL,
Labels: labels,
ActiveTasksLimit: e.c.ActiveTasksLimit,
ActiveTasks: activeTasks,
ID: e.id,
ListenURL: e.listenURL,
Labels: labels,
ActiveTasksLimit: e.c.ActiveTasksLimit,
ActiveTasks: activeTasks,
Dynamic: e.dynamic,
ExecutorGroup: executorGroup,
SiblingsExecutors: siblingsExecutors,
}
log.Debugf("send executor status: %s", util.Dump(executor))
_, err := e.runserviceClient.SendExecutorStatus(ctx, executor)
_, err = e.runserviceClient.SendExecutorStatus(ctx, executor)
return err
}
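
The union above is what keeps a crashed sibling visible: an executor is
reported as long as it is active or still owns pods, so the scheduler won't
forget it while cleanup is pending. A small illustration (siblingsUnion is a
hypothetical refactoring of the inline loops, not part of this commit):

// siblingsUnion condenses the loops in sendExecutorStatus: the reported
// siblings are the set union of the active executors and the owners of
// the surviving pods.
func siblingsUnion(active, podOwners []string) []string {
	set := map[string]struct{}{}
	for _, id := range active {
		set[id] = struct{}{}
	}
	for _, id := range podOwners {
		set[id] = struct{}{}
	}
	siblings := make([]string, 0, len(set))
	for id := range set {
		siblings = append(siblings, id)
	}
	return siblings
}

// siblingsUnion([]string{"exec-a"}, []string{"exec-a", "exec-b"}) returns
// ["exec-a", "exec-b"] (in unspecified order): the vanished "exec-b" keeps
// being reported until its leftover pods are removed.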
@@ -806,7 +833,7 @@ func (e *Executor) setupTask(ctx context.Context, rt *runningTask) error {
// generate a random pod id (don't use task id for future ability to restart
// tasks failed to start and don't clash with existing pods)
ID: uuid.NewV4().String(),
Labels: createTaskLabels(et.ID),
TaskID: et.ID,
InitVolumeDir: toolboxContainerDir,
DockerConfig: dockerConfig,
Containers: []*driver.ContainerConfig{
@@ -938,16 +965,6 @@ func (e *Executor) executeTaskInternal(ctx context.Context, et *types.ExecutorTa
return 0, nil
}
func createAllLabels() map[string]string {
return map[string]string{}
}
func createTaskLabels(taskID string) map[string]string {
return map[string]string{
taskIDLabel: taskID,
}
}
func (e *Executor) podsCleanerLoop(ctx context.Context) {
for {
log.Debugf("podsCleaner")
@@ -971,13 +988,33 @@ func (e *Executor) podsCleaner(ctx context.Context) error {
if err != nil {
return err
}
executors, err := e.driver.GetExecutors(ctx)
if err != nil {
return err
}
// always add ourselves to the executors list
executors = append(executors, e.id)
for _, pod := range pods {
taskID, ok := pod.Labels()[taskIDLabel]
if !ok {
continue
taskID := pod.TaskID()
// clean our owned pods
if pod.ExecutorID() == e.id {
if _, ok := e.runningTasks.get(taskID); !ok {
log.Infof("removing pod %s for not running task: %s", pod.ID(), taskID)
pod.Remove(ctx)
}
}
if _, ok := e.runningTasks.get(taskID); !ok {
log.Infof("removing pod %s for not running task: %s", pod.ID(), taskID)
// if no executor owns the pod we'll delete it
owned := false
for _, executorID := range executors {
if pod.ExecutorID() == executorID {
owned = true
break
}
}
if !owned {
log.Infof("removing pod %s since it's not owned by any active executor", pod.ID())
pod.Remove(ctx)
}
}
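
The cleanup rule above boils down to a single predicate: remove a pod when it
is ours but its task stopped running, or when no known executor owns it. A
hedged sketch (shouldRemovePod is a hypothetical helper, not in this commit):

// shouldRemovePod condenses the podsCleaner decision: our own pods go away
// when their task is no longer running; foreign pods go away when their
// owner is not among the known executors (active siblings plus ourselves).
func (e *Executor) shouldRemovePod(pod driver.Pod, executors []string) bool {
	if pod.ExecutorID() == e.id {
		_, running := e.runningTasks.get(pod.TaskID())
		return !running
	}
	for _, executorID := range executors {
		if pod.ExecutorID() == executorID {
			return false
		}
	}
	return true
}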
@@ -1196,6 +1233,7 @@ type Executor struct {
runningTasks *runningTasks
driver driver.Driver
listenURL string
dynamic bool
}
func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
@@ -1216,15 +1254,9 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
c.ToolboxPath = path
}
dockerDriver, err := driver.NewDockerDriver(logger, "/tmp/agola/bin", c.ToolboxPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to create docker client")
}
e := &Executor{
c: c,
runserviceClient: rsapi.NewClient(c.RunServiceURL),
driver: dockerDriver,
runningTasks: &runningTasks{
tasks: make(map[string]*runningTask),
},
@@ -1265,6 +1297,12 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
u.Host = net.JoinHostPort(addr, port)
e.listenURL = u.String()
d, err := driver.NewDockerDriver(logger, e.id, "/tmp/agola/bin", e.c.ToolboxPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to create docker driver")
}
e.driver = d
return e, nil
}

View File

@@ -669,6 +669,17 @@ type Executor struct {
ActiveTasksLimit int `json:"active_tasks_limit,omitempty"`
ActiveTasks int `json:"active_tasks,omitempty"`
// Dynamic represents an executor that can be automatically removed since it's
// part of a group of executors managing the same resources (e.g. a k8s
// namespace managed by multiple executors that will automatically clean up the
// pods owned by an old executor)
Dynamic bool `json:"dynamic,omitempty"`
// ExecutorGroup is the executor group to which this executor belongs
ExecutorGroup string `json:"executor_group,omitempty"`
// SiblingsExecutors are all the executors in the ExecutorGroup
SiblingsExecutors []string `json:"siblings_executors,omitempty"`
// internal values not saved
Revision int64 `json:"-"`
}
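
Given the json tags above, a dynamic executor's status would carry the new
fields roughly like this (ids and group name are illustrative):

// Illustrative only: a dynamic executor in group "group01" that still sees
// pods owned by the vanished sibling "executorid02".
var example = &Executor{
	ID:                "executorid01",
	Dynamic:           true,
	ExecutorGroup:     "group01",
	SiblingsExecutors: []string{"executorid01", "executorid02"},
}

// With the tags above, the new keys marshal as:
//   "dynamic": true
//   "executor_group": "group01"
//   "siblings_executors": ["executorid01", "executorid02"]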