659 lines
14 KiB
Go
659 lines
14 KiB
Go
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
|
|
|
|
package app
|
|
|
|
import (
|
|
"bytes"
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"regexp"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"text/scanner"
|
|
"text/template"
|
|
"time"
|
|
|
|
"github.com/taosdata/alert/app/expr"
|
|
"github.com/taosdata/alert/models"
|
|
"github.com/taosdata/alert/utils"
|
|
"github.com/taosdata/alert/utils/log"
|
|
)
|
|
|
|
// Duration wraps time.Duration so that it can be written to and read from
// JSON in a human readable form.
type Duration struct{ time.Duration }

// MarshalJSON encodes the duration as a JSON string in the format produced
// by time.Duration.String, for example "1m30s".
func (d Duration) MarshalJSON() ([]byte, error) {
	text := d.String()
	return json.Marshal(text)
}

// doUnmarshal stores into d the duration described by v, a value that has
// already been decoded from JSON.
//
// A float64 is taken as a number of nanoseconds, a string is parsed with
// time.ParseDuration, and every other type yields an "invalid duration"
// error. d is left untouched when an error is returned.
func (d *Duration) doUnmarshal(v interface{}) error {
	if ns, isNumber := v.(float64); isNumber {
		d.Duration = time.Duration(ns)
		return nil
	}

	text, isString := v.(string)
	if !isString {
		return errors.New("invalid duration")
	}

	parsed, e := time.ParseDuration(text)
	if e != nil {
		return e
	}
	d.Duration = parsed
	return nil
}

// UnmarshalJSON decodes b, which must hold either a JSON number
// (nanoseconds) or a JSON string understood by time.ParseDuration.
func (d *Duration) UnmarshalJSON(b []byte) error {
	var decoded interface{}
	e := json.Unmarshal(b, &decoded)
	if e != nil {
		return e
	}
	return d.doUnmarshal(decoded)
}
|
|
|
|
// Life-cycle states of an Alert, stored in Alert.State.
const (
	// AlertStateWaiting: the rule expression is currently not true.
	AlertStateWaiting = iota
	// AlertStatePending: the expression is true but has not yet been true
	// for the rule's "for" duration.
	AlertStatePending
	// AlertStateFiring: the expression is true and the alert is reported.
	AlertStateFiring
)
|
|
|
|
// Alert is one alert instance of a rule. Only the fields carrying a JSON
// name are part of the serialized form; State and LastRefreshAt are
// excluded from it.
type Alert struct {
	// State is one of the AlertState* constants.
	State uint8 `json:"-"`
	// LastRefreshAt is the time of the most recent call to refresh.
	LastRefreshAt time.Time `json:"-"`

	// StartsAt and EndsAt delimit the period during which the alert fired.
	StartsAt time.Time `json:"startsAt,omitempty"`
	EndsAt   time.Time `json:"endsAt,omitempty"`

	// Values is the query result row the alert was last evaluated against,
	// keyed by column name.
	Values map[string]interface{} `json:"values"`
	// Labels holds the labels of the rule plus one label per group-by column.
	Labels map[string]string `json:"labels"`
	// Annotations holds the rendered annotation templates of the rule.
	Annotations map[string]string `json:"annotations"`
}
|
|
|
|
func (alert *Alert) doRefresh(firing bool, rule *Rule) bool {
|
|
switch {
|
|
case (!firing) && (alert.State == AlertStateWaiting):
|
|
return false
|
|
|
|
case (!firing) && (alert.State == AlertStatePending):
|
|
alert.State = AlertStateWaiting
|
|
return false
|
|
|
|
case (!firing) && (alert.State == AlertStateFiring):
|
|
alert.State = AlertStateWaiting
|
|
alert.EndsAt = time.Now()
|
|
|
|
case firing && (alert.State == AlertStateWaiting):
|
|
alert.StartsAt = time.Now()
|
|
alert.EndsAt = time.Time{}
|
|
if rule.For.Nanoseconds() > 0 {
|
|
alert.State = AlertStatePending
|
|
return false
|
|
}
|
|
alert.State = AlertStateFiring
|
|
|
|
case firing && (alert.State == AlertStatePending):
|
|
if time.Now().Sub(alert.StartsAt) < rule.For.Duration {
|
|
return false
|
|
}
|
|
alert.StartsAt = alert.StartsAt.Add(rule.For.Duration)
|
|
alert.EndsAt = time.Time{}
|
|
alert.State = AlertStateFiring
|
|
|
|
case firing && (alert.State == AlertStateFiring):
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func (alert *Alert) refresh(rule *Rule, values map[string]interface{}) {
|
|
alert.LastRefreshAt = time.Now()
|
|
|
|
defer func() {
|
|
switch x := recover().(type) {
|
|
case nil:
|
|
case error:
|
|
rule.setState(RuleStateError)
|
|
log.Errorf("[%s]: failed to evaluate: %s", rule.Name, x.Error())
|
|
default:
|
|
rule.setState(RuleStateError)
|
|
log.Errorf("[%s]: failed to evaluate: unknown error", rule.Name)
|
|
}
|
|
}()
|
|
|
|
alert.Values = values
|
|
res := rule.Expr.Eval(func(key string) interface{} {
|
|
// ToLower is required as column name in result is in lower case
|
|
i := alert.Values[strings.ToLower(key)]
|
|
switch v := i.(type) {
|
|
case int8:
|
|
return int64(v)
|
|
case int16:
|
|
return int64(v)
|
|
case int:
|
|
return int64(v)
|
|
case int32:
|
|
return int64(v)
|
|
case float32:
|
|
return float64(v)
|
|
default:
|
|
return v
|
|
}
|
|
})
|
|
|
|
val, ok := res.(bool)
|
|
if !ok {
|
|
rule.setState(RuleStateError)
|
|
log.Errorf("[%s]: result type is not bool", rule.Name)
|
|
return
|
|
}
|
|
|
|
if !alert.doRefresh(val, rule) {
|
|
return
|
|
}
|
|
|
|
buf := bytes.Buffer{}
|
|
alert.Annotations = map[string]string{}
|
|
for k, v := range rule.Annotations {
|
|
if e := v.Execute(&buf, alert); e != nil {
|
|
log.Errorf("[%s]: failed to generate annotation '%s': %s", rule.Name, k, e.Error())
|
|
} else {
|
|
alert.Annotations[k] = buf.String()
|
|
}
|
|
buf.Reset()
|
|
}
|
|
|
|
buf.Reset()
|
|
if e := json.NewEncoder(&buf).Encode(alert); e != nil {
|
|
log.Errorf("[%s]: failed to serialize alert to JSON: %s", rule.Name, e.Error())
|
|
} else {
|
|
chAlert <- buf.String()
|
|
}
|
|
}
|
|
|
|
// Values stored in Rule.State.
const (
	// RuleStateNormal: the rule is idle and may be run.
	RuleStateNormal = iota
	// RuleStateError: evaluating the rule failed.
	RuleStateError
	// RuleStateDisabled: tested as a bit by Rule.isEnabled.
	RuleStateDisabled
	// RuleStateRunning: set by preRun, cleared as a bit by postRun.
	RuleStateRunning = 0x04
)
|
|
|
|
type Rule struct {
|
|
Name string `json:"name"`
|
|
State uint32 `json:"state"`
|
|
SQL string `json:"sql"`
|
|
GroupByCols []string `json:"-"`
|
|
For Duration `json:"for"`
|
|
Period Duration `json:"period"`
|
|
NextRunTime time.Time `json:"-"`
|
|
RawExpr string `json:"expr"`
|
|
Expr expr.Expr `json:"-"`
|
|
Labels map[string]string `json:"labels"`
|
|
RawAnnotations map[string]string `json:"annotations"`
|
|
Annotations map[string]*template.Template `json:"-"`
|
|
Alerts sync.Map `json:"-"`
|
|
}
|
|
|
|
func (rule *Rule) clone() *Rule {
|
|
return &Rule{
|
|
Name: rule.Name,
|
|
State: RuleStateNormal,
|
|
SQL: rule.SQL,
|
|
GroupByCols: rule.GroupByCols,
|
|
For: rule.For,
|
|
Period: rule.Period,
|
|
NextRunTime: time.Time{},
|
|
RawExpr: rule.RawExpr,
|
|
Expr: rule.Expr,
|
|
Labels: rule.Labels,
|
|
RawAnnotations: rule.RawAnnotations,
|
|
Annotations: rule.Annotations,
|
|
// don't copy alerts
|
|
}
|
|
}
|
|
|
|
func (rule *Rule) setState(s uint32) {
|
|
for {
|
|
old := atomic.LoadUint32(&rule.State)
|
|
new := old&0xffffffc0 | s
|
|
if atomic.CompareAndSwapUint32(&rule.State, old, new) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rule *Rule) state() uint32 {
|
|
return atomic.LoadUint32(&rule.State) & 0xffffffc0
|
|
}
|
|
|
|
func (rule *Rule) isEnabled() bool {
|
|
state := atomic.LoadUint32(&rule.State)
|
|
return state&RuleStateDisabled == 0
|
|
}
|
|
|
|
func (rule *Rule) setNextRunTime(tm time.Time) {
|
|
rule.NextRunTime = tm.Round(rule.Period.Duration)
|
|
if rule.NextRunTime.Before(tm) {
|
|
rule.NextRunTime = rule.NextRunTime.Add(rule.Period.Duration)
|
|
}
|
|
}
|
|
|
|
// parseGroupBy extracts the column names of the GROUP BY clause of a SELECT
// statement.
//
// It returns the lower-cased column names in the order they appear, or nil
// when the statement has no GROUP BY clause. An error is returned, and no
// columns, when sql does not start with SELECT or when GROUP BY is not
// followed by a comma separated list of identifiers.
func parseGroupBy(sql string) (cols []string, err error) {
	var s scanner.Scanner

	// Init resets Mode to GoTokens and Error to nil, so the scanner has to
	// be configured after it, not in a struct literal before it.
	s.Init(strings.NewReader(sql))

	// GoTokens makes the scanner consume quoted literals and comments as
	// single tokens, so that a "group by" inside a string literal is not
	// taken for a clause.
	s.Mode = scanner.GoTokens

	// SQL literals are not Go literals: a string such as 'beijing' is
	// reported as an invalid char literal although it is tokenized fine.
	// Without a handler these complaints are printed to os.Stderr, so they
	// are dropped here.
	s.Error = func(*scanner.Scanner, string) {}

	if s.Scan() != scanner.Ident || strings.ToLower(s.TokenText()) != "select" {
		return nil, errors.New("only select statement is allowed.")
	}

	// look for the identifier "group" immediately followed by "by"
	found, afterGroup := false, false
	for tok := s.Scan(); tok != scanner.EOF; tok = s.Scan() {
		if tok != scanner.Ident {
			afterGroup = false
			continue
		}
		word := strings.ToLower(s.TokenText())
		if afterGroup && word == "by" {
			found = true
			break
		}
		afterGroup = word == "group"
	}
	if !found {
		return nil, nil
	}

	// collect the identifiers of the comma separated column list
	for {
		if s.Scan() != scanner.Ident {
			return nil, errors.New("SQL statement syntax error.")
		}
		cols = append(cols, strings.ToLower(s.TokenText()))
		if s.Scan() != ',' {
			break
		}
	}

	return cols, nil
}
|
|
|
|
func (rule *Rule) parseGroupBy() (err error) {
|
|
cols, e := parseGroupBy(rule.SQL)
|
|
if e == nil {
|
|
rule.GroupByCols = cols
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (rule *Rule) getAlert(values map[string]interface{}) *Alert {
|
|
sb := strings.Builder{}
|
|
for _, name := range rule.GroupByCols {
|
|
value := values[name]
|
|
if value == nil {
|
|
} else {
|
|
sb.WriteString(fmt.Sprint(value))
|
|
}
|
|
sb.WriteByte('_')
|
|
}
|
|
|
|
var alert *Alert
|
|
key := sb.String()
|
|
|
|
if v, ok := rule.Alerts.Load(key); ok {
|
|
alert = v.(*Alert)
|
|
}
|
|
if alert == nil {
|
|
alert = &Alert{Labels: map[string]string{}}
|
|
for k, v := range rule.Labels {
|
|
alert.Labels[k] = v
|
|
}
|
|
for _, name := range rule.GroupByCols {
|
|
value := values[name]
|
|
if value == nil {
|
|
alert.Labels[name] = ""
|
|
} else {
|
|
alert.Labels[name] = fmt.Sprint(value)
|
|
}
|
|
}
|
|
rule.Alerts.Store(key, alert)
|
|
}
|
|
|
|
return alert
|
|
}
|
|
|
|
func (rule *Rule) preRun(tm time.Time) bool {
|
|
if tm.Before(rule.NextRunTime) {
|
|
return false
|
|
}
|
|
rule.setNextRunTime(tm)
|
|
|
|
for {
|
|
state := atomic.LoadUint32(&rule.State)
|
|
if state != RuleStateNormal {
|
|
return false
|
|
}
|
|
if atomic.CompareAndSwapUint32(&rule.State, state, RuleStateRunning) {
|
|
break
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (rule *Rule) run(db *sql.DB) {
|
|
rows, e := db.Query(rule.SQL)
|
|
if e != nil {
|
|
log.Errorf("[%s]: failed to query TDengine: %s", rule.Name, e.Error())
|
|
return
|
|
}
|
|
|
|
cols, e := rows.ColumnTypes()
|
|
if e != nil {
|
|
log.Errorf("[%s]: unable to get column information: %s", rule.Name, e.Error())
|
|
return
|
|
}
|
|
|
|
for rows.Next() {
|
|
values := make([]interface{}, 0, len(cols))
|
|
for range cols {
|
|
var v interface{}
|
|
values = append(values, &v)
|
|
}
|
|
rows.Scan(values...)
|
|
|
|
m := make(map[string]interface{})
|
|
for i, col := range cols {
|
|
name := strings.ToLower(col.Name())
|
|
m[name] = *(values[i].(*interface{}))
|
|
}
|
|
|
|
alert := rule.getAlert(m)
|
|
alert.refresh(rule, m)
|
|
}
|
|
|
|
now := time.Now()
|
|
rule.Alerts.Range(func(k, v interface{}) bool {
|
|
alert := v.(*Alert)
|
|
if now.Sub(alert.LastRefreshAt) > rule.Period.Duration*10 {
|
|
rule.Alerts.Delete(k)
|
|
}
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (rule *Rule) postRun() {
|
|
for {
|
|
old := atomic.LoadUint32(&rule.State)
|
|
new := old & ^uint32(RuleStateRunning)
|
|
if atomic.CompareAndSwapUint32(&rule.State, old, new) {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func newRule(str string) (*Rule, error) {
|
|
rule := Rule{}
|
|
|
|
e := json.NewDecoder(strings.NewReader(str)).Decode(&rule)
|
|
if e != nil {
|
|
return nil, e
|
|
}
|
|
|
|
if rule.Period.Nanoseconds() <= 0 {
|
|
rule.Period = Duration{time.Minute}
|
|
}
|
|
rule.setNextRunTime(time.Now())
|
|
|
|
if rule.For.Nanoseconds() < 0 {
|
|
rule.For = Duration{0}
|
|
}
|
|
|
|
if e = rule.parseGroupBy(); e != nil {
|
|
return nil, e
|
|
}
|
|
|
|
if expr, e := expr.Compile(rule.RawExpr); e != nil {
|
|
return nil, e
|
|
} else {
|
|
rule.Expr = expr
|
|
}
|
|
|
|
rule.Annotations = map[string]*template.Template{}
|
|
for k, v := range rule.RawAnnotations {
|
|
v = reValue.ReplaceAllStringFunc(v, func(s string) string {
|
|
// as column name in query result is always in lower case,
|
|
// we need to convert value reference in annotations to
|
|
// lower case
|
|
return strings.ToLower(s)
|
|
})
|
|
text := "{{$labels := .Labels}}{{$values := .Values}}" + v
|
|
tmpl, e := template.New(k).Parse(text)
|
|
if e != nil {
|
|
return nil, e
|
|
}
|
|
rule.Annotations[k] = tmpl
|
|
}
|
|
|
|
return &rule, nil
|
|
}
|
|
|
|
// batchSize is both the capacity of the alert queue and the largest number
// of alerts pushed downstream in a single request.
const batchSize = 1024
|
|
|
|
var (
|
|
rules sync.Map
|
|
wg sync.WaitGroup
|
|
chStop = make(chan struct{})
|
|
chAlert = make(chan string, batchSize)
|
|
reValue = regexp.MustCompile(`\$values\.[_a-zA-Z0-9]+`)
|
|
)
|
|
|
|
func runRules() {
|
|
defer wg.Done()
|
|
|
|
db, e := sql.Open("taosSql", utils.Cfg.TDengine)
|
|
if e != nil {
|
|
log.Fatal("failed to connect to TDengine: ", e.Error())
|
|
}
|
|
defer db.Close()
|
|
|
|
ticker := time.NewTicker(time.Second)
|
|
defer ticker.Stop()
|
|
|
|
LOOP:
|
|
for {
|
|
var tm time.Time
|
|
select {
|
|
case <-chStop:
|
|
close(chAlert)
|
|
break LOOP
|
|
case tm = <-ticker.C:
|
|
}
|
|
|
|
rules.Range(func(k, v interface{}) bool {
|
|
rule := v.(*Rule)
|
|
if !rule.preRun(tm) {
|
|
return true
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(rule *Rule) {
|
|
defer wg.Done()
|
|
defer rule.postRun()
|
|
rule.run(db)
|
|
}(rule)
|
|
|
|
return true
|
|
})
|
|
}
|
|
}
|
|
|
|
func doPushAlerts(alerts []string) {
|
|
defer wg.Done()
|
|
|
|
if len(utils.Cfg.Receivers.AlertManager) == 0 {
|
|
return
|
|
}
|
|
|
|
buf := bytes.Buffer{}
|
|
buf.WriteByte('[')
|
|
for i, alert := range alerts {
|
|
if i > 0 {
|
|
buf.WriteByte(',')
|
|
}
|
|
buf.WriteString(alert)
|
|
}
|
|
buf.WriteByte(']')
|
|
|
|
log.Debug(buf.String())
|
|
|
|
resp, e := http.DefaultClient.Post(utils.Cfg.Receivers.AlertManager, "application/json", &buf)
|
|
if e != nil {
|
|
log.Errorf("failed to push alerts to downstream: %s", e.Error())
|
|
return
|
|
}
|
|
resp.Body.Close()
|
|
}
|
|
|
|
func pushAlerts() {
|
|
defer wg.Done()
|
|
|
|
ticker := time.NewTicker(time.Millisecond * 100)
|
|
defer ticker.Stop()
|
|
|
|
alerts := make([]string, 0, batchSize)
|
|
|
|
LOOP:
|
|
for {
|
|
select {
|
|
case alert := <-chAlert:
|
|
if utils.Cfg.Receivers.Console {
|
|
fmt.Print(alert)
|
|
}
|
|
if len(alert) == 0 {
|
|
if len(alerts) > 0 {
|
|
wg.Add(1)
|
|
doPushAlerts(alerts)
|
|
}
|
|
break LOOP
|
|
}
|
|
if len(alerts) == batchSize {
|
|
wg.Add(1)
|
|
go doPushAlerts(alerts)
|
|
alerts = make([]string, 0, batchSize)
|
|
}
|
|
alerts = append(alerts, alert)
|
|
|
|
case <-ticker.C:
|
|
if len(alerts) > 0 {
|
|
wg.Add(1)
|
|
go doPushAlerts(alerts)
|
|
alerts = make([]string, 0, batchSize)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func loadRuleFromDatabase() error {
|
|
allRules, e := models.LoadAllRule()
|
|
if e != nil {
|
|
log.Error("failed to load rules from database:", e.Error())
|
|
return e
|
|
}
|
|
|
|
count := 0
|
|
for _, r := range allRules {
|
|
rule, e := newRule(r.Content)
|
|
if e != nil {
|
|
log.Errorf("[%s]: parse failed: %s", r.Name, e.Error())
|
|
continue
|
|
}
|
|
if !r.Enabled {
|
|
rule.setState(RuleStateDisabled)
|
|
}
|
|
rules.Store(rule.Name, rule)
|
|
count++
|
|
}
|
|
log.Infof("total %d rules loaded", count)
|
|
return nil
|
|
}
|
|
|
|
func loadRuleFromFile() error {
|
|
f, e := os.Open(utils.Cfg.RuleFile)
|
|
if e != nil {
|
|
log.Error("failed to load rules from file:", e.Error())
|
|
return e
|
|
}
|
|
defer f.Close()
|
|
|
|
var allRules []Rule
|
|
e = json.NewDecoder(f).Decode(&allRules)
|
|
if e != nil {
|
|
log.Error("failed to parse rule file:", e.Error())
|
|
return e
|
|
}
|
|
|
|
for i := 0; i < len(allRules); i++ {
|
|
rule := &allRules[i]
|
|
rules.Store(rule.Name, rule)
|
|
}
|
|
log.Infof("total %d rules loaded", len(allRules))
|
|
|
|
return nil
|
|
}
|
|
|
|
func initRule() error {
|
|
if len(utils.Cfg.Database) > 0 {
|
|
if e := loadRuleFromDatabase(); e != nil {
|
|
return e
|
|
}
|
|
} else {
|
|
if e := loadRuleFromFile(); e != nil {
|
|
return e
|
|
}
|
|
}
|
|
|
|
wg.Add(2)
|
|
go runRules()
|
|
go pushAlerts()
|
|
|
|
return nil
|
|
}
|
|
|
|
func uninitRule() error {
|
|
close(chStop)
|
|
wg.Wait()
|
|
return nil
|
|
}
|