parent
3da67241e6
commit
dbc6fdb27c
|
@ -325,6 +325,7 @@ func AddActivityAndNotifyTarget(a Alert) error {
|
||||||
// Live alerts, if the target is online and WebSockets is enabled
|
// Live alerts, if the target is online and WebSockets is enabled
|
||||||
if EnableWebsockets {
|
if EnableWebsockets {
|
||||||
go func() {
|
go func() {
|
||||||
|
defer EatPanics()
|
||||||
_ = WsHub.pushAlert(a.TargetUserID, a)
|
_ = WsHub.pushAlert(a.TargetUserID, a)
|
||||||
//fmt.Println("err:",err)
|
//fmt.Println("err:",err)
|
||||||
}()
|
}()
|
||||||
|
@ -341,15 +342,18 @@ func NotifyWatchers(asid int) error {
|
||||||
|
|
||||||
// Alert the subscribers about this without blocking us from doing something else
|
// Alert the subscribers about this without blocking us from doing something else
|
||||||
if EnableWebsockets {
|
if EnableWebsockets {
|
||||||
go notifyWatchers(asid)
|
go func() {
|
||||||
|
defer EatPanics()
|
||||||
|
notifyWatchers(asid)
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func notifyWatchers(asid int) {
|
func notifyWatchers(asid int) {
|
||||||
rows, err := alertStmts.getWatchers.Query(asid)
|
rows, e := alertStmts.getWatchers.Query(asid)
|
||||||
if err != nil && err != ErrNoRows {
|
if e != nil && e != ErrNoRows {
|
||||||
LogError(err)
|
LogError(e)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
defer rows.Close()
|
defer rows.Close()
|
||||||
|
@ -357,21 +361,20 @@ func notifyWatchers(asid int) {
|
||||||
var uid int
|
var uid int
|
||||||
var uids []int
|
var uids []int
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
err := rows.Scan(&uid)
|
if e := rows.Scan(&uid); e != nil {
|
||||||
if err != nil {
|
LogError(e)
|
||||||
LogError(err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
uids = append(uids, uid)
|
uids = append(uids, uid)
|
||||||
}
|
}
|
||||||
if err = rows.Err(); err != nil {
|
if e = rows.Err(); e != nil {
|
||||||
LogError(err)
|
LogError(e)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
alert, err := Activity.Get(asid)
|
alert, e := Activity.Get(asid)
|
||||||
if err != nil && err != ErrNoRows {
|
if e != nil && e != ErrNoRows {
|
||||||
LogError(err)
|
LogError(e)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_ = WsHub.pushAlerts(uids, alert)
|
_ = WsHub.pushAlerts(uids, alert)
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"runtime/debug"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
|
@ -200,13 +201,12 @@ func DebugLogf(str string, args ...interface{}) {
|
||||||
func Log(args ...interface{}) {
|
func Log(args ...interface{}) {
|
||||||
log.Print(args...)
|
log.Print(args...)
|
||||||
}
|
}
|
||||||
func Err(args ...interface{}) {
|
|
||||||
ErrLogger.Print(args...)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Logf(str string, args ...interface{}) {
|
func Logf(str string, args ...interface{}) {
|
||||||
log.Printf(str, args...)
|
log.Printf(str, args...)
|
||||||
}
|
}
|
||||||
|
func Err(args ...interface{}) {
|
||||||
|
ErrLogger.Print(args...)
|
||||||
|
}
|
||||||
|
|
||||||
func Count(stmt *sql.Stmt) (count int) {
|
func Count(stmt *sql.Stmt) (count int) {
|
||||||
e := stmt.QueryRow().Scan(&count)
|
e := stmt.QueryRow().Scan(&count)
|
||||||
|
@ -327,3 +327,11 @@ func (cw *ConnWatcher) StateChange(conn net.Conn, state http.ConnState) {
|
||||||
func (cw *ConnWatcher) Count() int {
|
func (cw *ConnWatcher) Count() int {
|
||||||
return int(atomic.LoadInt64(&cw.n))
|
return int(atomic.LoadInt64(&cw.n))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func EatPanics() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
log.Print(r)
|
||||||
|
debug.PrintStack()
|
||||||
|
log.Fatal("Fatal error.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ func NewMemoryCounter(acc *qgen.Accumulator) (*DefaultMemoryCounter, error) {
|
||||||
c.Tasks.Shutdown.Add(co.Tick)
|
c.Tasks.Shutdown.Add(co.Tick)
|
||||||
ticker := time.NewTicker(time.Minute)
|
ticker := time.NewTicker(time.Minute)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer c.EatPanics()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
|
|
|
@ -75,7 +75,7 @@ func (co *DefaultPerfCounter) insertChunk(low, high, avg int64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
c.DebugLogf("Inserting a pchunk with low %d, high %d, avg %d", low, high, avg)
|
c.DebugLogf("Inserting a pchunk with low %d, high %d, avg %d", low, high, avg)
|
||||||
if c.Dev.LogNewLongRoute && (high*1000*1000) > 5 {
|
if c.Dev.LogNewLongRoute && high > (5*1000*1000) {
|
||||||
c.Logf("pchunk high %d", high)
|
c.Logf("pchunk high %d", high)
|
||||||
}
|
}
|
||||||
_, e := co.insert.Exec(low, high, avg)
|
_, e := co.insert.Exec(low, high, avg)
|
||||||
|
|
|
@ -15,13 +15,14 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
func ThumbTask(thumbChan chan bool) {
|
func ThumbTask(thumbChan chan bool) {
|
||||||
|
defer EatPanics()
|
||||||
|
acc := qgen.NewAcc()
|
||||||
for {
|
for {
|
||||||
// Put this goroutine to sleep until we have work to do
|
// Put this goroutine to sleep until we have work to do
|
||||||
<-thumbChan
|
<-thumbChan
|
||||||
|
|
||||||
// TODO: Use a real queue
|
// TODO: Use a real queue
|
||||||
// TODO: Transactions? Self-repairing?
|
// TODO: Transactions? Self-repairing?
|
||||||
acc := qgen.NewAcc()
|
|
||||||
err := acc.Select("users_avatar_queue").Columns("uid").Limit("0,5").EachInt(func(uid int) error {
|
err := acc.Select("users_avatar_queue").Columns("uid").Limit("0,5").EachInt(func(uid int) error {
|
||||||
// TODO: Do a bulk user fetch instead?
|
// TODO: Do a bulk user fetch instead?
|
||||||
u, err := Users.Get(uid)
|
u, err := Users.Get(uid)
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var CTickLoop *TickLoop
|
||||||
|
|
||||||
type TickLoop struct {
|
type TickLoop struct {
|
||||||
HalfSec *time.Ticker
|
HalfSec *time.Ticker
|
||||||
Sec *time.Ticker
|
Sec *time.Ticker
|
||||||
|
@ -42,6 +44,7 @@ func (l *TickLoop) Loop() {
|
||||||
LogError(e)
|
LogError(e)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
defer EatPanics()
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-l.HalfSec.C:
|
case <-l.HalfSec.C:
|
||||||
|
@ -199,8 +202,7 @@ func Dailies() (e error) {
|
||||||
if e = Tasks.Day.Run(); e != nil {
|
if e = Tasks.Day.Run(); e != nil {
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
e = ForumActionStore.DailyTick()
|
if e = ForumActionStore.DailyTick(); e != nil {
|
||||||
if e != nil {
|
|
||||||
return e
|
return e
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,11 +77,11 @@ func NewDefaultTopicList(acc *qgen.Accumulator) (*DefaultTopicList, error) {
|
||||||
getTopicsByForum: acc.Select("topics").Columns("tid,title,content,createdBy,is_closed,sticky,createdAt,lastReplyAt,lastReplyBy,lastReplyID,views,postCount,likeCount").Where("parentID=?").Orderby("sticky DESC,lastReplyAt DESC,createdBy DESC").Limit("?,?").Prepare(),
|
getTopicsByForum: acc.Select("topics").Columns("tid,title,content,createdBy,is_closed,sticky,createdAt,lastReplyAt,lastReplyBy,lastReplyID,views,postCount,likeCount").Where("parentID=?").Orderby("sticky DESC,lastReplyAt DESC,createdBy DESC").Limit("?,?").Prepare(),
|
||||||
//getTidsByForum: acc.Select("topics").Columns("tid").Where("parentID=?").Orderby("sticky DESC,lastReplyAt DESC,createdBy DESC").Limit("?,?").Prepare(),
|
//getTidsByForum: acc.Select("topics").Columns("tid").Where("parentID=?").Orderby("sticky DESC,lastReplyAt DESC,createdBy DESC").Limit("?,?").Prepare(),
|
||||||
}
|
}
|
||||||
if err := acc.FirstError(); err != nil {
|
if e := acc.FirstError(); e != nil {
|
||||||
return nil, err
|
return nil, e
|
||||||
}
|
}
|
||||||
if err := tList.Tick(); err != nil {
|
if e := tList.Tick(); e != nil {
|
||||||
return nil, err
|
return nil, e
|
||||||
}
|
}
|
||||||
|
|
||||||
Tasks.HalfSec.Add(tList.Tick)
|
Tasks.HalfSec.Add(tList.Tick)
|
||||||
|
|
|
@ -196,7 +196,10 @@ func wsPageResponses(wsUser *WSUser, conn *websocket.Conn, page string) {
|
||||||
watchers := len(adminStatsWatchers)
|
watchers := len(adminStatsWatchers)
|
||||||
adminStatsWatchers[conn] = wsUser
|
adminStatsWatchers[conn] = wsUser
|
||||||
if watchers == 0 {
|
if watchers == 0 {
|
||||||
go adminStatsTicker()
|
go func() {
|
||||||
|
defer EatPanics()
|
||||||
|
adminStatsTicker()
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
adminStatsMutex.Unlock()
|
adminStatsMutex.Unlock()
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -46,6 +46,7 @@ func (h *WsHubImpl) Start() {
|
||||||
}()
|
}()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
|
defer EatPanics()
|
||||||
for {
|
for {
|
||||||
item := func(l *sync.RWMutex, userMap map[int]*WSUser) {
|
item := func(l *sync.RWMutex, userMap map[int]*WSUser) {
|
||||||
l.RLock()
|
l.RLock()
|
||||||
|
|
Loading…
Reference in New Issue