diff --git a/cmd/agola/cmd/serve.go b/cmd/agola/cmd/serve.go index 5ba5f64..59b4538 100644 --- a/cmd/agola/cmd/serve.go +++ b/cmd/agola/cmd/serve.go @@ -25,6 +25,7 @@ import ( rsexecutor "github.com/sorintlab/agola/internal/services/executor" "github.com/sorintlab/agola/internal/services/gateway" "github.com/sorintlab/agola/internal/services/gitserver" + "github.com/sorintlab/agola/internal/services/notification" rsscheduler "github.com/sorintlab/agola/internal/services/runservice" "github.com/sorintlab/agola/internal/services/scheduler" "github.com/sorintlab/agola/internal/util" @@ -43,6 +44,7 @@ var componentsNames = []string{ "all", "gateway", "scheduler", + "notification", "runservice", "executor", "configstore", @@ -171,6 +173,14 @@ func serve(cmd *cobra.Command, args []string) error { } } + var ns *notification.NotificationService + if isComponentEnabled("notification") { + ns, err = notification.NewNotificationService(c) + if err != nil { + return errors.Wrapf(err, "failed to start notification service") + } + } + var gw *gateway.Gateway if isComponentEnabled("gateway") { gw, err = gateway.NewGateway(c) @@ -201,6 +211,9 @@ func serve(cmd *cobra.Command, args []string) error { if sched != nil { go func() { errCh <- sched.Run(ctx) }() } + if ns != nil { + go func() { errCh <- ns.Run(ctx) }() + } if gw != nil { go func() { errCh <- gw.Run(ctx) }() } diff --git a/examples/agolademo/config.yml b/examples/agolademo/config.yml index c0f21bc..93d073e 100644 --- a/examples/agolademo/config.yml +++ b/examples/agolademo/config.yml @@ -20,6 +20,13 @@ gateway: scheduler: runserviceURL: "http://localhost:4000" +notification: + webExposedURL: "http://172.17.0.1:8080" + runserviceURL: "http://localhost:4000" + configstoreURL: "http://localhost:4002" + etcd: + endpoints: "http://localhost:2379" + configstore: dataDir: /tmp/agola/configstore etcd: diff --git a/examples/kubernetes/distributed/agola.yml b/examples/kubernetes/distributed/agola.yml index bff2a82..7d27eba 100644 --- a/examples/kubernetes/distributed/agola.yml +++ b/examples/kubernetes/distributed/agola.yml @@ -11,7 +11,7 @@ spec: nodePort: 30002 selector: app: agola - component: gateway-scheduler + component: gateway-others type: NodePort --- @@ -90,6 +90,13 @@ data: scheduler: runserviceURL: "http://agola-runservice:4000" + notification: + webExposedURL: "http://192.168.39.188:30002" + runserviceURL: "http://agola-runservice:4000" + configstoreURL: "http://agola-configstore:4002" + etcd: + endpoints: "http://localhost:2379" + configstore: dataDir: /mnt/agola/local/configstore etcd: @@ -142,18 +149,18 @@ data: apiVersion: apps/v1 kind: Deployment metadata: - name: agola-gateway-scheduler + name: agola-gateway-others spec: replicas: 2 selector: matchLabels: app: agola - component: gateway-scheduler + component: gateway-others template: metadata: labels: app: agola - component: gateway-scheduler + component: gateway-others spec: containers: - name: agola @@ -164,7 +171,7 @@ spec: - "--config" - /mnt/agola/config/config.yml - "--components" - - gateway,scheduler + - gateway,scheduler,notification env: ports: - containerPort: 8000 diff --git a/examples/kubernetes/simple/agola.yml b/examples/kubernetes/simple/agola.yml index 1019dcb..e939f48 100644 --- a/examples/kubernetes/simple/agola.yml +++ b/examples/kubernetes/simple/agola.yml @@ -87,6 +87,13 @@ data: scheduler: runserviceURL: "http://agola-internal:4000" + notification: + webExposedURL: "http://192.168.39.188:30002" + runserviceURL: "http://agola-internal:4000" + configstoreURL: "http://agola-internal:4002" + etcd: + endpoints: "http://localhost:2379" + configstore: dataDir: /mnt/agola/local/configstore etcd: diff --git a/internal/services/config/config.go b/internal/services/config/config.go index 9d4e97a..8afe95d 100644 --- a/internal/services/config/config.go +++ b/internal/services/config/config.go @@ -33,12 +33,13 @@ type Config struct { // Defaults to "agola" ID string `yaml:"id"` - Gateway Gateway `yaml:"gateway"` - Scheduler Scheduler `yaml:"scheduler"` - Runservice Runservice `yaml:"runservice"` - Executor Executor `yaml:"executor"` - Configstore Configstore `yaml:"configstore"` - Gitserver Gitserver `yaml:"gitserver"` + Gateway Gateway `yaml:"gateway"` + Scheduler Scheduler `yaml:"scheduler"` + Notification Notification `yaml:"notification"` + Runservice Runservice `yaml:"runservice"` + Executor Executor `yaml:"executor"` + Configstore Configstore `yaml:"configstore"` + Gitserver Gitserver `yaml:"gitserver"` } type Gateway struct { @@ -47,7 +48,7 @@ type Gateway struct { // APIExposedURL is the gateway API exposed url i.e. https://myagola.example.com APIExposedURL string `yaml:"apiExposedURL"` - // ExposedURL is the web interface exposed url i.e. https://myagola.example.com + // WebExposedURL is the web interface exposed url i.e. https://myagola.example.com // This is used for generating the redirect_url in oauth2 redirects WebExposedURL string `yaml:"webExposedURL"` @@ -70,6 +71,19 @@ type Scheduler struct { RunserviceURL string `yaml:"runserviceURL"` } +type Notification struct { + Debug bool `yaml:"debug"` + + // WebExposedURL is the web interface exposed url i.e. https://myagola.example.com + // This is used for generating the redirect_url in oauth2 redirects + WebExposedURL string `yaml:"webExposedURL"` + + RunserviceURL string `yaml:"runserviceURL"` + ConfigstoreURL string `yaml:"configstoreURL"` + + Etcd Etcd `yaml:"etcd"` +} + type Runservice struct { Debug bool `yaml:"debug"` @@ -314,6 +328,17 @@ func Validate(c *Config) error { return errors.Errorf("scheduler runserviceURL is empty") } + // Notification + if c.Notification.WebExposedURL == "" { + return errors.Errorf("notification webExposedURL is empty") + } + if c.Notification.ConfigstoreURL == "" { + return errors.Errorf("notification configstoreURL is empty") + } + if c.Notification.RunserviceURL == "" { + return errors.Errorf("notification runserviceURL is empty") + } + // Git server if c.Gitserver.DataDir == "" { return errors.Errorf("git server dataDir is empty") diff --git a/internal/services/notification/commitstatus.go b/internal/services/notification/commitstatus.go new file mode 100644 index 0000000..486349b --- /dev/null +++ b/internal/services/notification/commitstatus.go @@ -0,0 +1,135 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package notification + +import ( + "context" + "fmt" + "net/url" + + gitsource "github.com/sorintlab/agola/internal/gitsources" + "github.com/sorintlab/agola/internal/services/common" + "github.com/sorintlab/agola/internal/services/gateway" + rstypes "github.com/sorintlab/agola/internal/services/runservice/types" + + "github.com/pkg/errors" +) + +func (n *NotificationService) updateCommitStatus(ctx context.Context, ev *rstypes.RunEvent) error { + var commitStatus gitsource.CommitStatus + if ev.Phase == rstypes.RunPhaseSetupError { + commitStatus = gitsource.CommitStatusError + } + if ev.Phase == rstypes.RunPhaseCancelled { + commitStatus = gitsource.CommitStatusError + } + if ev.Phase == rstypes.RunPhaseRunning && ev.Result == rstypes.RunResultUnknown { + commitStatus = gitsource.CommitStatusPending + } + if ev.Phase == rstypes.RunPhaseFinished && ev.Result != rstypes.RunResultUnknown { + switch ev.Result { + case rstypes.RunResultSuccess: + commitStatus = gitsource.CommitStatusSuccess + case rstypes.RunResultStopped: + fallthrough + case rstypes.RunResultFailed: + commitStatus = gitsource.CommitStatusFailed + } + } + + if commitStatus == "" { + return nil + } + + run, _, err := n.runserviceClient.GetRun(ctx, ev.RunID, nil) + if err != nil { + return err + } + groupType, groupID, err := common.GroupTypeIDFromRunGroup(run.RunConfig.Group) + if err != nil { + return err + } + + // ignore user local runs + if groupType == common.GroupTypeUser { + return nil + } + + project, _, err := n.configstoreClient.GetProject(ctx, groupID) + if err != nil { + return errors.Wrapf(err, "failed to get project %s", groupID) + } + + user, _, err := n.configstoreClient.GetUserByLinkedAccount(ctx, project.LinkedAccountID) + if err != nil { + return errors.Wrapf(err, "failed to get user by linked account %q", project.LinkedAccountID) + } + la := user.LinkedAccounts[project.LinkedAccountID] + if la == nil { + return errors.Errorf("linked account %q in user %q doesn't exist", project.LinkedAccountID, user.Name) + } + rs, _, err := n.configstoreClient.GetRemoteSource(ctx, la.RemoteSourceID) + if err != nil { + return errors.Wrapf(err, "failed to get remote source %q", la.RemoteSourceID) + } + + // TODO(sgotti) handle refreshing oauth2 tokens + gitSource, err := common.GetGitSource(rs, la) + if err != nil { + return errors.Wrapf(err, "failed to create gitea client") + } + + targetURL, err := webRunURL(n.c.WebExposedURL, project.ID, run.Run.ID) + if err != nil { + return errors.Wrapf(err, "failed to generate commit status target url") + } + description := statusDescription(commitStatus) + context := fmt.Sprintf("%s/%s/%s", n.gc.ID, project.Name, run.RunConfig.Name) + + if err := gitSource.CreateCommitStatus(project.RepositoryPath, run.Run.Annotations[gateway.AnnotationCommitSHA], commitStatus, targetURL, description, context); err != nil { + return err + } + + return nil +} + +func webRunURL(webExposedURL, projectID, runID string) (string, error) { + u, err := url.Parse(webExposedURL + "/run") + if err != nil { + return "", err + } + q := url.Values{} + q.Set("projectref", projectID) + q.Set("runid", runID) + + u.RawQuery = q.Encode() + + return u.String(), nil +} + +func statusDescription(commitStatus gitsource.CommitStatus) string { + switch commitStatus { + case gitsource.CommitStatusPending: + return "The run is pending" + case gitsource.CommitStatusSuccess: + return "The run finished successfully" + case gitsource.CommitStatusError: + return "The run encountered an error" + case gitsource.CommitStatusFailed: + return "The run failed" + default: + return "" + } +} diff --git a/internal/services/notification/notification.go b/internal/services/notification/notification.go new file mode 100644 index 0000000..43e14e8 --- /dev/null +++ b/internal/services/notification/notification.go @@ -0,0 +1,76 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package notification + +import ( + "context" + + "github.com/sorintlab/agola/internal/common" + "github.com/sorintlab/agola/internal/etcd" + slog "github.com/sorintlab/agola/internal/log" + "github.com/sorintlab/agola/internal/services/config" + csapi "github.com/sorintlab/agola/internal/services/configstore/api" + rsapi "github.com/sorintlab/agola/internal/services/runservice/api" + + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +var level = zap.NewAtomicLevelAt(zapcore.InfoLevel) +var logger = slog.New(level) +var log = logger.Sugar() + +type NotificationService struct { + gc *config.Config + c *config.Notification + + e *etcd.Store + + runserviceClient *rsapi.Client + configstoreClient *csapi.Client +} + +func NewNotificationService(gc *config.Config) (*NotificationService, error) { + c := &gc.Notification + if c.Debug { + level.SetLevel(zapcore.DebugLevel) + } + + e, err := common.NewEtcd(&c.Etcd, logger, "notification") + if err != nil { + return nil, err + } + + configstoreClient := csapi.NewClient(c.ConfigstoreURL) + runserviceClient := rsapi.NewClient(c.RunserviceURL) + + return &NotificationService{ + gc: gc, + c: c, + e: e, + runserviceClient: runserviceClient, + configstoreClient: configstoreClient, + }, nil +} + +func (n *NotificationService) Run(ctx context.Context) error { + go n.runEventsHandlerLoop(ctx) + + select { + case <-ctx.Done(): + log.Infof("notification service exiting") + return nil + } +} diff --git a/internal/services/notification/runevents.go b/internal/services/notification/runevents.go new file mode 100644 index 0000000..00bc84a --- /dev/null +++ b/internal/services/notification/runevents.go @@ -0,0 +1,122 @@ +// Copyright 2019 Sorint.lab +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied +// See the License for the specific language governing permissions and +// limitations under the License. + +package notification + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "path" + "time" + + rstypes "github.com/sorintlab/agola/internal/services/runservice/types" + + "github.com/pkg/errors" + "go.etcd.io/etcd/clientv3/concurrency" +) + +var ( + etcdRunEventsLockKey = path.Join("locks", "runevents") +) + +func (n *NotificationService) runEventsHandlerLoop(ctx context.Context) { + for { + if err := n.runEventsHandler(ctx); err != nil { + log.Errorf("err: %+v", err) + } + + select { + case <-ctx.Done(): + return + default: + } + + time.Sleep(1 * time.Second) + } +} + +func (n *NotificationService) runEventsHandler(ctx context.Context) error { + session, err := concurrency.NewSession(n.e.Client(), concurrency.WithTTL(5), concurrency.WithContext(ctx)) + if err != nil { + return err + } + defer session.Close() + + m := concurrency.NewMutex(session, etcdRunEventsLockKey) + + // TODO(sgotti) find a way to use a trylock so we'll just return if already + // locked. Currently multiple task updaters will enqueue and start when another + // finishes (unuseful and consume resources) + if err := m.Lock(ctx); err != nil { + return err + } + defer m.Unlock(ctx) + + resp, err := n.runserviceClient.GetRunEvents(ctx, "") + if err != nil { + return err + } + if resp.StatusCode != http.StatusOK { + return errors.Errorf("http status code: %d", resp.StatusCode) + } + defer resp.Body.Close() + + br := bufio.NewReader(resp.Body) + stop := false + + var buf bytes.Buffer + for { + if stop { + return nil + } + line, err := br.ReadBytes('\n') + if err != nil { + if err != io.EOF { + return err + } + if len(line) == 0 { + return nil + } + stop = true + } + switch { + case bytes.HasPrefix(line, []byte("data: ")): + buf.Write(line[6:]) + case bytes.Equal(line, []byte("\n")): + data := buf.Bytes() + log.Infof("data: %s", data) + buf.Reset() + + var ev *rstypes.RunEvent + if err := json.Unmarshal(data, &ev); err != nil { + return err + } + + // TODO(sgotti) + // this is just a basic handling. Improve it to store received events and + // their status to etcd so we can also do more logic like retrying and handle + // multiple kind of notifications (email etc...) + if err := n.updateCommitStatus(ctx, ev); err != nil { + log.Infof("failed to update commit status: %v", err) + } + + default: + return errors.Errorf("wrong data") + } + } +}