Merge pull request #28401 from taosdata/feat/TS-5484-audit-delete
feat/TS-5484-audit-delete
This commit is contained in:
commit
c3a8daced4
|
@ -142,6 +142,7 @@ extern bool tsMonitorForceV2;
|
|||
// audit
|
||||
extern bool tsEnableAudit;
|
||||
extern bool tsEnableAuditCreateTable;
|
||||
extern bool tsEnableAuditDelete;
|
||||
extern int32_t tsAuditInterval;
|
||||
|
||||
// telem
|
||||
|
|
|
@ -1022,6 +1022,7 @@ typedef struct {
|
|||
char sDetailVer[128];
|
||||
int64_t whiteListVer;
|
||||
SMonitorParas monitorParas;
|
||||
int8_t enableAuditDelete;
|
||||
} SConnectRsp;
|
||||
|
||||
int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp);
|
||||
|
@ -1827,6 +1828,17 @@ int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
|
|||
int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq);
|
||||
void tFreeSStatisReq(SStatisReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
char table[TSDB_TABLE_NAME_LEN];
|
||||
char operation[AUDIT_OPERATION_LEN];
|
||||
int32_t sqlLen;
|
||||
char* pSql;
|
||||
} SAuditReq;
|
||||
int32_t tSerializeSAuditReq(void* buf, int32_t bufLen, SAuditReq* pReq);
|
||||
int32_t tDeserializeSAuditReq(void* buf, int32_t bufLen, SAuditReq* pReq);
|
||||
void tFreeSAuditReq(SAuditReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
int64_t clusterId;
|
||||
|
@ -3415,6 +3427,7 @@ typedef struct {
|
|||
int32_t svrTimestamp;
|
||||
SArray* rsps; // SArray<SClientHbRsp>
|
||||
SMonitorParas monitorParas;
|
||||
int8_t enableAuditDelete;
|
||||
} SClientHbBatchRsp;
|
||||
|
||||
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); }
|
||||
|
|
|
@ -259,6 +259,7 @@
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_DROP_ORPHANTASKS, "stream-drop-orphan-tasks", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL)
|
||||
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
|
||||
|
||||
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8
|
||||
|
|
|
@ -29,7 +29,6 @@ extern "C" {
|
|||
#endif
|
||||
|
||||
#define AUDIT_DETAIL_MAX 65472
|
||||
#define AUDIT_OPERATION_LEN 20
|
||||
|
||||
typedef struct {
|
||||
const char *server;
|
||||
|
|
|
@ -24,6 +24,7 @@ extern "C" {
|
|||
#include "thash.h"
|
||||
#include "query.h"
|
||||
#include "tqueue.h"
|
||||
#include "clientInt.h"
|
||||
|
||||
typedef enum {
|
||||
SQL_RESULT_SUCCESS = 0,
|
||||
|
@ -81,6 +82,8 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name,
|
|||
void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values);
|
||||
const char* monitorResultStr(SQL_RESULT_CODE code);
|
||||
int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data);
|
||||
|
||||
void clientOperateReport(SRequestObj* pRequest);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -653,6 +653,8 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 };
|
|||
#define MONITOR_TAG_VALUE_LEN 300
|
||||
#define MONITOR_METRIC_NAME_LEN 100
|
||||
|
||||
#define AUDIT_OPERATION_LEN 20
|
||||
|
||||
typedef enum {
|
||||
ANAL_ALGO_TYPE_ANOMALY_DETECT = 0,
|
||||
ANAL_ALGO_TYPE_FORECAST = 1,
|
||||
|
|
|
@ -108,6 +108,10 @@ typedef struct SQueryExecMetric {
|
|||
int64_t execCostUs;
|
||||
} SQueryExecMetric;
|
||||
|
||||
typedef struct {
|
||||
SMonitorParas monitorParas;
|
||||
int8_t enableAuditDelete;
|
||||
} SAppInstServerCFG;
|
||||
struct SAppInstInfo {
|
||||
int64_t numOfConns;
|
||||
SCorEpSet mgmtEp;
|
||||
|
@ -121,7 +125,7 @@ struct SAppInstInfo {
|
|||
void* pTransporter;
|
||||
SAppHbMgr* pAppHbMgr;
|
||||
char* instKey;
|
||||
SMonitorParas monitorParas;
|
||||
SAppInstServerCFG serverCfg;
|
||||
};
|
||||
|
||||
typedef struct SAppInfo {
|
||||
|
|
|
@ -166,11 +166,11 @@ static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int
|
|||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType)));
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(
|
||||
json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows)));
|
||||
if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen) {
|
||||
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen];
|
||||
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0';
|
||||
if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) {
|
||||
char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen];
|
||||
pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0';
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
|
||||
pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = tmp;
|
||||
pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp;
|
||||
} else {
|
||||
ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr)));
|
||||
}
|
||||
|
@ -284,7 +284,7 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pTscObj->pAppInfo->monitorParas.tsEnableMonitor) {
|
||||
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
|
||||
if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) {
|
||||
sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT);
|
||||
} else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) {
|
||||
|
@ -294,15 +294,15 @@ static void deregisterRequest(SRequestObj *pRequest) {
|
|||
}
|
||||
}
|
||||
|
||||
if ((duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL ||
|
||||
duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThresholdTest * 1000000UL) &&
|
||||
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->monitorParas.tsSlowLogExceptDb)) {
|
||||
if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL ||
|
||||
duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThresholdTest * 1000000UL) &&
|
||||
checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) {
|
||||
(void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1);
|
||||
if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) {
|
||||
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) {
|
||||
taosPrintSlowLog("PID:%d, Conn:%u,QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s",
|
||||
taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration,
|
||||
pRequest->sqlstr);
|
||||
if (pTscObj->pAppInfo->monitorParas.tsEnableMonitor) {
|
||||
if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) {
|
||||
slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration);
|
||||
if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) {
|
||||
tscError("failed to generate write slow log");
|
||||
|
|
|
@ -605,7 +605,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
|
|||
return code;
|
||||
}
|
||||
|
||||
pInst->monitorParas = pRsp.monitorParas;
|
||||
pInst->serverCfg.monitorParas = pRsp.monitorParas;
|
||||
pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete;
|
||||
tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId,
|
||||
pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope);
|
||||
|
||||
|
|
|
@ -2873,6 +2873,7 @@ void syncQueryFn(void* param, void* res, int32_t code) {
|
|||
|
||||
if (pParam->pRequest) {
|
||||
pParam->pRequest->code = code;
|
||||
clientOperateReport(pParam->pRequest);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) {
|
||||
|
|
|
@ -2,8 +2,6 @@
|
|||
#include "cJSON.h"
|
||||
#include "clientInt.h"
|
||||
#include "clientLog.h"
|
||||
#include "os.h"
|
||||
#include "tglobal.h"
|
||||
#include "tmisce.h"
|
||||
#include "tqueue.h"
|
||||
#include "ttime.h"
|
||||
|
@ -19,6 +17,7 @@ STaosQueue* monitorQueue;
|
|||
SHashObj* monitorSlowLogHash;
|
||||
char tmpSlowLogPath[PATH_MAX] = {0};
|
||||
TdThread monitorThread;
|
||||
extern bool tsEnableAuditDelete;
|
||||
|
||||
static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) {
|
||||
int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir);
|
||||
|
@ -216,7 +215,7 @@ static void reportSendProcess(void* param, void* tmrId) {
|
|||
SEpSet ep = getEpSet_s(&pInst->mgmtEp);
|
||||
generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep);
|
||||
bool reset =
|
||||
taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
|
||||
taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId);
|
||||
tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset);
|
||||
taosRUnLockLatch(&monitorLock);
|
||||
}
|
||||
|
@ -289,7 +288,7 @@ void monitorCreateClient(int64_t clusterId) {
|
|||
goto fail;
|
||||
}
|
||||
pMonitor->timer =
|
||||
taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
|
||||
taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer);
|
||||
if (pMonitor->timer == NULL) {
|
||||
tscError("failed to start timer");
|
||||
goto fail;
|
||||
|
@ -660,7 +659,7 @@ static void monitorSendAllSlowLog() {
|
|||
taosHashCancelIterate(monitorSlowLogHash, pIter);
|
||||
return;
|
||||
}
|
||||
if (t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000) {
|
||||
if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) {
|
||||
pClient->lastCheckTime = t;
|
||||
} else {
|
||||
continue;
|
||||
|
@ -686,7 +685,7 @@ static void monitorSendAllSlowLog() {
|
|||
static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) {
|
||||
SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId);
|
||||
|
||||
if (pInst == NULL || !pInst->monitorParas.tsEnableMonitor) {
|
||||
if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) {
|
||||
tscInfo("[monitor] monitor is disabled, skip send slow log");
|
||||
return;
|
||||
}
|
||||
|
@ -932,4 +931,101 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) {
|
|||
taosFreeQitem(slowLogData);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
tscDebug("[del report]delete reportCB code:%d", code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) {
|
||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
tscError("[del report]failed to allocate memory for sendInfo");
|
||||
return terrno;
|
||||
}
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL};
|
||||
|
||||
sendInfo->requestId = requestId;
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = NULL;
|
||||
sendInfo->fp = reportCB;
|
||||
sendInfo->msgType = TDMT_MND_AUDIT;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
|
||||
if (code != 0) {
|
||||
tscError("[del report]failed to send msg to server, code:%d", code);
|
||||
taosMemoryFree(sendInfo);
|
||||
return code;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void reportDeleteSql(SRequestObj* pRequest) {
|
||||
SDeleteStmt* pStmt = (SDeleteStmt*)pRequest->pQuery->pRoot;
|
||||
STscObj* pTscObj = pRequest->pTscObj;
|
||||
|
||||
if (pTscObj == NULL || pTscObj->pAppInfo == NULL) {
|
||||
tscError("[del report]invalid tsc obj");
|
||||
return;
|
||||
}
|
||||
|
||||
if(pTscObj->pAppInfo->serverCfg.enableAuditDelete == 0) {
|
||||
tscDebug("[del report]audit delete is disabled");
|
||||
return;
|
||||
}
|
||||
|
||||
if (pRequest->code != TSDB_CODE_SUCCESS) {
|
||||
tscDebug("[del report]delete request result code:%d", pRequest->code);
|
||||
return;
|
||||
}
|
||||
|
||||
if (nodeType(pStmt->pFromTable) != QUERY_NODE_REAL_TABLE) {
|
||||
tscError("[del report]invalid from table node type:%d", nodeType(pStmt->pFromTable));
|
||||
return;
|
||||
}
|
||||
|
||||
SRealTableNode* pTable = (SRealTableNode*)pStmt->pFromTable;
|
||||
SAuditReq req;
|
||||
req.pSql = pRequest->sqlstr;
|
||||
req.sqlLen = pRequest->sqlLen;
|
||||
TAOS_UNUSED(tsnprintf(req.table, TSDB_TABLE_NAME_LEN, "%s", pTable->table.tableName));
|
||||
TAOS_UNUSED(tsnprintf(req.db, TSDB_DB_FNAME_LEN, "%s", pTable->table.dbName));
|
||||
TAOS_UNUSED(tsnprintf(req.operation, AUDIT_OPERATION_LEN, "delete"));
|
||||
int32_t tlen = tSerializeSAuditReq(NULL, 0, &req);
|
||||
void* pReq = taosMemoryCalloc(1, tlen);
|
||||
if (pReq == NULL) {
|
||||
tscError("[del report]failed to allocate memory for req");
|
||||
return;
|
||||
}
|
||||
|
||||
if (tSerializeSAuditReq(pReq, tlen, &req) < 0) {
|
||||
tscError("[del report]failed to serialize req");
|
||||
taosMemoryFree(pReq);
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId);
|
||||
if (code != 0) {
|
||||
tscError("[del report]failed to send audit info, code:%d", code);
|
||||
taosMemoryFree(pReq);
|
||||
return;
|
||||
}
|
||||
tscDebug("[del report]delete data, sql:%s", req.pSql);
|
||||
}
|
||||
|
||||
void clientOperateReport(SRequestObj* pRequest) {
|
||||
if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) {
|
||||
tscError("[del report]invalid request");
|
||||
return;
|
||||
}
|
||||
|
||||
if (QUERY_NODE_DELETE_STMT == nodeType(pRequest->pQuery->pRoot)) {
|
||||
reportDeleteSql(pRequest);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -135,7 +135,8 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
// update the appInstInfo
|
||||
pTscObj->pAppInfo->clusterId = connectRsp.clusterId;
|
||||
pTscObj->pAppInfo->monitorParas = connectRsp.monitorParas;
|
||||
pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas;
|
||||
pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete;
|
||||
tscDebug("[monitor] paras from connect rsp, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d",
|
||||
connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope);
|
||||
lastClusterId = connectRsp.clusterId;
|
||||
|
|
|
@ -1695,7 +1695,7 @@ END:
|
|||
}
|
||||
|
||||
void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) {
|
||||
if (request->pTscObj->pAppInfo->monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
|
||||
if (request->pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) {
|
||||
int32_t len = 0;
|
||||
int32_t rlen = 0;
|
||||
char *p = NULL;
|
||||
|
|
|
@ -119,6 +119,7 @@ bool tsMonitorForceV2 = true;
|
|||
// audit
|
||||
bool tsEnableAudit = true;
|
||||
bool tsEnableAuditCreateTable = true;
|
||||
bool tsEnableAuditDelete = true;
|
||||
int32_t tsAuditInterval = 5000;
|
||||
|
||||
// telem
|
||||
|
@ -779,6 +780,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
||||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
|
||||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableAuditDelete", tsEnableAuditDelete, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE));
|
||||
|
||||
|
@ -1495,6 +1497,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditCreateTable");
|
||||
tsEnableAuditCreateTable = pItem->bval;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableAuditDelete");
|
||||
tsEnableAuditDelete = pItem->bval;
|
||||
|
||||
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditInterval");
|
||||
tsAuditInterval = pItem->i32;
|
||||
|
||||
|
|
|
@ -567,6 +567,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa
|
|||
TAOS_CHECK_EXIT(tSerializeSClientHbRsp(&encoder, pRsp));
|
||||
}
|
||||
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pBatchRsp->enableAuditDelete));
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -609,6 +610,12 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR
|
|||
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas));
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pBatchRsp->enableAuditDelete));
|
||||
} else {
|
||||
pBatchRsp->enableAuditDelete = 0;
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
@ -1813,6 +1820,60 @@ _exit:
|
|||
|
||||
void tFreeSDropUserReq(SDropUserReq *pReq) { FREESQL(); }
|
||||
|
||||
int32_t tSerializeSAuditReq(void *buf, int32_t bufLen, SAuditReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
int32_t tlen;
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
TAOS_CHECK_EXIT(tStartEncode(&encoder));
|
||||
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->operation));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->db));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->table));
|
||||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->sqlLen));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->pSql));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tlen = code;
|
||||
} else {
|
||||
tlen = encoder.pos;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSAuditReq(void *buf, int32_t bufLen, SAuditReq *pReq) {
|
||||
SDecoder decoder = {0};
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
TAOS_CHECK_EXIT(tStartDecode(&decoder));
|
||||
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->operation));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->db));
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->table));
|
||||
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->sqlLen));
|
||||
if (pReq->sqlLen > 0) {
|
||||
pReq->pSql = taosMemoryMalloc(pReq->sqlLen + 1);
|
||||
if (pReq->pSql == NULL) {
|
||||
TAOS_CHECK_EXIT(terrno);
|
||||
}
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->pSql));
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return code;
|
||||
}
|
||||
|
||||
void tFreeSAuditReq(SAuditReq *pReq) { taosMemoryFreeClear(pReq->pSql); }
|
||||
|
||||
SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) {
|
||||
if (pIpWhiteList == NULL) return NULL;
|
||||
|
||||
|
@ -6294,6 +6355,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
|
|||
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->authVer));
|
||||
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->whiteListVer));
|
||||
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pRsp->monitorParas));
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRsp->enableAuditDelete));
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -6345,6 +6407,11 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) {
|
|||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pRsp->monitorParas));
|
||||
}
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRsp->enableAuditDelete));
|
||||
} else {
|
||||
pRsp->enableAuditDelete = 0;
|
||||
}
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -212,6 +212,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_VIEW_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STATIS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_AUDIT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_COMPACT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_CLUSTER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -86,6 +86,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq);
|
|||
static int32_t mndProcessNotifyReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessStatisReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessAuditReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq);
|
||||
static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp);
|
||||
static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp);
|
||||
|
@ -125,6 +126,7 @@ int32_t mndInitDnode(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq);
|
||||
|
@ -604,6 +606,24 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndProcessAuditReq(SRpcMsg *pReq) {
|
||||
mTrace("process audit req:%p", pReq);
|
||||
if (tsEnableAudit && tsEnableAuditDelete) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SAuditReq auditReq = {0};
|
||||
|
||||
TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq));
|
||||
|
||||
mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql);
|
||||
|
||||
auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql,
|
||||
auditReq.sqlLen);
|
||||
|
||||
tFreeSAuditReq(&auditReq);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) {
|
||||
int32_t code = 0, lino = 0;
|
||||
SDnodeInfoReq infoReq = {0};
|
||||
|
|
|
@ -305,6 +305,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) {
|
|||
connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
|
||||
connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold;
|
||||
connectRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest;
|
||||
connectRsp.enableAuditDelete = tsEnableAuditDelete;
|
||||
tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
|
||||
connectRsp.whiteListVer = pUser->ipWhiteListVer;
|
||||
|
||||
|
@ -709,6 +710,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) {
|
|||
tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN);
|
||||
batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen;
|
||||
batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope;
|
||||
batchRsp.enableAuditDelete = tsEnableAuditDelete;
|
||||
|
||||
int32_t sz = taosArrayGetSize(batchReq.reqs);
|
||||
for (int i = 0; i < sz; i++) {
|
||||
|
|
Loading…
Reference in New Issue