Merge pull request #688 from actiontech/issue687-filter-sqls
Issue687 filter sqls
sjjian authored Jul 22, 2022
2 parents cb204af + 142b1ef · commit 330e01f
Showing 2 changed files with 67 additions and 3 deletions.
19 changes: 18 additions & 1 deletion sqle/server/auditplan/meta.go
@@ -32,7 +32,8 @@ const (
)

const (
-paramKeyCollectIntervalMinute = "collect_interval_minute"
+paramKeyCollectIntervalMinute               = "collect_interval_minute"
+paramKeyAuditSQLsScrappedInLastPeriodMinute = "audit_sqls_scrapped_in_last_period_minute"
)

var Metas = []Meta{
@@ -45,6 +46,14 @@ var Metas = []Meta{
Type: TypeMySQLSlowLog,
Desc: "慢日志",
InstanceType: InstanceTypeMySQL,
+Params: []*params.Param{
+{
+Key: paramKeyAuditSQLsScrappedInLastPeriodMinute,
+Desc: "审核过去时间段内抓取的SQL(分钟)",
+Value: "0",
+Type: params.ParamTypeInt,
+},
+},
},
{
Type: TypeMySQLMybatis,
@@ -104,6 +113,14 @@ var Metas = []Meta{
Type: TypeTiDBAuditLog,
Desc: "TiDB审计日志",
InstanceType: InstanceTypeTiDB,
+Params: []*params.Param{
+{
+Key: paramKeyAuditSQLsScrappedInLastPeriodMinute,
+Desc: "审核过去时间段内抓取的SQL(分钟)",
+Value: "0",
+Type: params.ParamTypeInt,
+},
+},
},
}

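Both the MySQL slow-log (慢日志) and TiDB audit-log (TiDB审计日志) plan types gain the same optional parameter: key `audit_sqls_scrapped_in_last_period_minute`, integer type, default `"0"`. Its Chinese `Desc` translates roughly to "audit the SQLs scraped within the last period (minutes)". The sketch below illustrates what the minute value means for the audit window; it is standalone, uses only the standard library with a hard-coded `paramValue`, and is not the repo's `params` API:

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

func main() {
	// Hypothetical setting for audit_sqls_scrapped_in_last_period_minute; the
	// real code reads it via the audit plan's params (GetParam(...).Int()).
	paramValue := "30"

	period, err := strconv.Atoi(paramValue)
	if err != nil || period <= 0 {
		// Default "0": no time-window filtering, audit every collected SQL.
		fmt.Println("filtering disabled: audit every collected SQL")
		return
	}

	// SQLs whose last_receive_timestamp is older than this cut-off are skipped.
	cutoff := time.Now().Add(-time.Duration(period) * time.Minute)
	fmt.Println("audit only SQLs received after", cutoff.Format(time.RFC3339))
}
```

With the default of "0" the filter is a no-op, so existing audit plans keep their pre-#688 behaviour unless the parameter is set explicitly.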
51 changes: 49 additions & 2 deletions sqle/server/auditplan/task.go
@@ -16,6 +16,7 @@ import (
"github.com/actiontech/sqle/sqle/log"
"github.com/actiontech/sqle/sqle/model"
"github.com/actiontech/sqle/sqle/pkg/oracle"
"github.com/actiontech/sqle/sqle/pkg/params"
"github.com/actiontech/sqle/sqle/server"
"github.com/actiontech/sqle/sqle/utils"

@@ -26,6 +27,7 @@ import (
)

var errNoSQLInAuditPlan = errors.New(errors.DataConflict, fmt.Errorf("there is no SQLs in audit plan"))
+var errNoSQLNeedToBeAudited = errors.New(errors.DataConflict, fmt.Errorf("there is no SQLs need to be audited in audit plan"))

type Task interface {
Start() error
@@ -96,7 +98,16 @@ func (at *baseTask) audit(task *model.Task) (*model.AuditPlanReportV2, error) {
return nil, errNoSQLInAuditPlan
}

-for i, sql := range auditPlanSQLs {
+filteredSqls, err := filterSQLsByPeriod(at.ap.Params, auditPlanSQLs)
+if err != nil {
+return nil, err
+}
+
+if len(filteredSqls) == 0 {
+return nil, errNoSQLNeedToBeAudited
+}
+
+for i, sql := range filteredSqls {
task.ExecuteSQLs = append(task.ExecuteSQLs, &model.ExecuteSQL{
BaseSQL: model.BaseSQL{
Number: uint(i),
@@ -130,6 +141,33 @@ func (at *baseTask) audit(task *model.Task) (*model.AuditPlanReportV2, error) {
return auditPlanReport, nil
}

+func filterSQLsByPeriod(params params.Params, sqls []*model.AuditPlanSQLV2) (filteredSqls []*model.AuditPlanSQLV2, err error) {
+period := params.GetParam(paramKeyAuditSQLsScrappedInLastPeriodMinute).Int()
+if period <= 0 {
+return sqls, nil
+}
+
+t := time.Now()
+minus := -1
+startTime := t.Add(time.Minute * time.Duration(minus*period))
+for _, sql := range sqls {
+var info = struct {
+LastReceiveTimestamp time.Time `json:"last_receive_timestamp"`
+}{}
+err := json.Unmarshal(sql.Info, &info)
+if err != nil {
+return nil, fmt.Errorf("parse last_receive_timestamp failed: %v", err)
+}
+
+if info.LastReceiveTimestamp.Before(startTime) {
+continue
+}
+newSql := *sql
+filteredSqls = append(filteredSqls, &newSql)
+}
+return filteredSqls, nil
+}

type sqlCollector struct {
*baseTask
sync.WaitGroup
@@ -598,7 +636,16 @@ func (at *TiDBAuditLogTask) Audit() (*model.AuditPlanReportV2, error) {
return nil, errNoSQLInAuditPlan
}

-for i, sql := range auditPlanSQLs {
+filteredSqls, err := filterSQLsByPeriod(at.ap.Params, auditPlanSQLs)
+if err != nil {
+return nil, err
+}
+
+if len(filteredSqls) == 0 {
+return nil, errNoSQLNeedToBeAudited
+}
+
+for i, sql := range filteredSqls {
schema := ""
info, _ := sql.Info.OriginValue()
if schemaStr, ok := info[server.AuditSchema].(string); ok {
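The heart of the change is `filterSQLsByPeriod`, called from both `baseTask.audit` and `TiDBAuditLogTask.Audit`: when the parameter is positive, only SQLs whose `last_receive_timestamp` (stored in each row's `Info` JSON) falls within the last N minutes are kept, and if nothing survives the callers return `errNoSQLNeedToBeAudited` instead of building an empty report. A self-contained sketch of that rule follows; the `auditPlanSQL` struct and the sample rows are hypothetical stand-ins for `model.AuditPlanSQLV2`, used only so the example runs outside the repo:

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// auditPlanSQL is a hypothetical stand-in for model.AuditPlanSQLV2, keeping
// only the fields the filter actually touches.
type auditPlanSQL struct {
	SQL  string
	Info json.RawMessage // carries "last_receive_timestamp", as in the real model
}

// filterByPeriod mirrors the rule in filterSQLsByPeriod above: period <= 0
// passes everything through; otherwise only SQLs received within the last
// `period` minutes survive.
func filterByPeriod(period int, sqls []auditPlanSQL) ([]auditPlanSQL, error) {
	if period <= 0 {
		return sqls, nil
	}
	startTime := time.Now().Add(-time.Duration(period) * time.Minute)

	var filtered []auditPlanSQL
	for _, s := range sqls {
		var info struct {
			LastReceiveTimestamp time.Time `json:"last_receive_timestamp"`
		}
		if err := json.Unmarshal(s.Info, &info); err != nil {
			return nil, fmt.Errorf("parse last_receive_timestamp failed: %v", err)
		}
		if info.LastReceiveTimestamp.Before(startTime) {
			continue // received before the window: skip it
		}
		filtered = append(filtered, s)
	}
	return filtered, nil
}

func main() {
	now := time.Now()
	mk := func(t time.Time) json.RawMessage {
		b, _ := json.Marshal(map[string]time.Time{"last_receive_timestamp": t})
		return b
	}
	sqls := []auditPlanSQL{
		{SQL: "select 1", Info: mk(now.Add(-5 * time.Minute))},  // recent: kept
		{SQL: "select 2", Info: mk(now.Add(-90 * time.Minute))}, // stale: dropped
	}

	kept, err := filterByPeriod(60, sqls)
	if err != nil {
		panic(err)
	}
	for _, s := range kept {
		fmt.Println(s.SQL) // prints only "select 1"
	}
}
```

One detail worth noting from the real implementation: it appends a copy of each surviving row (`newSql := *sql`) rather than the original pointer, and a row whose `Info` cannot be parsed fails the whole audit with a `parse last_receive_timestamp failed` error.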
