diff --git a/.agola/config.jsonnet b/.agola/config.jsonnet index f0e5272..fded205 100644 --- a/.agola/config.jsonnet +++ b/.agola/config.jsonnet @@ -34,7 +34,7 @@ local task_build_go(version, arch) = { { type: 'clone' }, { type: 'restore_cache', keys: ['cache-sum-{{ md5sum "go.sum" }}', 'cache-date-'], dest_dir: '/go/pkg/mod/cache' }, { type: 'run', command: 'make' }, - { type: 'run', command: 'SKIP_DOCKER_TESTS=1 go test -v -count 1 ./...' }, + { type: 'run', name: 'run tests', command: 'SKIP_DOCKER_TESTS=1 SKIP_K8S_TESTS=1 go test -v -count 1 ./...' }, { type: 'save_cache', key: 'cache-sum-{{ md5sum "go.sum" }}', contents: [{ source_dir: '/go/pkg/mod/cache' }] }, { type: 'save_cache', key: 'cache-date-{{ year }}-{{ month }}-{{ day }}', contents: [{ source_dir: '/go/pkg/mod/cache' }] }, ], diff --git a/examples/config.yml b/examples/config.yml index 0926eaa..23817b0 100644 --- a/examples/config.yml +++ b/examples/config.yml @@ -47,6 +47,9 @@ runServiceExecutor: runServiceURL: "http://localhost:4000" web: listenAddress: ":4001" + activeTasksLimit: 2 + driver: + type: docker gitServer: dataDir: /tmp/agola/gitserver diff --git a/go.mod b/go.mod index 283ff6a..1905538 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,10 @@ require ( github.com/docker/docker v1.13.1 github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.3.3 // indirect + github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c // indirect github.com/elazarl/go-bindata-assetfs v1.0.0 + github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f // indirect + github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f // indirect github.com/ghodss/yaml v1.0.0 github.com/go-bindata/go-bindata v1.0.0 github.com/go-ini/ini v1.42.0 // indirect @@ -21,10 +24,14 @@ require ( github.com/google/go-cmp v0.3.0 github.com/google/go-containerregistry v0.0.0-20190412005658-1d38b9cfdb9d github.com/google/go-jsonnet v0.12.1 + github.com/google/gofuzz v1.0.0 // indirect + github.com/googleapis/gnostic v0.2.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e // indirect github.com/gorilla/handlers v1.4.0 github.com/gorilla/mux v1.7.0 github.com/hashicorp/go-sockaddr v1.0.1 + github.com/imdario/mergo v0.3.7 // indirect + github.com/json-iterator/go v1.1.6 // indirect github.com/jtolds/gls v4.2.1+incompatible // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect @@ -34,6 +41,8 @@ require ( github.com/minio/minio-go v6.0.14+incompatible github.com/mitchellh/copystructure v1.0.0 github.com/mitchellh/go-homedir v1.1.0 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect github.com/opencontainers/go-digest v1.0.0-rc1 // indirect github.com/opencontainers/image-spec v1.0.1 // indirect github.com/opencontainers/runc v0.1.1 // indirect @@ -50,9 +59,16 @@ require ( go.uber.org/zap v1.9.1 golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9 golang.org/x/oauth2 v0.0.0-20190220154721-9b3c75971fc9 - gopkg.in/ini.v1 v1.41.0 // indirect + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/ini.v1 v1.42.0 // indirect gopkg.in/yaml.v2 v2.2.2 gotest.tools v2.2.0+incompatible // indirect + k8s.io/api v0.0.0-20190313235455-40a48860b5ab + k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1 + k8s.io/client-go v11.0.0+incompatible + k8s.io/klog v0.3.0 // indirect + k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 + sigs.k8s.io/yaml v1.1.0 // indirect ) replace github.com/docker/docker v1.13.1 => github.com/docker/engine v0.0.0-20181106193140-f5749085e9cb diff --git a/go.sum b/go.sum index 742d11b..ffc0d06 100644 --- a/go.sum +++ b/go.sum @@ -35,10 +35,16 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= +github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c h1:ZfSZ3P3BedhKGUhzj7BQlPSU4OvT6tfOKe3DVHzOA7s= +github.com/docker/spdystream v0.0.0-20181023171402-6480d4af844c/go.mod h1:Qh8CwZgvJUkLughtfhJv5dyTYa91l1fOUCrgjqmcifM= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4 h1:qk/FSDDxo05wdJH28W+p5yivv7LuLYLRXPPD8KQCtZs= github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= +github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f h1:8GDPb0tCY8LQ+OJ3dbHb5sA6YZWXFORQYZx5sdsTlMs= +github.com/elazarl/goproxy v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:/Zj4wYkgs4iZTTu3o/KG3Itv/qCCa8VVMlb3i9OVuzc= +github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f h1:AUj1VoZUfhPhOPHULCQQDnGhRelpFWHMLhQVWDsS0v4= +github.com/elazarl/goproxy/ext v0.0.0-20190421051319-9d40249d3c2f/go.mod h1:gNh8nYJoAm43RfaxurUnxr+N1PwuFV3ZMl/efxlIlY8= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -70,8 +76,12 @@ github.com/google/go-jsonnet v0.12.1 h1:v0iUm/b4SBz7lR/diMoz9tLAz8lqtnNRKIwMrmU2 github.com/google/go-jsonnet v0.12.1/go.mod h1:gVu3UVSfOt5fRFq+dh9duBqXa5905QY8S1QvMNcEIVs= github.com/google/go-querystring v1.0.0 h1:Xkwi/a1rcvNg1PPYe5vI8GbeBY/jrVuDX5ASuANWTrk= github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck= +github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw= +github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.0.0 h1:b4Gk+7WdP/d3HZH8EJsZpvV7EtDOgaZLtnaNGIu1adA= github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g= +github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e h1:JKmoR8x90Iww1ks85zJ1lfDGgIiMDuIptTOhJq+zKyg= github.com/gopherjs/gopherjs v0.0.0-20181103185306-d547d1d9531e/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/handlers v1.4.0 h1:XulKRWSQK5uChr4pEgSE4Tc/OcmnU9GJuSwdog/tZsA= @@ -92,10 +102,14 @@ github.com/hashicorp/go-sockaddr v1.0.1 h1:eCkkJ5KOOktDvwbsE9KPyiBWaOfp1ZNy2gLHg github.com/hashicorp/go-sockaddr v1.0.1/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A= github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= +github.com/imdario/mergo v0.3.7 h1:Y+UAYTZ7gDEuOfhxKWy+dvb5dRQ6rJjFSdX2HZY1/gI= +github.com/imdario/mergo v0.3.7/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jonboulle/clockwork v0.1.0 h1:VKV+ZcuP6l3yW9doeqz6ziZGgcynBVQO+obU0+0hcPo= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= +github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= +github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jtolds/gls v4.2.1+incompatible h1:fSuqC+Gmlu6l/ZYAoZzx2pyucC8Xza35fpRVWLVmUEE= github.com/jtolds/gls v4.2.1+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= @@ -128,6 +142,10 @@ github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrk github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo= github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY= github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= github.com/onsi/ginkgo v1.6.0 h1:Ix8l273rp3QzYgXSR+c8d1fTG7UPgYkOSELPhiY/YGw= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= @@ -152,6 +170,7 @@ github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1 h1:osmNoEW2SCW3L github.com/prometheus/common v0.0.0-20180518154759-7600349dcfe1/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be h1:MoyXp/VjXUwM0GyDcdwT7Ubea2gxOSHpPaFo3qV+Y2A= github.com/prometheus/procfs v0.0.0-20180612222113-7d6f385de8be/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rogpeppe/go-charset v0.0.0-20180617210344-2471d30d28b4/go.mod h1:qgYeAmZ5ZIpBWTGllZSQnw97Dj+woV0toclVaRGI8pc= github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/sanity-io/litter v1.1.0 h1:BllcKWa3VbZmOZbDCoszYLk7zCsKHz5Beossi8SUcTc= github.com/sanity-io/litter v1.1.0/go.mod h1:CJ0VCw2q4qKU7LaQr3n7UOSHzgEMgcGco7N/SkZQPjw= @@ -232,8 +251,10 @@ gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 h1:OAj3g0cR6Dx/R07QgQe8wkA9RNjB2u4i700xBkIT4e0= gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2/go.mod h1:Xk6kEKp8OKb+X14hQBKWaSkCsqBpgog8nAV2xsGOxlo= -gopkg.in/ini.v1 v1.41.0 h1:Ka3ViY6gNYSKiVy71zXBEqKplnV35ImDLVG+8uoIklE= -gopkg.in/ini.v1 v1.41.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/ini.v1 v1.42.0 h1:7N3gPTt50s8GuLortA00n8AqRTk75qOP98+mTPpgzRk= +gopkg.in/ini.v1 v1.42.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= @@ -241,3 +262,15 @@ gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= +k8s.io/api v0.0.0-20190313235455-40a48860b5ab h1:DG9A67baNpoeweOy2spF1OWHhnVY5KR7/Ek/+U1lVZc= +k8s.io/api v0.0.0-20190313235455-40a48860b5ab/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= +k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1 h1:IS7K02iBkQXpCeieSiyJjGoLSdVOv2DbPaWHJ+ZtgKg= +k8s.io/apimachinery v0.0.0-20190313205120-d7deff9243b1/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= +k8s.io/client-go v11.0.0+incompatible h1:LBbX2+lOwY9flffWlJM7f1Ct8V2SRNiMRDFeiwnJo9o= +k8s.io/client-go v11.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= +k8s.io/klog v0.3.0 h1:0VPpR+sizsiivjIfIAQH/rl8tan6jvWkS7lU+0di3lE= +k8s.io/klog v0.3.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7 h1:8r+l4bNWjRlsFYlQJnKJ2p7s1YQPj4XyXiJVqDHRx7c= +k8s.io/utils v0.0.0-20190308190857-21c4ce38f2a7/go.mod h1:8k8uAuAQ0rXslZKaEWd0c3oVhZz7sSzSiPnVZayjIX0= +sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= +sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= diff --git a/internal/services/config/config.go b/internal/services/config/config.go index 6c2d3f5..b10bb57 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -81,6 +81,8 @@ type RunServiceExecutor struct { Web Web `yaml:"web"` + Driver Driver `yaml:"driver"` + Labels map[string]string `yaml:"labels"` // ActiveTasksLimit is the max number of concurrent active tasks ActiveTasksLimit int `yaml:"active_tasks_limit"` @@ -160,6 +162,22 @@ type Etcd struct { TLSSkipVerify bool `yaml:"tlsSkipVerify"` } +type DriverType string + +const ( + DriverTypeDocker DriverType = "docker" + DriverTypeK8s DriverType = "kubernetes" +) + +type Driver struct { + Type DriverType `yaml:"type"` + + // docker fields + + // k8s fields + +} + type TokenSigning struct { // token duration (defaults to 12 hours) Duration time.Duration `yaml:"duration"` @@ -262,6 +280,15 @@ func Validate(c *Config) error { if c.RunServiceExecutor.RunServiceURL == "" { return errors.Errorf("runservice executor runServiceURL is empty") } + if c.RunServiceExecutor.Driver.Type == "" { + return errors.Errorf("runservice executor driver type is empty") + } + switch c.RunServiceExecutor.Driver.Type { + case DriverTypeDocker: + case DriverTypeK8s: + default: + return errors.Errorf("runservice executor driver type %q unknown", c.RunServiceExecutor.Driver.Type) + } // Scheduler if c.Scheduler.RunServiceURL == "" { diff --git a/internal/services/runservice/executor/driver/k8s.go b/internal/services/runservice/executor/driver/k8s.go new file mode 100644 index 0000000..e03a05e --- /dev/null +++ b/internal/services/runservice/executor/driver/k8s.go @@ -0,0 +1,629 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package driver + +import ( + "context" + "encoding/json" + "fmt" + "io" + "path/filepath" + "strings" + "time" + + "github.com/sorintlab/agola/internal/util" + + "github.com/docker/docker/client" + "github.com/docker/docker/pkg/archive" + "github.com/pkg/errors" + uuid "github.com/satori/go.uuid" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apilabels "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + restclient "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" + "k8s.io/client-go/tools/remotecommand" + utilexec "k8s.io/utils/exec" +) + +const ( + mainContainerName = "maincontainer" + + configMapName = "agola-executors-group" + executorLeasePrefix = "agola-executor-" + podNamePrefix = "agola-task-" + + executorsGroupIDKey = labelPrefix + "executorsgroupid" + executorsGroupIDConfigMapKey = "executorsgroupid" + useLeaseAPIKey = labelPrefix + "useleaseapi" + cmLeaseKey = labelPrefix + "lease" + + renewExecutorLeaseInterval = 10 * time.Second + staleExecutorLeaseInterval = 1 * time.Minute +) + +type K8sDriver struct { + log *zap.SugaredLogger + restconfig *restclient.Config + client *kubernetes.Clientset + toolboxPath string + namespace string + executorID string + executorsGroupID string + useLeaseAPI bool +} + +type K8sPod struct { + id string + namespace string + labels map[string]string + + restconfig *restclient.Config + client *kubernetes.Clientset + initVolumeDir string +} + +func NewK8sDriver(logger *zap.Logger, executorID, toolboxPath string) (*K8sDriver, error) { + kubeClientConfig := NewKubeClientConfig("", "", "") + kubecfg, err := kubeClientConfig.ClientConfig() + if err != nil { + return nil, err + } + kubecli, err := kubernetes.NewForConfig(kubecfg) + if err != nil { + return nil, fmt.Errorf("cannot create kubernetes client: %v", err) + } + + namespace, _, err := kubeClientConfig.Namespace() + if err != nil { + return nil, err + } + + d := &K8sDriver{ + log: logger.Sugar(), + restconfig: kubecfg, + client: kubecli, + toolboxPath: toolboxPath, + namespace: namespace, + executorID: executorID, + } + + lists, err := d.client.Discovery().ServerPreferredResources() + if err != nil { + return nil, err + } + + hasLeaseAPI := false + for _, list := range lists { + if len(list.APIResources) == 0 { + continue + } + if list.GroupVersion != "coordination.k8s.io/v1" { + continue + } + for _, apiResource := range list.APIResources { + if apiResource.Kind == "Lease" { + hasLeaseAPI = true + } + } + } + d.useLeaseAPI = hasLeaseAPI + + executorsGroupID, err := d.getOrCreateExecutorsGroupID(context.TODO()) + if err != nil { + return nil, err + } + + d.executorsGroupID = executorsGroupID + + ctx := context.TODO() + go func() { + for { + if err := d.updateLease(ctx); err != nil { + d.log.Errorf("failed to update executor lease: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(renewExecutorLeaseInterval) + } + }() + + go func() { + for { + if err := d.cleanStaleExecutorsLease(ctx); err != nil { + d.log.Errorf("failed to clean stale executors lease: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(renewExecutorLeaseInterval) + } + }() + + return d, nil +} + +// NewKubeClientConfig return a kube client config that will by default use an +// in cluster client config or, if not available or overriden an external client +// config using the default client behavior used also by kubectl. +func NewKubeClientConfig(kubeconfigPath, context, namespace string) clientcmd.ClientConfig { + rules := clientcmd.NewDefaultClientConfigLoadingRules() + rules.DefaultClientConfig = &clientcmd.DefaultClientConfig + + if kubeconfigPath != "" { + rules.ExplicitPath = kubeconfigPath + } + + overrides := &clientcmd.ConfigOverrides{ClusterDefaults: clientcmd.ClusterDefaults} + + if context != "" { + overrides.CurrentContext = context + } + + if namespace != "" { + overrides.Context.Namespace = namespace + } + + return clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides) +} + +func (d *K8sDriver) Setup(ctx context.Context) error { + return nil +} + +func (d *K8sDriver) ExecutorGroup(ctx context.Context) (string, error) { + return d.executorsGroupID, nil +} + +func (d *K8sDriver) GetExecutors(ctx context.Context) ([]string, error) { + return d.getLeases((ctx)) +} + +// executorsGroups gets or creates (if it doesn't exists) a configmap under +// the k8s namespace where the executorsgroup id is saved. The executorsgroupid +// is unique per k8s namespace and is shared by all the executors accessing this +// namespace +func (d *K8sDriver) getOrCreateExecutorsGroupID(ctx context.Context) (string, error) { + cmClient := d.client.CoreV1().ConfigMaps(d.namespace) + + // pod and secret name, based on pod id + cm, err := cmClient.Get(configMapName, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return "", err + } + } else { + return cm.Data[executorsGroupIDConfigMapKey], nil + } + + executorsGroupID := uuid.NewV4().String() + + cm = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: configMapName, + }, + Data: map[string]string{executorsGroupIDConfigMapKey: executorsGroupID}, + } + cm, err = cmClient.Create(cm) + if err != nil { + return "", err + } + + return executorsGroupID, nil +} + +func (d *K8sDriver) NewPod(ctx context.Context, podConfig *PodConfig, out io.Writer) (Pod, error) { + if len(podConfig.Containers) == 0 { + return nil, errors.Errorf("empty container config") + } + + containerConfig := podConfig.Containers[0] + + secretClient := d.client.CoreV1().Secrets(d.namespace) + podClient := d.client.CoreV1().Pods(d.namespace) + + labels := map[string]string{} + labels[agolaLabelKey] = agolaLabelValue + labels[podIDKey] = podConfig.ID + labels[taskIDKey] = podConfig.TaskID + labels[executorIDKey] = d.executorID + labels[executorsGroupIDKey] = d.executorsGroupID + + dockerconfigj, err := json.Marshal(podConfig.DockerConfig) + if err != nil { + return nil, err + } + + // pod and secret name, based on pod id + name := podNamePrefix + podConfig.ID + + // secret that hold the docker registry auth + secret := &corev1.Secret{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + }, + Data: map[string][]byte{ + ".dockerconfigjson": dockerconfigj, + }, + Type: corev1.SecretTypeDockerConfigJson, + } + + secret, err = secretClient.Create(secret) + if err != nil { + return nil, err + } + + pod := &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: d.namespace, + Name: name, + Labels: labels, + }, + Spec: corev1.PodSpec{ + ImagePullSecrets: []corev1.LocalObjectReference{{Name: name}}, + // don't mount service account secrets or pods will be able to talk with k8s + // api + AutomountServiceAccountToken: util.BoolP(false), + InitContainers: []corev1.Container{ + { + Name: "initcontainer", + Image: "busybox", + // wait for a file named /tmp/done and then exit + Command: []string{"/bin/sh", "-c", "while true; do if [[ -f /tmp/done ]]; then exit; fi; sleep 1; done"}, + Stdin: true, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "agolavolume", + MountPath: podConfig.InitVolumeDir, + }, + }, + }, + }, + Containers: []corev1.Container{ + { + Name: mainContainerName, + Image: containerConfig.Image, + Command: containerConfig.Cmd[0:1], + Args: containerConfig.Cmd[1:], + Env: genEnvVars(containerConfig.Env), + Stdin: true, + WorkingDir: containerConfig.WorkingDir, + VolumeMounts: []corev1.VolumeMount{ + { + Name: "agolavolume", + MountPath: podConfig.InitVolumeDir, + ReadOnly: true, + }, + }, + // by default always try to pull the image so we are sure only authorized users can fetch them + // see https://kubernetes.io/docs/reference/access-authn-authz/admission-controllers/#alwayspullimages + ImagePullPolicy: corev1.PullAlways, + SecurityContext: &corev1.SecurityContext{ + Privileged: &containerConfig.Privileged, + }, + }, + }, + Volumes: []corev1.Volume{ + { + Name: "agolavolume", + VolumeSource: corev1.VolumeSource{ + EmptyDir: &corev1.EmptyDirVolumeSource{}, + }, + }, + }, + }, + } + + pod, err = podClient.Create(pod) + if err != nil { + return nil, err + } + + watcher, err := podClient.Watch( + metav1.SingleObject(pod.ObjectMeta), + ) + if err != nil { + return nil, err + } + + // wait for init container to be ready + for event := range watcher.ResultChan() { + switch event.Type { + case watch.Modified: + pod := event.Object.(*corev1.Pod) + if len(pod.Status.InitContainerStatuses) > 0 { + if pod.Status.InitContainerStatuses[0].State.Running != nil { + watcher.Stop() + } + } + case watch.Deleted: + return nil, errors.Errorf("pod %q has been deleted", pod.Name) + } + } + + fmt.Fprintf(out, "init container ready\n") + + srcInfo, err := archive.CopyInfoSourcePath(d.toolboxPath, false) + if err != nil { + return nil, err + } + + srcArchive, err := archive.TarResource(srcInfo) + if err != nil { + return nil, err + } + defer srcArchive.Close() + + coreclient, err := corev1client.NewForConfig(d.restconfig) + if err != nil { + return nil, err + } + + req := coreclient.RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: "initcontainer", + Command: []string{"tar", "xf", "-", "-C", podConfig.InitVolumeDir}, + Stdin: true, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL()) + if err != nil { + return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL()) + } + + err = exec.Stream(remotecommand.StreamOptions{ + Stdin: srcArchive, + Stdout: out, + Stderr: out, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to execute command on initcontainer") + } + + req = coreclient.RESTClient(). + Post(). + Namespace(pod.Namespace). + Resource("pods"). + Name(pod.Name). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: "initcontainer", + Command: []string{"touch", "/tmp/done"}, + Stdout: true, + Stderr: true, + TTY: false, + }, scheme.ParameterCodec) + + exec, err = remotecommand.NewSPDYExecutor(d.restconfig, "POST", req.URL()) + if err != nil { + return nil, errors.Wrapf(err, "failed to generate k8s client spdy executor for url %q, method: POST", req.URL()) + } + + err = exec.Stream(remotecommand.StreamOptions{ + Stdout: out, + Stderr: out, + }) + if err != nil { + return nil, errors.Wrapf(err, "failed to execute command on initcontainer") + } + + watcher, err = podClient.Watch( + metav1.SingleObject(pod.ObjectMeta), + ) + if err != nil { + return nil, err + } + + // wait for pod to be initialized + for event := range watcher.ResultChan() { + switch event.Type { + case watch.Modified: + pod := event.Object.(*corev1.Pod) + if len(pod.Status.ContainerStatuses) > 0 { + if pod.Status.ContainerStatuses[0].State.Running != nil { + watcher.Stop() + } + } + case watch.Deleted: + return nil, errors.Errorf("pod %q has been deleted", pod.Name) + } + } + + return &K8sPod{ + id: pod.Name, + namespace: pod.Namespace, + + restconfig: d.restconfig, + client: d.client, + initVolumeDir: podConfig.InitVolumeDir, + }, nil +} + +func (d *K8sDriver) GetPods(ctx context.Context, all bool) ([]Pod, error) { + podClient := d.client.CoreV1().Pods(d.namespace) + + // get all pods for the executor group, also the ones managed by other executors in the same executor group + labels := map[string]string{executorsGroupIDKey: d.executorsGroupID} + + // TODO(sgotti) use go client listers instead of querying every time + k8sPods, err := podClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + if err != nil { + return nil, err + } + + pods := make([]Pod, len(k8sPods.Items)) + for i, k8sPod := range k8sPods.Items { + labels := map[string]string{} + // keep only labels starting with our prefix + for n, v := range k8sPod.Labels { + if strings.HasPrefix(n, labelPrefix) { + labels[n] = v + } + } + pods[i] = &K8sPod{ + id: k8sPod.Name, + namespace: k8sPod.Namespace, + labels: labels, + + restconfig: d.restconfig, + client: d.client, + } + } + return pods, nil +} + +func (p *K8sPod) ID() string { + return p.id +} + +func (p *K8sPod) ExecutorID() string { + return p.labels[executorIDKey] +} + +func (p *K8sPod) TaskID() string { + return p.labels[taskIDKey] +} + +func (p *K8sPod) Stop(ctx context.Context) error { + d := int64(0) + secretClient := p.client.CoreV1().Secrets(p.namespace) + if err := secretClient.Delete(p.id, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil { + return err + } + podClient := p.client.CoreV1().Pods(p.namespace) + if err := podClient.Delete(p.id, &metav1.DeleteOptions{GracePeriodSeconds: &d}); err != nil { + return err + } + return nil +} + +func (p *K8sPod) Remove(ctx context.Context) error { + return p.Stop(ctx) +} + +type K8sContainerExec struct { + execID string + client *client.Client + endCh chan error + + stdin io.WriteCloser +} + +func (p *K8sPod) Exec(ctx context.Context, execConfig *ExecConfig) (ContainerExec, error) { + endCh := make(chan error) + + coreclient, err := corev1client.NewForConfig(p.restconfig) + if err != nil { + return nil, err + } + + // k8s pod exec api doesn't let us define the workingdir and the environment. + // Use a toolbox command that will set them up and then exec the real command. + envj, err := json.Marshal(execConfig.Env) + if err != nil { + return nil, err + } + cmd := []string{filepath.Join(p.initVolumeDir, "agola-toolbox"), "exec", "-e", string(envj), "-w", execConfig.WorkingDir, "--"} + cmd = append(cmd, execConfig.Cmd...) + + req := coreclient.RESTClient(). + Post(). + Namespace(p.namespace). + Resource("pods"). + Name(p.id). + SubResource("exec"). + VersionedParams(&corev1.PodExecOptions{ + Container: mainContainerName, + Command: cmd, + Stdin: true, + Stdout: execConfig.Stdout != nil, + Stderr: execConfig.Stderr != nil, + TTY: execConfig.Tty, + }, scheme.ParameterCodec) + + exec, err := remotecommand.NewSPDYExecutor(p.restconfig, "POST", req.URL()) + if err != nil { + return nil, err + } + + reader, writer := io.Pipe() + + go func() { + err := exec.Stream(remotecommand.StreamOptions{ + Stdin: reader, + Stdout: execConfig.Stdout, + Stderr: execConfig.Stderr, + Tty: execConfig.Tty, + }) + endCh <- err + }() + + return &K8sContainerExec{ + stdin: writer, + endCh: endCh, + }, nil +} + +func (e *K8sContainerExec) Wait(ctx context.Context) (int, error) { + err := <-e.endCh + + var exitCode int + if err != nil { + switch err := err.(type) { + case utilexec.ExitError: + exitCode = err.ExitStatus() + default: + return -1, err + } + } + + return exitCode, nil +} + +func (e *K8sContainerExec) Stdin() io.WriteCloser { + return e.stdin +} + +func genEnvVars(env map[string]string) []corev1.EnvVar { + envVars := make([]corev1.EnvVar, 0, len(env)) + for n, v := range env { + envVars = append(envVars, corev1.EnvVar{Name: n, Value: v}) + } + return envVars +} diff --git a/internal/services/runservice/executor/driver/k8s_test.go b/internal/services/runservice/executor/driver/k8s_test.go new file mode 100644 index 0000000..1f91c83 --- /dev/null +++ b/internal/services/runservice/executor/driver/k8s_test.go @@ -0,0 +1,192 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package driver + +import ( + "bytes" + "context" + "io/ioutil" + "os" + "testing" + + uuid "github.com/satori/go.uuid" +) + +func TestK8sPod(t *testing.T) { + if os.Getenv("SKIP_K8S_TESTS") == "1" { + t.Skip("skipping since env var SKIP_K8S_TESTS is 1") + } + toolboxPath := os.Getenv("AGOLA_TOOLBOX_PATH") + if toolboxPath == "" { + t.Fatalf("env var AGOLA_TOOLBOX_PATH is undefined") + } + + dir, err := ioutil.TempDir("", "agola") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + + d, err := NewK8sDriver(logger, "executorid01", toolboxPath) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ctx := context.Background() + + t.Run("create a pod with one container", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + }) + + t.Run("execute a command inside a pod", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + ce, err := pod.Exec(ctx, &ExecConfig{ + Cmd: []string{"ls"}, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ce.Stdin().Close() + code, err := ce.Wait(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if code != 0 { + t.Fatalf("unexpected exit code: %d", code) + } + }) + + t.Run("test pod environment", func(t *testing.T) { + env := map[string]string{ + "ENV01": "ENVVALUE01", + "ENV02": "ENVVALUE02", + } + + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + Env: env, + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + var buf bytes.Buffer + ce, err := pod.Exec(ctx, &ExecConfig{ + Cmd: []string{"env"}, + Stdout: &buf, + Stderr: os.Stdout, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ce.Stdin().Close() + code, err := ce.Wait(ctx) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if code != 0 { + t.Fatalf("unexpected exit code: %d", code) + } + + curEnv, err := parseEnvs(bytes.NewReader(buf.Bytes())) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + for n, e := range env { + if ce, ok := curEnv[n]; !ok { + t.Fatalf("missing env var %s", n) + } else { + if ce != e { + t.Fatalf("different env var %s value, want: %q, got %q", n, e, ce) + } + } + } + }) + + t.Run("test get pods", func(t *testing.T) { + pod, err := d.NewPod(ctx, &PodConfig{ + ID: uuid.NewV4().String(), + TaskID: uuid.NewV4().String(), + Containers: []*ContainerConfig{ + &ContainerConfig{ + Cmd: []string{"cat"}, + Image: "busybox", + }, + }, + InitVolumeDir: "/tmp/agola", + }, ioutil.Discard) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer pod.Remove(ctx) + + pods, err := d.GetPods(ctx, true) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + + ok := false + for _, p := range pods { + if p.ID() == pod.ID() { + ok = true + } + } + if !ok { + t.Fatalf("pod with id %q not found", pod.ID()) + } + }) + +} diff --git a/internal/services/runservice/executor/driver/k8slease.go b/internal/services/runservice/executor/driver/k8slease.go new file mode 100644 index 0000000..4302d80 --- /dev/null +++ b/internal/services/runservice/executor/driver/k8slease.go @@ -0,0 +1,233 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package driver + +import ( + "context" + "encoding/json" + "time" + + "github.com/pkg/errors" + coordinationv1 "k8s.io/api/coordination/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + apilabels "k8s.io/apimachinery/pkg/labels" +) + +type LeaseData struct { + LeaseDurationSeconds int `json:"leaseDurationSeconds"` + AcquireTime time.Time `json:"acquireTime"` + RenewTime time.Time `json:"renewTime"` + HolderIdentity string `json:"holderIdentity"` +} + +func (d *K8sDriver) updateLease(ctx context.Context) error { + duration := int(staleExecutorLeaseInterval / time.Second) + now := time.Now() + + name := executorLeasePrefix + d.executorID + labels := map[string]string{} + labels[executorsGroupIDKey] = d.executorsGroupID + labels[executorIDKey] = d.executorID + + if d.useLeaseAPI { + duration := int32(duration) + now := metav1.MicroTime{now} + + leaseClient := d.client.CoordinationV1().Leases(d.namespace) + found := false + lease, err := leaseClient.Get(name, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return err + } + } else { + found = true + } + + if found { + lease.Spec.RenewTime = &now + _, err := leaseClient.Update(lease) + return err + } + + lease = &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: &d.executorID, + LeaseDurationSeconds: &duration, + AcquireTime: &now, + RenewTime: &now, + }, + } + lease, err = leaseClient.Create(lease) + return err + } else { + cmClient := d.client.CoreV1().ConfigMaps(d.namespace) + found := false + cm, err := cmClient.Get(name, metav1.GetOptions{}) + if err != nil { + if !apierrors.IsNotFound(err) { + return err + } + } else { + found = true + } + + ld := &LeaseData{ + LeaseDurationSeconds: duration, + AcquireTime: now, + HolderIdentity: d.executorID, + RenewTime: now, + } + if found { + if cm.Annotations == nil { + // this shouldn't happen + return errors.Errorf("missing configmap lease annotations") + } + if recordBytes, found := cm.Annotations[cmLeaseKey]; found { + if err := json.Unmarshal([]byte(recordBytes), &ld); err != nil { + return err + } + } + ld.RenewTime = now + ldj, err := json.Marshal(ld) + if err != nil { + return err + } + cm.Annotations[cmLeaseKey] = string(ldj) + _, err = cmClient.Update(cm) + return err + } + + ldj, err := json.Marshal(ld) + if err != nil { + return err + } + cm = &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Labels: labels, + Annotations: make(map[string]string), + }, + } + cm.Annotations[cmLeaseKey] = string(ldj) + cm, err = cmClient.Create(cm) + return err + } + return nil +} + +func (d *K8sDriver) getLeases(ctx context.Context) ([]string, error) { + executorsIDs := []string{} + + labels := map[string]string{} + labels[executorsGroupIDKey] = d.executorsGroupID + + // TODO(sgotti) use go client listers instead of querying every time + if d.useLeaseAPI { + leaseClient := d.client.CoordinationV1().Leases(d.namespace) + + leases, err := leaseClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + if err != nil { + return nil, err + } + for _, lease := range leases.Items { + if v, ok := lease.Labels[executorIDKey]; ok { + executorsIDs = append(executorsIDs, v) + } + } + } else { + cmClient := d.client.CoreV1().ConfigMaps(d.namespace) + + cms, err := cmClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + if err != nil { + return nil, err + } + for _, cm := range cms.Items { + if v, ok := cm.Labels[executorIDKey]; ok { + executorsIDs = append(executorsIDs, v) + } + } + } + + return executorsIDs, nil +} + +func (d *K8sDriver) cleanStaleExecutorsLease(ctx context.Context) error { + labels := map[string]string{} + labels[executorsGroupIDKey] = d.executorsGroupID + + // TODO(sgotti) use go client listers instead of querying every time + if d.useLeaseAPI { + leaseClient := d.client.CoordinationV1().Leases(d.namespace) + + leases, err := leaseClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + if err != nil { + return err + } + for _, lease := range leases.Items { + if lease.Spec.HolderIdentity == nil { + d.log.Warnf("missing holder identity for lease %q", lease.Name) + continue + } + // skip our lease + if *lease.Spec.HolderIdentity == d.executorID { + continue + } + if lease.Spec.RenewTime == nil { + d.log.Warnf("missing renew time for lease %q", lease.Name) + continue + } + if lease.Spec.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) { + d.log.Infof("deleting stale lease %q", lease.Name) + leaseClient.Delete(lease.Name, nil) + } + } + } else { + cmClient := d.client.CoreV1().ConfigMaps(d.namespace) + + cms, err := cmClient.List(metav1.ListOptions{LabelSelector: apilabels.SelectorFromSet(labels).String()}) + if err != nil { + return err + } + for _, cm := range cms.Items { + var ld *LeaseData + if cm.Annotations == nil { + // this shouldn't happen + d.log.Warnf("missing configmap lease annotations for configmap %q", cm.Name) + continue + } + if recordBytes, found := cm.Annotations[cmLeaseKey]; found { + if err := json.Unmarshal([]byte(recordBytes), &ld); err != nil { + return err + } + } + // skip our lease + if ld.HolderIdentity == d.executorID { + continue + } + if ld.RenewTime.Add(staleExecutorLeaseInterval).Before(time.Now()) { + d.log.Infof("deleting stale configmap lease %q", cm.Name) + cmClient.Delete(cm.Name, nil) + } + } + } + return nil +} diff --git a/internal/services/runservice/executor/executor.go b/internal/services/runservice/executor/executor.go index c4cd718..7d4767c 100644 --- a/internal/services/runservice/executor/executor.go +++ b/internal/services/runservice/executor/executor.go @@ -1297,9 +1297,21 @@ func NewExecutor(c *config.RunServiceExecutor) (*Executor, error) { u.Host = net.JoinHostPort(addr, port) e.listenURL = u.String() - d, err := driver.NewDockerDriver(logger, e.id, "/tmp/agola/bin", e.c.ToolboxPath) - if err != nil { - return nil, errors.Wrapf(err, "failed to create docker driver") + var d driver.Driver + switch c.Driver.Type { + case config.DriverTypeDocker: + d, err = driver.NewDockerDriver(logger, e.id, "/tmp/agola/bin", e.c.ToolboxPath) + if err != nil { + return nil, errors.Wrapf(err, "failed to create docker driver") + } + case config.DriverTypeK8s: + d, err = driver.NewK8sDriver(logger, e.id, c.ToolboxPath) + if err != nil { + return nil, errors.Wrapf(err, "failed to create kubernetes driver") + } + e.dynamic = true + default: + return nil, errors.Errorf("unknown driver type %q", c.Driver.Type) } e.driver = d