runservice: add k8s driver

This commit is contained in:
Simone Gotti 2019-04-22 17:54:24 +02:00
parent 90d129750d
commit e0d37b08f2
9 changed files with 1152 additions and 7 deletions

View File

@ -34,7 +34,7 @@ local task_build_go(version, arch) = {
{ type: 'clone' },
{ type: 'restore_cache', keys: ['cache-sum-{{ md5sum "go.sum" }}', 'cache-date-'], dest_dir: '/go/pkg/mod/cache' },
{ type: 'run', command: 'make' },
{ type: 'run', command: 'SKIP_DOCKER_TESTS=1 go test -v -count 1 ./...' },
{ type: 'run', name: 'run tests', command: 'SKIP_DOCKER_TESTS=1 SKIP_K8S_TESTS=1 go test -v -count 1 ./...' },
{ type: 'save_cache', key: 'cache-sum-{{ md5sum "go.sum" }}', contents: [{ source_dir: '/go/pkg/mod/cache' }] },
{ type: 'save_cache', key: 'cache-date-{{ year }}-{{ month }}-{{ day }}', contents: [{ source_dir: '/go/pkg/mod/cache' }] },
],

View File

@ -47,6 +47,9 @@ runServiceExecutor:
runServiceURL: "http://localhost:4000"
web:
listenAddress: ":4001"
activeTasksLimit: 2
driver:
type: docker
gitServer:
dataDir: /tmp/agola/gitserver

18
go.mod
View File

@ -13,7 +13,10 @@ require (
github.com/docker/docker v1.13.1
github.com/docker/go-connections v0.4.0 // indirect
github.com/docker/go-units v0.3.3 // indirect
github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f // indirect
github.com/ghodss/yaml v1.0.0
github.com/go-bindata/go-bindata v1.0.0
github.com/go-ini/ini v1.42.0 // indirect
@ -21,10 +24,14 @@ require (
github.com/google/go-cmp v0.3.0
github.com/google/go-containerregistry v0.0.0-20190412005658-1d38b9cfdb9d
github.com/google/go-jsonnet v0.12.1
github.com/google/gofuzz v1.0.0 // indirect
github.com/googleapis/gnostic v0.2.0 // indirect
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e // indirect
github.com/gorilla/handlers v1.4.0
github.com/gorilla/mux v1.7.0
github.com/hashicorp/go-sockaddr v1.0.1
github.com/imdario/mergo v0.3.7 // indirect
github.com/json-iterator/go v1.1.6 // indirect
github.com/jtolds/gls v4.2.1+incompatible // indirect
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect
github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect
@ -34,6 +41,8 @@ require (
github.com/minio/minio-go v6.0.14+incompatible
github.com/mitchellh/copystructure v1.0.0
github.com/mitchellh/go-homedir v1.1.0
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opencontainers/runc v0.1.1 // indirect
@ -50,9 +59,16 @@ require (
go.uber.org/zap v1.9.1
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9
golang.org/x/oauth2 v0.0.0-20190220154721-9b3c75971fc9
gopkg.in/ini.v1 v1.41.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.42.0 // indirect
gopkg.in/yaml.v2 v2.2.2
gotest.tools v2.2.0+incompatible // indirect
k8s.io/api v0.0.0-20190313235455-40a48860b5ab
k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1
k8s.io/client-go v11.0.0+incompatible
k8s.io/klog v0.3.0 // indirect
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7
sigs.k8s.io/yaml v1.1.0 // indirect
)
replace github.com/docker/docker v1.13.1 => github.com/docker/engine v0.0.0-20181106193140-f5749085e9cb

37
go.sum
View File

@ -35,10 +35,16 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh
github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec=
github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk=
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c h1:ZfSZ3P3BedhKGUhzj7BQlPSU4OvT6tfOKe3DVHzOA7s=
github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs=
github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk=
github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4=
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f h1:8GDPb0tCY8LQ+OJ3dbHb5sA6YZWXFORQYZx5sdsTlMs=
github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc=
github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f h1:AUj1VoZUfhPhOPHULCQQDnGhRelpFWHMLhQVWDsS0v4=
github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
@ -70,8 +76,12 @@ github.com/google/go-jsonnet v0.12.1 h1:v0iUm/b4SBz7lR/diMoz9tLAz8lqtnNRKIwMrmU2
github.com/google/go-jsonnet v0.12.1/go.mod h1:gVu3UVSfOt5fRFq+dh9duBqXa5905QY8S1QvMNcEIVs=
github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g=
github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY=
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e h1:JKmoR8x90Iww1ks85zJ1lfDGgIiMDuIptTOhJq+zKyg=
github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZsA=
@ -92,10 +102,14 @@ github.com/hashicorp/go-sockaddr v1.0.1 h1:eCkkJ5KOOktDvwbsE9KPyiBWaOfp1ZNy2gLHg
github.com/hashicorp/go-sockaddr v1.0.1/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI=
github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo=
github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo=
github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE=
github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
@ -128,6 +142,10 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo=
github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
@ -152,6 +170,7 @@ github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L
github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be h1:MoyXp/VjXUwM0GyDcdwT7Ubea2gxOSHpPaFo3qV+Y2A=
github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/sanity-io/litter v1.1.0 h1:BllcKWa3VbZmOZbDCoszYLk7zCsKHz5Beossi8SUcTc=
github.com/sanity-io/litter v1.1.0/go.mod h1:CJ0VCw2q4qKU7LaQr3n7UOSHzgEMgcGco7N/SkZQPjw=
@ -232,8 +251,10 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0=
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo=
gopkg.in/ini.v1 v1.41.0 h1:Ka3ViY6gNYSKiVy71zXBEqKplnV35ImDLVG+8uoIklE=
gopkg.in/ini.v1 v1.41.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk=
gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
@ -241,3 +262,15 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo=
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
k8s.io/api v0.0.0-20190313235455-40a48860b5ab h1:DG9A67baNpoeweOy2spF1OWHhnVY5KR7/Ek/+U1lVZc=
k8s.io/api v0.0.0-20190313235455-40a48860b5ab/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA=
k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1 h1:IS7K02iBkQXpCeieSiyJjGoLSdVOv2DbPaWHJ+ZtgKg=
k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0=
k8s.io/client-go v11.0.0+incompatible h1:LBbX2+lOwY9flffWlJM7f1Ct8V2SRNiMRDFeiwnJo9o=
k8s.io/client-go v11.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s=
k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE=
k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 h1:8r+l4bNWjRlsFYlQJnKJ2p7s1YQPj4XyXiJVqDHRx7c=
k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0=
sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs=
sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o=

View File

@ -81,6 +81,8 @@ type RunServiceExecutor struct {
Web Web `yaml:"web"`
Driver Driver `yaml:"driver"`
Labels map[string]string `yaml:"labels"`
// ActiveTasksLimit is the max number of concurrent active tasks
ActiveTasksLimit int `yaml:"active_tasks_limit"`
@ -160,6 +162,22 @@ type Etcd struct {
TLSSkipVerify bool `yaml:"tlsSkipVerify"`
}
type DriverType string
const (
DriverTypeDocker DriverType = "docker"
DriverTypeK8s DriverType = "kubernetes"
)
type Driver struct {
Type DriverType `yaml:"type"`
// docker fields
// k8s fields
}
type TokenSigning struct {
// token duration (defaults to 12 hours)
Duration time.Duration `yaml:"duration"`
@ -262,6 +280,15 @@ func Validate(c *Config) error {
if c.RunServiceExecutor.RunServiceURL == "" {
return errors.Errorf("runservice executor runServiceURL is empty")
}
if c.RunServiceExecutor.Driver.Type == "" {
return errors.Errorf("runservice executor driver type is empty")
}
switch c.RunServiceExecutor.Driver.Type {
case DriverTypeDocker:
case DriverTypeK8s:
default:
return errors.Errorf("runservice executor driver type %q unknown", c.RunServiceExecutor.Driver.Type)
}
// Scheduler
if c.Scheduler.RunServiceURL == "" {

View File

@ -0,0 +1,629 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package driver
import (
"context"
"encoding/json"
"fmt"
"io"
"path/filepath"
"strings"
"time"
"github.com/sorintlab/agola/internal/util"
"github.com/docker/docker/client"
"github.com/docker/docker/pkg/archive"
"github.com/pkg/errors"
uuid "github.com/satori/go.uuid"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apilabels "k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/tools/remotecommand"
utilexec "k8s.io/utils/exec"
)
const (
mainContainerName = "maincontainer"
configMapName = "agola-executors-group"
executorLeasePrefix = "agola-executor-"
podNamePrefix = "agola-task-"
executorsGroupIDKey = labelPrefix + "executorsgroupid"
executorsGroupIDConfigMapKey = "executorsgroupid"
useLeaseAPIKey = labelPrefix + "useleaseapi"
cmLeaseKey = labelPrefix + "lease"
renewExecutorLeaseInterval = 10 * time.Second
staleExecutorLeaseInterval = 1 * time.Minute
)
type K8sDriver struct {
log *zap.SugaredLogger
restconfig *restclient.Config
client *kubernetes.Clientset
toolboxPath string
namespace string
executorID string
executorsGroupID string
useLeaseAPI bool
}
type K8sPod struct {
id string
namespace string
labels map[string]string
restconfig *restclient.Config
client *kubernetes.Clientset
initVolumeDir string
}
func NewK8sDriver(logger *zap.Logger, executorID, toolboxPath string) (*K8sDriver, error) {
kubeClientConfig := NewKubeClientConfig("", "", "")
kubecfg, err := kubeClientConfig.ClientConfig()
if err != nil {
return nil, err
}
kubecli, err := kubernetes.NewForConfig(kubecfg)
if err != nil {
return nil, fmt.Errorf("cannot create kubernetes client: %v", err)
}
namespace, _, err := kubeClientConfig.Namespace()
if err != nil {
return nil, err
}
d := &K8sDriver{
log: logger.Sugar(),
restconfig: kubecfg,
client: kubecli,
toolboxPath: toolboxPath,
namespace: namespace,
executorID: executorID,
}
lists, err := d.client.Discovery().ServerPreferredResources()
if err != nil {
return nil, err
}
hasLeaseAPI := false
for _, list := range lists {
if len(list.APIResources) == 0 {
continue
}
if list.GroupVersion != "coordination.k8s.io/v1" {
continue
}
for _, apiResource := range list.APIResources {
if apiResource.Kind == "Lease" {
hasLeaseAPI = true
}
}
}
d.useLeaseAPI = hasLeaseAPI
executorsGroupID, err := d.getOrCreateExecutorsGroupID(context.TODO())
if err != nil {
return nil, err
}
d.executorsGroupID = executorsGroupID
ctx := context.TODO()
go func() {
for {
if err := d.updateLease(ctx); err != nil {
d.log.Errorf("failed to update executor lease: %+v", err)
}
select {
case <-ctx.Done():
return
default:
}
time.Sleep(renewExecutorLeaseInterval)
}
}()
go func() {
for {
if err := d.cleanStaleExecutorsLease(ctx); err != nil {
d.log.Errorf("failed to clean stale executors lease: %+v", err)
}
select {
case <-ctx.Done():
return
default:
}
time.Sleep(renewExecutorLeaseInterval)
}
}()
return d, nil
}
// NewKubeClientConfig return a kube client config that will by default use an
// in cluster client config or, if not available or overriden an external client
// config using the default client behavior used also by kubectl.
func NewKubeClientConfig(kubeconfigPath, context, namespace string) clientcmd.ClientConfig {
rules := clientcmd.NewDefaultClientConfigLoadingRules()
rules.DefaultClientConfig = &clientcmd.DefaultClientConfig
if kubeconfigPath != "" {
rules.ExplicitPath = kubeconfigPath
}
overrides := &clientcmd.ConfigOverrides{ClusterDefaults: clientcmd.ClusterDefaults}
if context != "" {
overrides.CurrentContext = context
}
if namespace != "" {
overrides.Context.Namespace = namespace
}
return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
}
func (d *K8sDriver) Setup(ctx context.Context) error {
return nil
}
func (d *K8sDriver) ExecutorGroup(ctx context.Context) (string, error) {
return d.executorsGroupID, nil
}
func (d *K8sDriver) GetExecutors(ctx context.Context) ([]string, error) {
return d.getLeases((ctx))
}
// executorsGroups gets or creates (if it doesn't exists) a configmap under
// the k8s namespace where the executorsgroup id is saved. The executorsgroupid
// is unique per k8s namespace and is shared by all the executors accessing this
// namespace
func (d *K8sDriver) getOrCreateExecutorsGroupID(ctx context.Context) (string, error) {
cmClient := d.client.CoreV1().ConfigMaps(d.namespace)
// pod and secret name, based on pod id
cm, err := cmClient.Get(configMapName, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return "", err
}
} else {
return cm.Data[executorsGroupIDConfigMapKey], nil
}
executorsGroupID := uuid.NewV4().String()
cm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: configMapName,
},
Data: map[string]string{executorsGroupIDConfigMapKey: executorsGroupID},
}
cm, err = cmClient.Create(cm)
if err != nil {
return "", err
}
return executorsGroupID, nil
}
func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error) {
if len(podConfig.Containers) == 0 {
return nil, errors.Errorf("empty container config")
}
containerConfig := podConfig.Containers[0]
secretClient := d.client.CoreV1().Secrets(d.namespace)
podClient := d.client.CoreV1().Pods(d.namespace)
labels := map[string]string{}
labels[agolaLabelKey] = agolaLabelValue
labels[podIDKey] = podConfig.ID
labels[taskIDKey] = podConfig.TaskID
labels[executorIDKey] = d.executorID
labels[executorsGroupIDKey] = d.executorsGroupID
dockerconfigj, err := json.Marshal(podConfig.DockerConfig)
if err != nil {
return nil, err
}
// pod and secret name, based on pod id
name := podNamePrefix + podConfig.ID
// secret that hold the docker registry auth
secret := &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
},
Data: map[string][]byte{
".dockerconfigjson": dockerconfigj,
},
Type: corev1.SecretTypeDockerConfigJson,
}
secret, err = secretClient.Create(secret)
if err != nil {
return nil, err
}
pod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Namespace: d.namespace,
Name: name,
Labels: labels,
},
Spec: corev1.PodSpec{
ImagePullSecrets: []corev1.LocalObjectReference{{Name: name}},
// don't mount service account secrets or pods will be able to talk with k8s
// api
AutomountServiceAccountToken: util.BoolP(false),
InitContainers: []corev1.Container{
{
Name: "initcontainer",
Image: "busybox",
// wait for a file named /tmp/done and then exit
Command: []string{"/bin/sh", "-c", "while true; do if [[ -f /tmp/done ]]; then exit; fi; sleep 1; done"},
Stdin: true,
VolumeMounts: []corev1.VolumeMount{
{
Name: "agolavolume",
MountPath: podConfig.InitVolumeDir,
},
},
},
},
Containers: []corev1.Container{
{
Name: mainContainerName,
Image: containerConfig.Image,
Command: containerConfig.Cmd[0:1],
Args: containerConfig.Cmd[1:],
Env: genEnvVars(containerConfig.Env),
Stdin: true,
WorkingDir: containerConfig.WorkingDir,
VolumeMounts: []corev1.VolumeMount{
{
Name: "agolavolume",
MountPath: podConfig.InitVolumeDir,
ReadOnly: true,
},
},
// by default always try to pull the image so we are sure only authorized users can fetch them
// see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages
ImagePullPolicy: corev1.PullAlways,
SecurityContext: &corev1.SecurityContext{
Privileged: &containerConfig.Privileged,
},
},
},
Volumes: []corev1.Volume{
{
Name: "agolavolume",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
},
},
},
},
}
pod, err = podClient.Create(pod)
if err != nil {
return nil, err
}
watcher, err := podClient.Watch(
metav1.SingleObject(pod.ObjectMeta),
)
if err != nil {
return nil, err
}
// wait for init container to be ready
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Modified:
pod := event.Object.(*corev1.Pod)
if len(pod.Status.InitContainerStatuses) > 0 {
if pod.Status.InitContainerStatuses[0].State.Running != nil {
watcher.Stop()
}
}
case watch.Deleted:
return nil, errors.Errorf("pod %q has been deleted", pod.Name)
}
}
fmt.Fprintf(out, "init container ready\n")
srcInfo, err := archive.CopyInfoSourcePath(d.toolboxPath, false)
if err != nil {
return nil, err
}
srcArchive, err := archive.TarResource(srcInfo)
if err != nil {
return nil, err
}
defer srcArchive.Close()
coreclient, err := corev1client.NewForConfig(d.restconfig)
if err != nil {
return nil, err
}
req := coreclient.RESTClient().
Post().
Namespace(pod.Namespace).
Resource("pods").
Name(pod.Name).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: "initcontainer",
Command: []string{"tar", "xf", "-", "-C", podConfig.InitVolumeDir},
Stdin: true,
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL())
if err != nil {
return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL())
}
err = exec.Stream(remotecommand.StreamOptions{
Stdin: srcArchive,
Stdout: out,
Stderr: out,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to execute command on initcontainer")
}
req = coreclient.RESTClient().
Post().
Namespace(pod.Namespace).
Resource("pods").
Name(pod.Name).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: "initcontainer",
Command: []string{"touch", "/tmp/done"},
Stdout: true,
Stderr: true,
TTY: false,
}, scheme.ParameterCodec)
exec, err = remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL())
if err != nil {
return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL())
}
err = exec.Stream(remotecommand.StreamOptions{
Stdout: out,
Stderr: out,
})
if err != nil {
return nil, errors.Wrapf(err, "failed to execute command on initcontainer")
}
watcher, err = podClient.Watch(
metav1.SingleObject(pod.ObjectMeta),
)
if err != nil {
return nil, err
}
// wait for pod to be initialized
for event := range watcher.ResultChan() {
switch event.Type {
case watch.Modified:
pod := event.Object.(*corev1.Pod)
if len(pod.Status.ContainerStatuses) > 0 {
if pod.Status.ContainerStatuses[0].State.Running != nil {
watcher.Stop()
}
}
case watch.Deleted:
return nil, errors.Errorf("pod %q has been deleted", pod.Name)
}
}
return &K8sPod{
id: pod.Name,
namespace: pod.Namespace,
restconfig: d.restconfig,
client: d.client,
initVolumeDir: podConfig.InitVolumeDir,
}, nil
}
func (d *K8sDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) {
podClient := d.client.CoreV1().Pods(d.namespace)
// get all pods for the executor group, also the ones managed by other executors in the same executor group
labels := map[string]string{executorsGroupIDKey: d.executorsGroupID}
// TODO(sgotti) use go client listers instead of querying every time
k8sPods, err := podClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
if err != nil {
return nil, err
}
pods := make([]Pod, len(k8sPods.Items))
for i, k8sPod := range k8sPods.Items {
labels := map[string]string{}
// keep only labels starting with our prefix
for n, v := range k8sPod.Labels {
if strings.HasPrefix(n, labelPrefix) {
labels[n] = v
}
}
pods[i] = &K8sPod{
id: k8sPod.Name,
namespace: k8sPod.Namespace,
labels: labels,
restconfig: d.restconfig,
client: d.client,
}
}
return pods, nil
}
func (p *K8sPod) ID() string {
return p.id
}
func (p *K8sPod) ExecutorID() string {
return p.labels[executorIDKey]
}
func (p *K8sPod) TaskID() string {
return p.labels[taskIDKey]
}
func (p *K8sPod) Stop(ctx context.Context) error {
d := int64(0)
secretClient := p.client.CoreV1().Secrets(p.namespace)
if err := secretClient.Delete(p.id, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil {
return err
}
podClient := p.client.CoreV1().Pods(p.namespace)
if err := podClient.Delete(p.id, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil {
return err
}
return nil
}
func (p *K8sPod) Remove(ctx context.Context) error {
return p.Stop(ctx)
}
type K8sContainerExec struct {
execID string
client *client.Client
endCh chan error
stdin io.WriteCloser
}
func (p *K8sPod) Exec(ctx context.Context, execConfig *ExecConfig) (ContainerExec, error) {
endCh := make(chan error)
coreclient, err := corev1client.NewForConfig(p.restconfig)
if err != nil {
return nil, err
}
// k8s pod exec api doesn't let us define the workingdir and the environment.
// Use a toolbox command that will set them up and then exec the real command.
envj, err := json.Marshal(execConfig.Env)
if err != nil {
return nil, err
}
cmd := []string{filepath.Join(p.initVolumeDir, "agola-toolbox"), "exec", "-e", string(envj), "-w", execConfig.WorkingDir, "--"}
cmd = append(cmd, execConfig.Cmd...)
req := coreclient.RESTClient().
Post().
Namespace(p.namespace).
Resource("pods").
Name(p.id).
SubResource("exec").
VersionedParams(&corev1.PodExecOptions{
Container: mainContainerName,
Command: cmd,
Stdin: true,
Stdout: execConfig.Stdout != nil,
Stderr: execConfig.Stderr != nil,
TTY: execConfig.Tty,
}, scheme.ParameterCodec)
exec, err := remotecommand.NewSPDYExecutor(p.restconfig, "POST", req.URL())
if err != nil {
return nil, err
}
reader, writer := io.Pipe()
go func() {
err := exec.Stream(remotecommand.StreamOptions{
Stdin: reader,
Stdout: execConfig.Stdout,
Stderr: execConfig.Stderr,
Tty: execConfig.Tty,
})
endCh <- err
}()
return &K8sContainerExec{
stdin: writer,
endCh: endCh,
}, nil
}
func (e *K8sContainerExec) Wait(ctx context.Context) (int, error) {
err := <-e.endCh
var exitCode int
if err != nil {
switch err := err.(type) {
case utilexec.ExitError:
exitCode = err.ExitStatus()
default:
return -1, err
}
}
return exitCode, nil
}
func (e *K8sContainerExec) Stdin() io.WriteCloser {
return e.stdin
}
func genEnvVars(env map[string]string) []corev1.EnvVar {
envVars := make([]corev1.EnvVar, 0, len(env))
for n, v := range env {
envVars = append(envVars, corev1.EnvVar{Name: n, Value: v})
}
return envVars
}

View File

@ -0,0 +1,192 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package driver
import (
"bytes"
"context"
"io/ioutil"
"os"
"testing"
uuid "github.com/satori/go.uuid"
)
func TestK8sPod(t *testing.T) {
if os.Getenv("SKIP_K8S_TESTS") == "1" {
t.Skip("skipping since env var SKIP_K8S_TESTS is 1")
}
toolboxPath := os.Getenv("AGOLA_TOOLBOX_PATH")
if toolboxPath == "" {
t.Fatalf("env var AGOLA_TOOLBOX_PATH is undefined")
}
dir, err := ioutil.TempDir("", "agola")
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer os.RemoveAll(dir)
d, err := NewK8sDriver(logger, "executorid01", toolboxPath)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ctx := context.Background()
t.Run("create a pod with one container", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
})
t.Run("execute a command inside a pod", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
ce, err := pod.Exec(ctx, &ExecConfig{
Cmd: []string{"ls"},
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ce.Stdin().Close()
code, err := ce.Wait(ctx)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if code != 0 {
t.Fatalf("unexpected exit code: %d", code)
}
})
t.Run("test pod environment", func(t *testing.T) {
env := map[string]string{
"ENV01": "ENVVALUE01",
"ENV02": "ENVVALUE02",
}
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
Env: env,
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
var buf bytes.Buffer
ce, err := pod.Exec(ctx, &ExecConfig{
Cmd: []string{"env"},
Stdout: &buf,
Stderr: os.Stdout,
})
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ce.Stdin().Close()
code, err := ce.Wait(ctx)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if code != 0 {
t.Fatalf("unexpected exit code: %d", code)
}
curEnv, err := parseEnvs(bytes.NewReader(buf.Bytes()))
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
for n, e := range env {
if ce, ok := curEnv[n]; !ok {
t.Fatalf("missing env var %s", n)
} else {
if ce != e {
t.Fatalf("different env var %s value, want: %q, got %q", n, e, ce)
}
}
}
})
t.Run("test get pods", func(t *testing.T) {
pod, err := d.NewPod(ctx, &PodConfig{
ID: uuid.NewV4().String(),
TaskID: uuid.NewV4().String(),
Containers: []*ContainerConfig{
&ContainerConfig{
Cmd: []string{"cat"},
Image: "busybox",
},
},
InitVolumeDir: "/tmp/agola",
}, ioutil.Discard)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
defer pod.Remove(ctx)
pods, err := d.GetPods(ctx, true)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
ok := false
for _, p := range pods {
if p.ID() == pod.ID() {
ok = true
}
}
if !ok {
t.Fatalf("pod with id %q not found", pod.ID())
}
})
}

View File

@ -0,0 +1,233 @@
// Copyright 2019 Sorint.lab
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
// See the License for the specific language governing permissions and
// limitations under the License.
package driver
import (
"context"
"encoding/json"
"time"
"github.com/pkg/errors"
coordinationv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apilabels "k8s.io/apimachinery/pkg/labels"
)
type LeaseData struct {
LeaseDurationSeconds int `json:"leaseDurationSeconds"`
AcquireTime time.Time `json:"acquireTime"`
RenewTime time.Time `json:"renewTime"`
HolderIdentity string `json:"holderIdentity"`
}
func (d *K8sDriver) updateLease(ctx context.Context) error {
duration := int(staleExecutorLeaseInterval / time.Second)
now := time.Now()
name := executorLeasePrefix + d.executorID
labels := map[string]string{}
labels[executorsGroupIDKey] = d.executorsGroupID
labels[executorIDKey] = d.executorID
if d.useLeaseAPI {
duration := int32(duration)
now := metav1.MicroTime{now}
leaseClient := d.client.CoordinationV1().Leases(d.namespace)
found := false
lease, err := leaseClient.Get(name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
} else {
found = true
}
if found {
lease.Spec.RenewTime = &now
_, err := leaseClient.Update(lease)
return err
}
lease = &coordinationv1.Lease{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
},
Spec: coordinationv1.LeaseSpec{
HolderIdentity: &d.executorID,
LeaseDurationSeconds: &duration,
AcquireTime: &now,
RenewTime: &now,
},
}
lease, err = leaseClient.Create(lease)
return err
} else {
cmClient := d.client.CoreV1().ConfigMaps(d.namespace)
found := false
cm, err := cmClient.Get(name, metav1.GetOptions{})
if err != nil {
if !apierrors.IsNotFound(err) {
return err
}
} else {
found = true
}
ld := &LeaseData{
LeaseDurationSeconds: duration,
AcquireTime: now,
HolderIdentity: d.executorID,
RenewTime: now,
}
if found {
if cm.Annotations == nil {
// this shouldn't happen
return errors.Errorf("missing configmap lease annotations")
}
if recordBytes, found := cm.Annotations[cmLeaseKey]; found {
if err := json.Unmarshal([]byte(recordBytes), &ld); err != nil {
return err
}
}
ld.RenewTime = now
ldj, err := json.Marshal(ld)
if err != nil {
return err
}
cm.Annotations[cmLeaseKey] = string(ldj)
_, err = cmClient.Update(cm)
return err
}
ldj, err := json.Marshal(ld)
if err != nil {
return err
}
cm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: name,
Labels: labels,
Annotations: make(map[string]string),
},
}
cm.Annotations[cmLeaseKey] = string(ldj)
cm, err = cmClient.Create(cm)
return err
}
return nil
}
func (d *K8sDriver) getLeases(ctx context.Context) ([]string, error) {
executorsIDs := []string{}
labels := map[string]string{}
labels[executorsGroupIDKey] = d.executorsGroupID
// TODO(sgotti) use go client listers instead of querying every time
if d.useLeaseAPI {
leaseClient := d.client.CoordinationV1().Leases(d.namespace)
leases, err := leaseClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
if err != nil {
return nil, err
}
for _, lease := range leases.Items {
if v, ok := lease.Labels[executorIDKey]; ok {
executorsIDs = append(executorsIDs, v)
}
}
} else {
cmClient := d.client.CoreV1().ConfigMaps(d.namespace)
cms, err := cmClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
if err != nil {
return nil, err
}
for _, cm := range cms.Items {
if v, ok := cm.Labels[executorIDKey]; ok {
executorsIDs = append(executorsIDs, v)
}
}
}
return executorsIDs, nil
}
func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error {
labels := map[string]string{}
labels[executorsGroupIDKey] = d.executorsGroupID
// TODO(sgotti) use go client listers instead of querying every time
if d.useLeaseAPI {
leaseClient := d.client.CoordinationV1().Leases(d.namespace)
leases, err := leaseClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
if err != nil {
return err
}
for _, lease := range leases.Items {
if lease.Spec.HolderIdentity == nil {
d.log.Warnf("missing holder identity for lease %q", lease.Name)
continue
}
// skip our lease
if *lease.Spec.HolderIdentity == d.executorID {
continue
}
if lease.Spec.RenewTime == nil {
d.log.Warnf("missing renew time for lease %q", lease.Name)
continue
}
if lease.Spec.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) {
d.log.Infof("deleting stale lease %q", lease.Name)
leaseClient.Delete(lease.Name, nil)
}
}
} else {
cmClient := d.client.CoreV1().ConfigMaps(d.namespace)
cms, err := cmClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()})
if err != nil {
return err
}
for _, cm := range cms.Items {
var ld *LeaseData
if cm.Annotations == nil {
// this shouldn't happen
d.log.Warnf("missing configmap lease annotations for configmap %q", cm.Name)
continue
}
if recordBytes, found := cm.Annotations[cmLeaseKey]; found {
if err := json.Unmarshal([]byte(recordBytes), &ld); err != nil {
return err
}
}
// skip our lease
if ld.HolderIdentity == d.executorID {
continue
}
if ld.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) {
d.log.Infof("deleting stale configmap lease %q", cm.Name)
cmClient.Delete(cm.Name, nil)
}
}
}
return nil
}

View File

@ -1297,10 +1297,22 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) {
u.Host = net.JoinHostPort(addr, port)
e.listenURL = u.String()
d, err := driver.NewDockerDriver(logger, e.id, "/tmp/agola/bin", e.c.ToolboxPath)
var d driver.Driver
switch c.Driver.Type {
case config.DriverTypeDocker:
d, err = driver.NewDockerDriver(logger, e.id, "/tmp/agola/bin", e.c.ToolboxPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to create docker driver")
}
case config.DriverTypeK8s:
d, err = driver.NewK8sDriver(logger, e.id, c.ToolboxPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to create kubernetes driver")
}
e.dynamic = true
default:
return nil, errors.Errorf("unknown driver type %q", c.Driver.Type)
}
e.driver = d
return e, nil