gosora/common/tickloop.go

366 lines
9.8 KiB
Go

package common
import (
"fmt"
"log"
"strconv"
"strings"
"sync/atomic"
"time"
qgen "git.tuxpa.in/a/gosora/query_gen"
"git.tuxpa.in/a/gosora/uutils"
"github.com/pkg/errors"
)
// CTickLoop is a package-level handle to a TickLoop. It holds nil until
// something assigns it; no assignment is visible in this part of the file.
var CTickLoop *TickLoop
// TickLoop pairs five tickers with the callbacks that Loop invokes when the
// matching ticker fires.
type TickLoop struct {
	// Tickers, one per interval. Loop selects over their channels.
	HalfSec, Sec, FifteenMin, Hour, Day *time.Ticker

	// Callbacks, one per ticker above (HalfSecf runs when HalfSec fires, and
	// so on). Loop calls them without a nil check, so they must all be set
	// before Loop is started.
	HalfSecf, Secf, FifteenMinf, Hourf, Dayf func() error
}
// NewTickLoop returns a TickLoop whose five tickers are already running.
// The callback fields are left nil; the caller has to fill them in before
// calling Loop.
func NewTickLoop() *TickLoop {
	// TODO: Write tests for these
	l := new(TickLoop)
	// Fires once every half second
	l.HalfSec = time.NewTicker(time.Second / 2)
	l.Sec = time.NewTicker(time.Second)
	l.FifteenMin = time.NewTicker(15 * time.Minute)
	l.Hour = time.NewTicker(time.Hour)
	l.Day = time.NewTicker(24 * time.Hour)
	return l
}
// Loop blocks forever, waiting on the five tickers and running the matching
// callback each time one fires. A non-nil error from a callback is handed to
// LogError; the loop then carries on with the next tick.
//
// NOTE(review): EatPanics is deferred, so presumably a panic inside a
// callback is swallowed there and ends the loop for good. Confirm against
// EatPanics.
func (l *TickLoop) Loop() {
	defer EatPanics()

	report := func(err error) {
		if err == nil {
			return
		}
		LogError(err)
	}

	for {
		var err error
		select {
		case <-l.HalfSec.C:
			err = l.HalfSecf()
		case <-l.Sec.C:
			err = l.Secf()
		case <-l.FifteenMin.C:
			err = l.FifteenMinf()
		case <-l.Hour.C:
			err = l.Hourf()
		// TODO: Handle the instance going down a lot better
		case <-l.Day.C:
			err = l.Dayf()
		}
		report(err)
	}
}
// ErrDBDown is the error reported when the database is considered unreachable.
var ErrDBDown = errors.New("The database is down")
// DBTimeout returns the connection max lifetime that StartTick applies once
// the database answers a ping: one hour when Dev.HourDBTimeout is set,
// otherwise -1.
//
// NOTE(review): presumably the connection is a *sql.DB, where a non-positive
// lifetime means connections are never closed because of their age. Confirm.
func DBTimeout() time.Duration {
	//db.SetConnMaxLifetime(time.Second * 60 * 5) // Make this infinite as the temporary lifetime change will purge the stale connections?
	lifetime := time.Duration(-1)
	if Dev.HourDBTimeout {
		lifetime = time.Hour
	}
	return lifetime
}
// pint is snapshotted at the start of StartTick and compared again at the end;
// a change in between makes StartTick report abort.
// NOTE(review): nothing in the visible code writes to it, and it is read
// without atomics. Presumably it is bumped elsewhere; confirm who writes it.
var pint int64
// StartTick pings the database before a tick runs and keeps the IsDBDown flag
// up to date.
//
// It returns true (abort) when the ping fails, or when pint changed while the
// function was running. On a failed ping the flag is set to 1; the first time
// the database is seen down, the connection lifetime is also cut to a quarter
// second and a warning is logged. On a successful ping the lifetime is
// restored to DBTimeout() and the flag is cleared.
func StartTick() (abort bool) {
	pintBefore := pint
	conn := qgen.Builder.GetConn()
	prevState := atomic.LoadInt32(&IsDBDown)

	pingErr := conn.Ping()
	if pingErr != nil {
		// TODO: There's a bit of a race here, but it doesn't matter if this error appears multiple times in the logs as it's capped at three times, we just want to cut it down 99% of the time
		if prevState == 0 {
			// Drop all the connections and start over
			conn.SetConnMaxLifetime(time.Second / 4)
			LogWarning(pingErr, ErrDBDown.Error())
		}
		atomic.StoreInt32(&IsDBDown, 1)
		return true
	}

	if prevState == 1 {
		log.Print("The database is back")
	}
	conn.SetConnMaxLifetime(DBTimeout())
	atomic.StoreInt32(&IsDBDown, 0)
	return pintBefore != pint
}
// TODO: Move these into DailyTick() methods?

// asmMatches walks every asid in activity_stream and deletes, via
// Activity.Delete, each one for which ActivityMatches.CountAsid reports no
// matches. It stops at the first error returned by the iteration or a delete.
func asmMatches() error {
	// TODO: Find a more efficient way of doing this
	asids := qgen.NewAcc().Select("activity_stream").Cols("asid")
	return asids.EachInt(func(asid int) error {
		if ActivityMatches.CountAsid(asid) <= 0 {
			return Activity.Delete(asid)
		}
		return nil
	})
}
// TODO: Name the tasks so we can figure out which one it was when something goes wrong? Or maybe toss it up WithStack down there?

// RunTasks calls each task in slice order and stops at the first one that
// returns a non-nil error, returning that error. Tasks after the failing one
// are not run. It returns nil when every task succeeds or the slice is empty.
func RunTasks(tasks []func() error) error {
	var err error
	for i := 0; err == nil && i < len(tasks); i++ {
		err = tasks[i]()
	}
	return err
}
/*func init() {
DbInits.Add(func(acc *qgen.Accumulator) error {
replyStmts = ReplyStmts{
isLiked: acc.Select("likes").Columns("targetItem").Where("sentBy=? and targetItem=? and targetType='replies'").Prepare(),
}
return acc.FirstError()
})
}*/
// StartupTasks purges data that the current configuration says should not be
// kept: registration logs, login logs, post IPs, poll IPs and users' last IPs,
// each guarded by its own Config.Disable* switch.
//
// Every enabled step is attempted even if an earlier one failed; the first
// error encountered is the one returned.
func StartupTasks() (e error) {
	// keepFirst remembers err only when no earlier step has failed.
	keepFirst := func(err error) {
		if e != nil {
			return
		}
		e = err
	}

	if Config.DisableRegLog {
		keepFirst(RegLogs.Purge())
	}
	if Config.DisableLoginLog {
		keepFirst(LoginLogs.Purge())
	}
	if Config.DisablePostIP {
		// TODO: Clear the caches?
		keepFirst(Topics.ClearIPs())
		keepFirst(Rstore.ClearIPs())
		keepFirst(Prstore.ClearIPs())
	}
	if Config.DisablePollIP {
		keepFirst(Polls.ClearIPs())
	}
	if Config.DisableLastIP {
		keepFirst(Users.ClearLastIPs())
	}
	return e
}
// Dailies runs the once-a-day maintenance pass, in this order:
//
//  1. asmMatches; an error here aborts immediately.
//  2. Log pruning, post IP, poll IP and last IP clean-up, each gated by Config.
//     The first error among these is kept and returned after the group.
//  3. Tasks.Day.Run and ForumActionStore.DailyTick.
//  4. Recording the current unix time under the "lastDaily" meta key.
func Dailies() (e error) {
	if e = asmMatches(); e != nil {
		return e
	}

	// keepFirst remembers err only when no earlier step has failed.
	keepFirst := func(err error) {
		if e == nil {
			e = err
		}
	}
	// run executes q unless an earlier step has already failed. The query is
	// still built by the caller either way.
	run := func(q qgen.AccExec) {
		if e == nil {
			_, e = q.Exec()
		}
	}

	if Config.LogPruneCutoff > -1 {
		// TODO: Clear the caches?
		if !Config.DisableLoginLog {
			keepFirst(LoginLogs.DeleteOlderThanDays(Config.LogPruneCutoff))
		}
		if !Config.DisableRegLog {
			keepFirst(RegLogs.DeleteOlderThanDays(Config.LogPruneCutoff))
		}
	}

	if !Config.DisablePostIP && Config.PostIPCutoff > -1 {
		// TODO: Use unixtime to remove this MySQLesque logic?
		for _, tbl := range []string{"topics", "replies", "users_replies"} {
			run(qgen.NewAcc().Update(tbl).Set("ip=''").DateOlderThan("createdAt", Config.PostIPCutoff, "day").Where("ip!=''"))
		}
	}

	if !Config.DisablePollIP && Config.PollIPCutoff > -1 {
		// TODO: Use unixtime to remove this MySQLesque logic?
		run(qgen.NewAcc().Update("polls_votes").Set("ip=''").DateOlderThan("castAt", Config.PollIPCutoff, "day").Where("ip!=''"))
		// TODO: Find some way of purging the ip data in polls_votes without breaking any anti-cheat measures which might be running... maybe hash it instead?
	}

	// TODO: lastActiveAt isn't currently set, so we can't rely on this to purge last_ips of users who haven't been on in a while
	if !Config.DisableLastIP && Config.LastIPCutoff > 0 {
		//exec(newAcc().Update("users").Set("last_ip='0'").DateOlderThan("lastActiveAt",c.Config.PostIPCutoff,"day").Where("last_ip!='0'"))
		// Clears every last_ip that does not start with the current month number.
		month := strconv.Itoa(int(time.Now().Month()))
		run(qgen.NewAcc().Update("users").Set("last_ip=''").Where("last_ip!='' AND last_ip NOT LIKE '" + month + "-%'"))
	}

	if e != nil {
		return e
	}
	if e = Tasks.Day.Run(); e != nil {
		return e
	}
	if e = ForumActionStore.DailyTick(); e != nil {
		return e
	}
	if err := Meta.SetInt64("lastDaily", time.Now().Unix()); err != nil {
		return err
	}
	return nil
}
// TickWatch tracks how far a single tick has progressed so that a watcher
// goroutine (see Run) can report ticks which are slow or never finish.
type TickWatch struct {
	// Name identifies the tick in log output.
	Name string

	// Stage timestamps in nanoseconds, on the same clock as uutils.Nanotime.
	// Zero means the stage has not been reached. They are written through Set.
	Start, DBCheck, StartHook, Tasks, EndHook int64

	// Ticker drives the periodic "still running" check; Deadline bounds how
	// long the watcher goroutine waits before giving up.
	Ticker, Deadline *time.Ticker

	// EndChan is created by Run and receives the signal sent by Stop.
	// OutEndChan, when non-nil, is signalled and then closed by the watcher
	// once it has handled the end of the tick.
	EndChan, OutEndChan chan bool
}
// NewTickWatch returns a TickWatch with a five-second progress ticker and a
// one-hour deadline ticker, both already running. Name, the timestamps and
// the channels are left at their zero values.
func NewTickWatch() *TickWatch {
	w := new(TickWatch)
	w.Ticker = time.NewTicker(5 * time.Second)
	w.Deadline = time.NewTicker(time.Hour)
	return w
}
// DumpElapsed writes a multi-line summary of the tick's progress to Log.
//
// The summary has the tick's name and start timestamp, then one line per
// stage (DBCheck, StartHook, Tasks, EndHook) giving the raw timestamp and the
// time elapsed since the previous stage. Output stops after the first stage
// whose timestamp is still zero, since later stages cannot have run.
func (w *TickWatch) DumpElapsed() {
	var sb strings.Builder

	// elapse appends the line for one stage. bef is the previous stage's
	// timestamp and v is this stage's; both are in nanoseconds.
	elapse := func(name string, bef, v int64) {
		if bef == 0 || v == 0 {
			// No duration can be computed, so print the raw value only.
			// The stage name used to be missing from the arguments here, which
			// rendered the line as "%!s(int64=0): %!d(MISSING)".
			fmt.Fprintf(&sb, "%s: %d\n", name, v)
			return
		}
		dur := time.Duration(v - bef)
		milli := dur.Milliseconds()
		switch {
		case milli < 1000:
			fmt.Fprintf(&sb, "%s: %d - %d ms\n", name, v, milli)
		case milli > 60000:
			mins := milli / 1000 / 60
			// Whatever is left after whole minutes, as fractional seconds.
			fmt.Fprintf(&sb, "%s: %d - m%d s%.2f\n", name, v, mins, float64(milli-(mins*60*1000))/1000)
		default:
			fmt.Fprintf(&sb, "%s: %d - %.2f secs\n", name, v, dur.Seconds())
		}
	}

	sb.WriteString("Name: " + w.Name + "\n")
	fmt.Fprintf(&sb, "Start: %d\n", w.Start)

	stages := []struct {
		name string
		at   int64
	}{
		{"DBCheck", w.DBCheck},
		{"StartHook", w.StartHook},
		{"Tasks", w.Tasks},
		{"EndHook", w.EndHook},
	}
	prev := w.Start
	for _, st := range stages {
		elapse(st.name, prev, st.at)
		if st.at == 0 {
			// This stage was never reached, so the later ones weren't either.
			break
		}
		prev = st.at
	}
	Log(sb.String())
}
// Run creates EndChan and starts the watcher goroutine for this tick.
//
// The goroutine wakes on every Ticker tick and exits on one of two events:
//
//   - Stop signals EndChan: if the tick took more than five seconds, its
//     duration and a DumpElapsed summary are logged. OutEndChan, when set, is
//     then sent true and closed.
//   - Deadline fires: the tick is treated as stuck. The same report is logged
//     and the goroutine gives up without signalling OutEndChan.
//
// If DBCheck is still zero when Ticker fires, the database is assumed to be
// down: the connection lifetime is cut to a quarter second, a warning is
// logged and IsDBDown is set. This happens at most once per Run.
//
// On exit the goroutine stops both tickers and closes EndChan.
func (w *TickWatch) Run() {
	w.EndChan = make(chan bool)
	// Use a goroutine to circumvent ticks which never end
	// TODO: Reuse goroutines across multiple *TickWatch?
	go func() {
		defer w.Ticker.Stop()
		// Deadline is a ticker too; without this it keeps running after the
		// watcher is gone.
		defer w.Deadline.Stop()
		defer close(w.EndChan)
		defer EatPanics()

		// n counts the Ticker ticks handled before the current one.
		var n int
		var downOnce bool
		for {
			select {
			case <-w.Ticker.C:
				// Less noisy logs: every other tick at first, then only every
				// fifth tick once the tick has been running for a while. The
				// two cases are exclusive so long ticks don't get both rates.
				switch {
				case n > 20:
					if n%5 == 0 {
						Logf("%d seconds elapsed since tick %s started", 5*n, w.Name)
					}
				case n > 2 && n%2 != 0:
					Logf("%d seconds elapsed since tick %s started", 5*n, w.Name)
				}
				// The timestamps are written with atomic stores (see Set), so
				// they are read atomically here as well.
				if !downOnce && atomic.LoadInt64(&w.DBCheck) == 0 {
					qgen.Builder.GetConn().SetConnMaxLifetime(time.Second / 4) // Drop all the connections and start over
					LogWarning(ErrDBDown)
					atomic.StoreInt32(&IsDBDown, 1)
					downOnce = true
				}
				n++
			case <-w.Deadline.C:
				Log("Hit TickWatch deadline")
				dur := time.Duration(uutils.Nanotime() - atomic.LoadInt64(&w.Start))
				if dur.Seconds() > 5 {
					Log("tick " + w.Name + " has run for " + dur.String())
					w.DumpElapsed()
				}
				return
			case <-w.EndChan:
				dur := time.Duration(uutils.Nanotime() - atomic.LoadInt64(&w.Start))
				if dur.Seconds() > 5 {
					Log("tick " + w.Name + " completed in " + dur.String())
					w.DumpElapsed()
				}
				if w.OutEndChan != nil {
					w.OutEndChan <- true
					close(w.OutEndChan)
				}
				return
			}
		}
	}()
}
// Stop tells the watcher goroutine started by Run that the tick has finished.
// It blocks until the watcher receives the signal.
//
// The watcher closes EndChan when it exits, including when it gives up at the
// Deadline. A Stop that arrives after that would panic with "send on closed
// channel", so that case is treated as a no-op: the watcher is already gone.
//
// Run must have been called first; with a nil EndChan the send blocks forever.
func (w *TickWatch) Stop() {
	defer func() {
		// The only panic the send below can raise is a send on a closed channel.
		_ = recover()
	}()
	w.EndChan <- true
}
// Set atomically stores v into the int64 that a points to. The receiver is not
// used; a is expected to point at one of the watch's timestamp fields.
func (w *TickWatch) Set(a *int64, v int64) {
atomic.StoreInt64(a, v)
}
// Clear resets all five stage timestamps to zero. Name, the tickers and the
// channels are left as they are. The writes are plain, not atomic.
func (w *TickWatch) Clear() {
	w.Start, w.DBCheck, w.StartHook, w.Tasks, w.EndHook = 0, 0, 0, 0, 0
}