372 lines
9.2 KiB
Go
372 lines
9.2 KiB
Go
// Copyright 2019 Sorint.lab
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package datamanager
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/sorintlab/agola/internal/etcd"
|
|
|
|
"github.com/pkg/errors"
|
|
etcdclientv3rpc "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
|
|
"go.etcd.io/etcd/mvcc/mvccpb"
|
|
)
|
|
|
|
// TODO(sgotti) rewrite this to use a sqlite local cache
|
|
|
|
type WalChanges struct {
|
|
actions map[string][]*Action
|
|
puts map[string]map[string]string // map[dataType]map[id]
|
|
deletes map[string]map[string]string
|
|
walSeq string
|
|
revision int64
|
|
changeGroupsRevisions changeGroupsRevisions
|
|
initialized bool
|
|
sync.Mutex
|
|
}
|
|
|
|
func NewWalChanges(dataTypes []string) *WalChanges {
|
|
changes := &WalChanges{
|
|
actions: make(map[string][]*Action),
|
|
puts: make(map[string]map[string]string),
|
|
deletes: make(map[string]map[string]string),
|
|
changeGroupsRevisions: make(changeGroupsRevisions),
|
|
}
|
|
|
|
for _, dataType := range dataTypes {
|
|
changes.puts[dataType] = make(map[string]string)
|
|
changes.deletes[dataType] = make(map[string]string)
|
|
}
|
|
|
|
return changes
|
|
}
|
|
|
|
func (c *WalChanges) String() string {
|
|
return fmt.Sprintf("puts: %s, deletes: %s, walSeq: %s, revision: %d, initialized: %t", c.puts, c.deletes, c.walSeq, c.revision, c.initialized)
|
|
}
|
|
|
|
func (c *WalChanges) curRevision() int64 {
|
|
return c.revision
|
|
}
|
|
|
|
func (c *WalChanges) putRevision(revision int64) {
|
|
c.revision = revision
|
|
}
|
|
|
|
func (c *WalChanges) curWalSeq() string {
|
|
return c.walSeq
|
|
}
|
|
|
|
func (c *WalChanges) getPut(dataType, id string) (string, bool) {
|
|
walseq, ok := c.puts[dataType][id]
|
|
return walseq, ok
|
|
}
|
|
|
|
func (c *WalChanges) getDeletesMap() map[string]struct{} {
|
|
dmap := map[string]struct{}{}
|
|
for p := range c.deletes {
|
|
dmap[p] = struct{}{}
|
|
}
|
|
return dmap
|
|
}
|
|
|
|
func (c *WalChanges) getDelete(p string) bool {
|
|
_, ok := c.deletes[p]
|
|
return ok
|
|
}
|
|
|
|
func (c *WalChanges) addPut(dataType, id, walseq string, revision int64) {
|
|
delete(c.deletes[dataType], id)
|
|
c.puts[dataType][id] = walseq
|
|
|
|
c.walSeq = walseq
|
|
c.revision = revision
|
|
}
|
|
|
|
func (c *WalChanges) removePut(dataType, id string, revision int64) {
|
|
delete(c.puts[dataType], id)
|
|
|
|
c.revision = revision
|
|
}
|
|
|
|
func (c *WalChanges) addDelete(dataType, id, walseq string, revision int64) {
|
|
delete(c.puts[dataType], id)
|
|
c.deletes[dataType][id] = walseq
|
|
|
|
c.walSeq = walseq
|
|
c.revision = revision
|
|
}
|
|
|
|
func (c *WalChanges) removeDelete(dataType, id string, revision int64) {
|
|
delete(c.deletes[dataType], id)
|
|
|
|
c.revision = revision
|
|
}
|
|
|
|
func (c *WalChanges) getChangeGroups(cgNames []string) changeGroupsRevisions {
|
|
cgr := map[string]int64{}
|
|
for _, cgName := range cgNames {
|
|
if rev, ok := c.changeGroupsRevisions[cgName]; ok {
|
|
cgr[cgName] = rev
|
|
} else {
|
|
// for non existing changegroups use a changegroup with revision = 0
|
|
cgr[cgName] = 0
|
|
}
|
|
}
|
|
|
|
return cgr
|
|
}
|
|
|
|
func (c *WalChanges) putChangeGroup(cgName string, cgRev int64) {
|
|
c.changeGroupsRevisions[cgName] = cgRev
|
|
}
|
|
|
|
func (c *WalChanges) removeChangeGroup(cgName string) {
|
|
delete(c.changeGroupsRevisions, cgName)
|
|
}
|
|
|
|
func (d *DataManager) applyWalChanges(ctx context.Context, walData *WalData, revision int64) error {
|
|
walDataFilePath := d.storageWalDataFile(walData.WalDataFileID)
|
|
|
|
walDataFile, err := d.ost.ReadObject(walDataFilePath)
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to read waldata %q", walDataFilePath)
|
|
}
|
|
defer walDataFile.Close()
|
|
dec := json.NewDecoder(walDataFile)
|
|
|
|
d.changes.Lock()
|
|
defer d.changes.Unlock()
|
|
for {
|
|
var action *Action
|
|
|
|
err := dec.Decode(&action)
|
|
if err == io.EOF {
|
|
// all done
|
|
break
|
|
}
|
|
if err != nil {
|
|
return errors.Wrapf(err, "failed to decode wal file")
|
|
}
|
|
|
|
d.applyWalChangesAction(ctx, action, walData.WalSequence, revision)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DataManager) applyWalChangesAction(ctx context.Context, action *Action, walSequence string, revision int64) {
|
|
switch action.ActionType {
|
|
case ActionTypePut:
|
|
d.changes.addPut(action.DataType, action.ID, walSequence, revision)
|
|
|
|
case ActionTypeDelete:
|
|
d.changes.addDelete(action.DataType, action.ID, walSequence, revision)
|
|
}
|
|
if d.changes.actions[walSequence] == nil {
|
|
d.changes.actions[walSequence] = []*Action{}
|
|
}
|
|
d.changes.actions[walSequence] = append(d.changes.actions[walSequence], action)
|
|
}
|
|
|
|
func (d *DataManager) watcherLoop(ctx context.Context) error {
|
|
for {
|
|
initialized := d.changes.initialized
|
|
if !initialized {
|
|
if err := d.initializeChanges(ctx); err != nil {
|
|
d.log.Errorf("watcher err: %+v", err)
|
|
}
|
|
} else {
|
|
if err := d.watcher(ctx); err != nil {
|
|
d.log.Errorf("watcher err: %+v", err)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
d.log.Infof("watcher exiting")
|
|
return nil
|
|
default:
|
|
}
|
|
|
|
time.Sleep(1 * time.Second)
|
|
}
|
|
}
|
|
|
|
func (d *DataManager) initializeChanges(ctx context.Context) error {
|
|
var revision int64
|
|
var continuation *etcd.ListPagedContinuation
|
|
for {
|
|
listResp, err := d.e.ListPaged(ctx, etcdWalsDir+"/", 0, 10, continuation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp := listResp.Resp
|
|
continuation = listResp.Continuation
|
|
|
|
revision = resp.Header.Revision
|
|
|
|
for _, kv := range resp.Kvs {
|
|
var walData *WalData
|
|
if err := json.Unmarshal(kv.Value, &walData); err != nil {
|
|
return err
|
|
}
|
|
if err := d.applyWalChanges(ctx, walData, revision); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if !listResp.HasMore {
|
|
break
|
|
}
|
|
}
|
|
|
|
continuation = nil
|
|
// use the same revision
|
|
for {
|
|
listResp, err := d.e.ListPaged(ctx, etcdChangeGroupsDir+"/", 0, 10, continuation)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
resp := listResp.Resp
|
|
continuation = listResp.Continuation
|
|
|
|
for _, kv := range resp.Kvs {
|
|
d.changes.Lock()
|
|
changeGroup := path.Base(string(kv.Key))
|
|
d.changes.putChangeGroup(changeGroup, kv.ModRevision)
|
|
d.changes.Unlock()
|
|
}
|
|
if !listResp.HasMore {
|
|
break
|
|
}
|
|
}
|
|
|
|
d.changes.Lock()
|
|
d.changes.revision = revision
|
|
d.changes.initialized = true
|
|
d.changes.Unlock()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (d *DataManager) watcher(ctx context.Context) error {
|
|
d.changes.Lock()
|
|
revision := d.changes.curRevision()
|
|
d.changes.Unlock()
|
|
|
|
wctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
wch := d.e.Watch(wctx, etcdWalBaseDir+"/", revision+1)
|
|
for wresp := range wch {
|
|
if wresp.Canceled {
|
|
err := wresp.Err()
|
|
if err == etcdclientv3rpc.ErrCompacted {
|
|
d.log.Errorf("required events already compacted, reinitializing watcher changes")
|
|
d.changes.Lock()
|
|
d.changes.initialized = false
|
|
d.changes.Unlock()
|
|
}
|
|
return errors.Wrapf(err, "watch error")
|
|
}
|
|
revision := wresp.Header.Revision
|
|
|
|
for _, ev := range wresp.Events {
|
|
key := string(ev.Kv.Key)
|
|
|
|
switch {
|
|
case strings.HasPrefix(key, etcdWalsDir+"/"):
|
|
switch ev.Type {
|
|
case mvccpb.PUT:
|
|
var walData *WalData
|
|
if err := json.Unmarshal(ev.Kv.Value, &walData); err != nil {
|
|
return err
|
|
}
|
|
if walData.WalStatus != WalStatusCommitted {
|
|
continue
|
|
}
|
|
if err := d.applyWalChanges(ctx, walData, revision); err != nil {
|
|
return err
|
|
}
|
|
case mvccpb.DELETE:
|
|
walseq := path.Base(string(key))
|
|
d.changes.Lock()
|
|
putsToDelete := map[string][]string{}
|
|
deletesToDelete := map[string][]string{}
|
|
for _, dataType := range d.dataTypes {
|
|
putsToDelete[dataType] = []string{}
|
|
deletesToDelete[dataType] = []string{}
|
|
}
|
|
for _, dataType := range d.dataTypes {
|
|
for p, pwalseq := range d.changes.puts[dataType] {
|
|
if pwalseq == walseq {
|
|
putsToDelete[dataType] = append(putsToDelete[dataType], p)
|
|
}
|
|
}
|
|
}
|
|
for _, dataType := range d.dataTypes {
|
|
for id, pwalseq := range d.changes.deletes[dataType] {
|
|
if pwalseq == walseq {
|
|
deletesToDelete[dataType] = append(deletesToDelete[dataType], id)
|
|
}
|
|
}
|
|
}
|
|
for dataType, ids := range putsToDelete {
|
|
for _, id := range ids {
|
|
d.changes.removePut(dataType, id, revision)
|
|
}
|
|
}
|
|
for dataType, ids := range putsToDelete {
|
|
for _, id := range ids {
|
|
d.changes.removeDelete(dataType, id, revision)
|
|
}
|
|
}
|
|
|
|
delete(d.changes.actions, walseq)
|
|
|
|
d.changes.Unlock()
|
|
}
|
|
|
|
case strings.HasPrefix(key, etcdChangeGroupsDir+"/"):
|
|
switch ev.Type {
|
|
case mvccpb.PUT:
|
|
d.changes.Lock()
|
|
changeGroup := strings.TrimPrefix(string(ev.Kv.Key), etcdChangeGroupsDir+"/")
|
|
d.changes.putChangeGroup(changeGroup, ev.Kv.ModRevision)
|
|
d.changes.Unlock()
|
|
case mvccpb.DELETE:
|
|
d.changes.Lock()
|
|
changeGroup := strings.TrimPrefix(string(ev.Kv.Key), etcdChangeGroupsDir+"/")
|
|
d.changes.removeChangeGroup(changeGroup)
|
|
d.changes.Unlock()
|
|
}
|
|
|
|
case key == etcdPingKey:
|
|
d.changes.Lock()
|
|
d.changes.putRevision(wresp.Header.Revision)
|
|
d.changes.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|