2021-05-02 08:47:19 +00:00
package common
import (
2021-05-14 07:56:54 +00:00
"fmt"
2021-05-02 08:47:19 +00:00
"log"
"strconv"
2021-05-14 07:56:54 +00:00
"strings"
2021-05-02 08:47:19 +00:00
"sync/atomic"
"time"
qgen "github.com/Azareal/Gosora/query_gen"
2021-05-14 07:56:54 +00:00
"github.com/Azareal/Gosora/uutils"
2021-05-02 08:47:19 +00:00
"github.com/pkg/errors"
)
2021-05-05 07:24:16 +00:00
var CTickLoop * TickLoop
2021-05-02 08:47:19 +00:00
type TickLoop struct {
HalfSec * time . Ticker
Sec * time . Ticker
FifteenMin * time . Ticker
Hour * time . Ticker
Day * time . Ticker
HalfSecf func ( ) error
Secf func ( ) error
FifteenMinf func ( ) error
Hourf func ( ) error
Dayf func ( ) error
}
func NewTickLoop ( ) * TickLoop {
return & TickLoop {
// TODO: Write tests for these
// Run this goroutine once every half second
HalfSec : time . NewTicker ( time . Second / 2 ) ,
Sec : time . NewTicker ( time . Second ) ,
FifteenMin : time . NewTicker ( 15 * time . Minute ) ,
Hour : time . NewTicker ( time . Hour ) ,
Day : time . NewTicker ( time . Hour * 24 ) ,
}
}
func ( l * TickLoop ) Loop ( ) {
r := func ( e error ) {
if e != nil {
LogError ( e )
}
}
2021-05-05 07:24:16 +00:00
defer EatPanics ( )
2021-05-02 08:47:19 +00:00
for {
select {
case <- l . HalfSec . C :
r ( l . HalfSecf ( ) )
case <- l . Sec . C :
r ( l . Secf ( ) )
case <- l . FifteenMin . C :
r ( l . FifteenMinf ( ) )
case <- l . Hour . C :
r ( l . Hourf ( ) )
// TODO: Handle the instance going down a lot better
case <- l . Day . C :
r ( l . Dayf ( ) )
}
}
}
var ErrDBDown = errors . New ( "The database is down" )
2021-06-14 03:32:07 +00:00
func DBTimeout ( ) time . Duration {
if Dev . HourDBTimeout {
return time . Hour
}
//db.SetConnMaxLifetime(time.Second * 60 * 5) // Make this infinite as the temporary lifetime change will purge the stale connections?
return - 1
}
2021-07-10 23:10:19 +00:00
var pint int64
2021-05-02 08:47:19 +00:00
func StartTick ( ) ( abort bool ) {
2021-07-10 23:10:19 +00:00
opint := pint
2021-05-02 08:47:19 +00:00
db := qgen . Builder . GetConn ( )
isDBDown := atomic . LoadInt32 ( & IsDBDown )
if e := db . Ping ( ) ; e != nil {
// TODO: There's a bit of a race here, but it doesn't matter if this error appears multiple times in the logs as it's capped at three times, we just want to cut it down 99% of the time
if isDBDown == 0 {
2021-06-14 03:32:07 +00:00
db . SetConnMaxLifetime ( time . Second / 4 ) // Drop all the connections and start over
2021-05-02 08:47:19 +00:00
LogWarning ( e , ErrDBDown . Error ( ) )
}
atomic . StoreInt32 ( & IsDBDown , 1 )
return true
}
if isDBDown == 1 {
log . Print ( "The database is back" )
}
2021-06-14 03:32:07 +00:00
db . SetConnMaxLifetime ( DBTimeout ( ) )
2021-05-02 08:47:19 +00:00
atomic . StoreInt32 ( & IsDBDown , 0 )
2021-07-10 23:10:19 +00:00
return opint != pint
2021-05-02 08:47:19 +00:00
}
// TODO: Move these into DailyTick() methods?
func asmMatches ( ) error {
// TODO: Find a more efficient way of doing this
return qgen . NewAcc ( ) . Select ( "activity_stream" ) . Cols ( "asid" ) . EachInt ( func ( asid int ) error {
if ActivityMatches . CountAsid ( asid ) > 0 {
return nil
}
return Activity . Delete ( asid )
} )
}
// TODO: Name the tasks so we can figure out which one it was when something goes wrong? Or maybe toss it up WithStack down there?
func RunTasks ( tasks [ ] func ( ) error ) error {
for _ , task := range tasks {
if e := task ( ) ; e != nil {
return e
}
}
return nil
}
/ * func init ( ) {
DbInits . Add ( func ( acc * qgen . Accumulator ) error {
replyStmts = ReplyStmts {
isLiked : acc . Select ( "likes" ) . Columns ( "targetItem" ) . Where ( "sentBy=? and targetItem=? and targetType='replies'" ) . Prepare ( ) ,
}
return acc . FirstError ( )
} )
} * /
func StartupTasks ( ) ( e error ) {
r := func ( ee error ) {
if e == nil {
e = ee
}
}
if Config . DisableRegLog {
r ( RegLogs . Purge ( ) )
}
if Config . DisableLoginLog {
r ( LoginLogs . Purge ( ) )
}
if Config . DisablePostIP {
// TODO: Clear the caches?
r ( Topics . ClearIPs ( ) )
r ( Rstore . ClearIPs ( ) )
r ( Prstore . ClearIPs ( ) )
}
if Config . DisablePollIP {
r ( Polls . ClearIPs ( ) )
}
if Config . DisableLastIP {
r ( Users . ClearLastIPs ( ) )
}
return e
}
func Dailies ( ) ( e error ) {
if e = asmMatches ( ) ; e != nil {
return e
}
newAcc := func ( ) * qgen . Accumulator {
return qgen . NewAcc ( )
}
exec := func ( ac qgen . AccExec ) {
if e != nil {
return
}
_ , ee := ac . Exec ( )
e = ee
}
r := func ( ee error ) {
if e == nil {
e = ee
}
}
if Config . LogPruneCutoff > - 1 {
// TODO: Clear the caches?
if ! Config . DisableLoginLog {
r ( LoginLogs . DeleteOlderThanDays ( Config . LogPruneCutoff ) )
}
if ! Config . DisableRegLog {
r ( RegLogs . DeleteOlderThanDays ( Config . LogPruneCutoff ) )
}
}
if ! Config . DisablePostIP && Config . PostIPCutoff > - 1 {
// TODO: Use unixtime to remove this MySQLesque logic?
f := func ( tbl string ) {
exec ( newAcc ( ) . Update ( tbl ) . Set ( "ip=''" ) . DateOlderThan ( "createdAt" , Config . PostIPCutoff , "day" ) . Where ( "ip!=''" ) )
}
f ( "topics" )
f ( "replies" )
f ( "users_replies" )
}
if ! Config . DisablePollIP && Config . PollIPCutoff > - 1 {
// TODO: Use unixtime to remove this MySQLesque logic?
exec ( newAcc ( ) . Update ( "polls_votes" ) . Set ( "ip=''" ) . DateOlderThan ( "castAt" , Config . PollIPCutoff , "day" ) . Where ( "ip!=''" ) )
// TODO: Find some way of purging the ip data in polls_votes without breaking any anti-cheat measures which might be running... maybe hash it instead?
}
// TODO: lastActiveAt isn't currently set, so we can't rely on this to purge last_ips of users who haven't been on in a while
if ! Config . DisableLastIP && Config . LastIPCutoff > 0 {
//exec(newAcc().Update("users").Set("last_ip='0'").DateOlderThan("lastActiveAt",c.Config.PostIPCutoff,"day").Where("last_ip!='0'"))
mon := time . Now ( ) . Month ( )
exec ( newAcc ( ) . Update ( "users" ) . Set ( "last_ip=''" ) . Where ( "last_ip!='' AND last_ip NOT LIKE '" + strconv . Itoa ( int ( mon ) ) + "-%'" ) )
}
if e != nil {
return e
}
2021-05-03 00:36:29 +00:00
if e = Tasks . Day . Run ( ) ; e != nil {
2021-05-02 08:47:19 +00:00
return e
}
2021-05-05 07:24:16 +00:00
if e = ForumActionStore . DailyTick ( ) ; e != nil {
2021-05-02 08:47:19 +00:00
return e
}
{
e := Meta . SetInt64 ( "lastDaily" , time . Now ( ) . Unix ( ) )
if e != nil {
return e
}
}
return nil
}
2021-05-14 07:56:54 +00:00
type TickWatch struct {
Name string
Start int64
DBCheck int64
StartHook int64
Tasks int64
EndHook int64
2021-07-10 23:10:19 +00:00
Ticker * time . Ticker
Deadline * time . Ticker
EndChan chan bool
OutEndChan chan bool
2021-05-14 07:56:54 +00:00
}
func NewTickWatch ( ) * TickWatch {
return & TickWatch {
Ticker : time . NewTicker ( time . Second * 5 ) ,
Deadline : time . NewTicker ( time . Hour ) ,
}
}
func ( w * TickWatch ) DumpElapsed ( ) {
var sb strings . Builder
f := func ( str string ) {
sb . WriteString ( str )
}
ff := func ( str string , args ... interface { } ) {
f ( fmt . Sprintf ( str , args ... ) )
}
2021-05-26 03:10:34 +00:00
elapse := func ( name string , bef , v int64 ) {
2021-05-14 07:56:54 +00:00
if bef == 0 || v == 0 {
ff ( "%s: %d\n" , v )
2021-06-14 03:32:07 +00:00
return
2021-05-14 07:56:54 +00:00
}
2021-05-26 03:10:34 +00:00
dur := time . Duration ( v - bef )
milli := dur . Milliseconds ( )
if milli < 1000 {
ff ( "%s: %d - %d ms\n" , name , v , milli )
} else if milli > 60000 {
secs := milli / 1000
mins := secs / 60
ff ( "%s: %d - m%d s%.2f\n" , name , v , mins , float64 ( milli - ( mins * 60 * 1000 ) ) / 1000 )
} else {
ff ( "%s: %d - %.2f secs\n" , name , v , dur . Seconds ( ) )
}
2021-05-14 07:56:54 +00:00
}
f ( "Name: " + w . Name + "\n" )
ff ( "Start: %d\n" , w . Start )
2021-05-26 03:10:34 +00:00
elapse ( "DBCheck" , w . Start , w . DBCheck )
2021-06-14 03:32:07 +00:00
if w . DBCheck == 0 {
Log ( sb . String ( ) )
return
}
2021-05-26 03:10:34 +00:00
elapse ( "StartHook" , w . DBCheck , w . StartHook )
2021-06-14 03:32:07 +00:00
if w . StartHook == 0 {
Log ( sb . String ( ) )
return
}
2021-05-26 03:10:34 +00:00
elapse ( "Tasks" , w . StartHook , w . Tasks )
2021-06-14 03:32:07 +00:00
if w . Tasks == 0 {
Log ( sb . String ( ) )
return
}
2021-05-26 03:10:34 +00:00
elapse ( "EndHook" , w . Tasks , w . EndHook )
2021-05-14 07:56:54 +00:00
Log ( sb . String ( ) )
}
func ( w * TickWatch ) Run ( ) {
w . EndChan = make ( chan bool )
// Use a goroutine to circumvent ticks which never end
2021-07-10 23:10:19 +00:00
// TODO: Reuse goroutines across multiple *TickWatch?
2021-05-14 07:56:54 +00:00
go func ( ) {
defer w . Ticker . Stop ( )
defer close ( w . EndChan )
defer EatPanics ( )
var n int
2021-05-26 03:10:34 +00:00
var downOnce bool
2021-05-14 07:56:54 +00:00
for {
select {
case <- w . Ticker . C :
2021-05-26 03:10:34 +00:00
// Less noisy logs
2021-06-14 03:32:07 +00:00
if n > 20 && n % 5 == 0 {
Logf ( "%d seconds elapsed since tick %s started" , 5 * n , w . Name )
} else if n > 2 && n % 2 != 0 {
2021-05-26 03:10:34 +00:00
Logf ( "%d seconds elapsed since tick %s started" , 5 * n , w . Name )
}
if ! downOnce && w . DBCheck == 0 {
2021-06-14 03:32:07 +00:00
qgen . Builder . GetConn ( ) . SetConnMaxLifetime ( time . Second / 4 ) // Drop all the connections and start over
2021-05-26 03:10:34 +00:00
LogWarning ( ErrDBDown )
atomic . StoreInt32 ( & IsDBDown , 1 )
downOnce = true
}
2021-05-14 07:56:54 +00:00
n ++
case <- w . Deadline . C :
Log ( "Hit TickWatch deadline" )
dur := time . Duration ( uutils . Nanotime ( ) - w . Start )
if dur . Seconds ( ) > 5 {
Log ( "tick " + w . Name + " has run for " + dur . String ( ) )
w . DumpElapsed ( )
}
return
case <- w . EndChan :
dur := time . Duration ( uutils . Nanotime ( ) - w . Start )
if dur . Seconds ( ) > 5 {
Log ( "tick " + w . Name + " completed in " + dur . String ( ) )
w . DumpElapsed ( )
}
2021-07-10 23:10:19 +00:00
if w . OutEndChan != nil {
w . OutEndChan <- true
close ( w . OutEndChan )
}
2021-05-14 07:56:54 +00:00
return
}
}
} ( )
}
func ( w * TickWatch ) Stop ( ) {
w . EndChan <- true
}
func ( w * TickWatch ) Set ( a * int64 , v int64 ) {
atomic . StoreInt64 ( a , v )
}
func ( w * TickWatch ) Clear ( ) {
w . Start = 0
w . DBCheck = 0
w . StartHook = 0
w . Tasks = 0
w . EndHook = 0
}