diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 73500853f0..bf3fa716c6 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -142,6 +142,7 @@ extern bool tsMonitorForceV2; // audit extern bool tsEnableAudit; extern bool tsEnableAuditCreateTable; +extern bool tsEnableAuditDelete; extern int32_t tsAuditInterval; // telem diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f73125b2b3..c498d691d9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1022,6 +1022,7 @@ typedef struct { char sDetailVer[128]; int64_t whiteListVer; SMonitorParas monitorParas; + int8_t enableAuditDelete; } SConnectRsp; int32_t tSerializeSConnectRsp(void* buf, int32_t bufLen, SConnectRsp* pRsp); @@ -1827,6 +1828,17 @@ int32_t tSerializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); int32_t tDeserializeSStatisReq(void* buf, int32_t bufLen, SStatisReq* pReq); void tFreeSStatisReq(SStatisReq* pReq); +typedef struct { + char db[TSDB_DB_FNAME_LEN]; + char table[TSDB_TABLE_NAME_LEN]; + char operation[AUDIT_OPERATION_LEN]; + int32_t sqlLen; + char* pSql; +} SAuditReq; +int32_t tSerializeSAuditReq(void* buf, int32_t bufLen, SAuditReq* pReq); +int32_t tDeserializeSAuditReq(void* buf, int32_t bufLen, SAuditReq* pReq); +void tFreeSAuditReq(SAuditReq* pReq); + typedef struct { int32_t dnodeId; int64_t clusterId; @@ -3415,6 +3427,7 @@ typedef struct { int32_t svrTimestamp; SArray* rsps; // SArray SMonitorParas monitorParas; + int8_t enableAuditDelete; } SClientHbBatchRsp; static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { return taosIntHash_64(key, keyLen); } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 2c797e39bf..c22a3da5ad 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -259,6 +259,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_STREAM_DROP_ORPHANTASKS, "stream-drop-orphan-tasks", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_STREAM_TASK_RESET, "stream-reset-tasks", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_UPDATE_DNODE_INFO, "update-dnode-info", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_AUDIT, "audit", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/include/libs/audit/audit.h b/include/libs/audit/audit.h index 2e786ab2b3..f5710256e9 100644 --- a/include/libs/audit/audit.h +++ b/include/libs/audit/audit.h @@ -29,7 +29,6 @@ extern "C" { #endif #define AUDIT_DETAIL_MAX 65472 -#define AUDIT_OPERATION_LEN 20 typedef struct { const char *server; diff --git a/include/libs/monitor/clientMonitor.h b/include/libs/monitor/clientMonitor.h index 0085173ecd..b09a1ac11c 100644 --- a/include/libs/monitor/clientMonitor.h +++ b/include/libs/monitor/clientMonitor.h @@ -24,6 +24,7 @@ extern "C" { #include "thash.h" #include "query.h" #include "tqueue.h" +#include "clientInt.h" typedef enum { SQL_RESULT_SUCCESS = 0, @@ -81,6 +82,8 @@ void monitorCreateClientCounter(int64_t clusterId, const char* name, void monitorCounterInc(int64_t clusterId, const char* counterName, const char** label_values); const char* monitorResultStr(SQL_RESULT_CODE code); int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data); + +void clientOperateReport(SRequestObj* pRequest); #ifdef __cplusplus } #endif diff --git a/include/util/tdef.h b/include/util/tdef.h index 7ef01f9752..ba30e78c59 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -653,6 +653,8 @@ enum { RAND_ERR_MEMORY = 1, RAND_ERR_FILE = 2, RAND_ERR_NETWORK = 4 }; #define MONITOR_TAG_VALUE_LEN 300 #define MONITOR_METRIC_NAME_LEN 100 +#define AUDIT_OPERATION_LEN 20 + typedef enum { ANAL_ALGO_TYPE_ANOMALY_DETECT = 0, ANAL_ALGO_TYPE_FORECAST = 1, diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 19a39cb5f0..90505ed25a 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -108,6 +108,10 @@ typedef struct SQueryExecMetric { int64_t execCostUs; } SQueryExecMetric; +typedef struct { + SMonitorParas monitorParas; + int8_t enableAuditDelete; +} SAppInstServerCFG; struct SAppInstInfo { int64_t numOfConns; SCorEpSet mgmtEp; @@ -121,7 +125,7 @@ struct SAppInstInfo { void* pTransporter; SAppHbMgr* pAppHbMgr; char* instKey; - SMonitorParas monitorParas; + SAppInstServerCFG serverCfg; }; typedef struct SAppInfo { diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index ec3ccf007e..c56a627ec7 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -166,11 +166,11 @@ static int32_t generateWriteSlowLog(STscObj *pTscObj, SRequestObj *pRequest, int ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "type", cJSON_CreateNumber(reqType))); ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject( json, "rows_num", cJSON_CreateNumber(pRequest->body.resInfo.numOfRows + pRequest->body.resInfo.totalRows))); - if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen) { - char tmp = pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen]; - pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = '\0'; + if (pRequest->sqlstr != NULL && strlen(pRequest->sqlstr) > pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen) { + char tmp = pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen]; + pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = '\0'; ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr))); - pRequest->sqlstr[pTscObj->pAppInfo->monitorParas.tsSlowLogMaxLen] = tmp; + pRequest->sqlstr[pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogMaxLen] = tmp; } else { ENV_JSON_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", cJSON_CreateString(pRequest->sqlstr))); } @@ -284,7 +284,7 @@ static void deregisterRequest(SRequestObj *pRequest) { } } - if (pTscObj->pAppInfo->monitorParas.tsEnableMonitor) { + if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) { if (QUERY_NODE_VNODE_MODIFY_STMT == pRequest->stmtType || QUERY_NODE_INSERT_STMT == pRequest->stmtType) { sqlReqLog(pTscObj->id, pRequest->killed, pRequest->code, MONITORSQLTYPEINSERT); } else if (QUERY_NODE_SELECT_STMT == pRequest->stmtType) { @@ -294,15 +294,15 @@ static void deregisterRequest(SRequestObj *pRequest) { } } - if ((duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThreshold * 1000000UL || - duration >= pTscObj->pAppInfo->monitorParas.tsSlowLogThresholdTest * 1000000UL) && - checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->monitorParas.tsSlowLogExceptDb)) { + if ((duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThreshold * 1000000UL || + duration >= pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogThresholdTest * 1000000UL) && + checkSlowLogExceptDb(pRequest, pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogExceptDb)) { (void)atomic_add_fetch_64((int64_t *)&pActivity->numOfSlowQueries, 1); - if (pTscObj->pAppInfo->monitorParas.tsSlowLogScope & reqType) { + if (pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & reqType) { taosPrintSlowLog("PID:%d, Conn:%u,QID:0x%" PRIx64 ", Start:%" PRId64 " us, Duration:%" PRId64 "us, SQL:%s", taosGetPId(), pTscObj->connId, pRequest->requestId, pRequest->metric.start, duration, pRequest->sqlstr); - if (pTscObj->pAppInfo->monitorParas.tsEnableMonitor) { + if (pTscObj->pAppInfo->serverCfg.monitorParas.tsEnableMonitor) { slowQueryLog(pTscObj->id, pRequest->killed, pRequest->code, duration); if (TSDB_CODE_SUCCESS != generateWriteSlowLog(pTscObj, pRequest, reqType, duration)) { tscError("failed to generate write slow log"); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 75716d0bd2..07be4bb596 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -605,7 +605,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { return code; } - pInst->monitorParas = pRsp.monitorParas; + pInst->serverCfg.monitorParas = pRsp.monitorParas; + pInst->serverCfg.enableAuditDelete = pRsp.enableAuditDelete; tscDebug("[monitor] paras from hb, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", pInst->clusterId, pRsp.monitorParas.tsSlowLogThreshold, pRsp.monitorParas.tsSlowLogScope); diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 1a5e24cfec..8a0b1ddaab 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -2873,6 +2873,7 @@ void syncQueryFn(void* param, void* res, int32_t code) { if (pParam->pRequest) { pParam->pRequest->code = code; + clientOperateReport(pParam->pRequest); } if (TSDB_CODE_SUCCESS != tsem_post(&pParam->sem)) { diff --git a/source/client/src/clientMonitor.c b/source/client/src/clientMonitor.c index 595c871953..5f7e11b6a3 100644 --- a/source/client/src/clientMonitor.c +++ b/source/client/src/clientMonitor.c @@ -2,8 +2,6 @@ #include "cJSON.h" #include "clientInt.h" #include "clientLog.h" -#include "os.h" -#include "tglobal.h" #include "tmisce.h" #include "tqueue.h" #include "ttime.h" @@ -19,6 +17,7 @@ STaosQueue* monitorQueue; SHashObj* monitorSlowLogHash; char tmpSlowLogPath[PATH_MAX] = {0}; TdThread monitorThread; +extern bool tsEnableAuditDelete; static int32_t getSlowLogTmpDir(char* tmpPath, int32_t size) { int ret = tsnprintf(tmpPath, size, "%s/tdengine_slow_log/", tsTempDir); @@ -216,7 +215,7 @@ static void reportSendProcess(void* param, void* tmrId) { SEpSet ep = getEpSet_s(&pInst->mgmtEp); generateClusterReport(pMonitor->registry, pInst->pTransporter, &ep); bool reset = - taosTmrReset(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); + taosTmrReset(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, param, monitorTimer, &tmrId); tscDebug("reset timer, pMonitor:%p, %d", pMonitor, reset); taosRUnLockLatch(&monitorLock); } @@ -289,7 +288,7 @@ void monitorCreateClient(int64_t clusterId) { goto fail; } pMonitor->timer = - taosTmrStart(reportSendProcess, pInst->monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer); + taosTmrStart(reportSendProcess, pInst->serverCfg.monitorParas.tsMonitorInterval * 1000, (void*)pMonitor, monitorTimer); if (pMonitor->timer == NULL) { tscError("failed to start timer"); goto fail; @@ -660,7 +659,7 @@ static void monitorSendAllSlowLog() { taosHashCancelIterate(monitorSlowLogHash, pIter); return; } - if (t - pClient->lastCheckTime > pInst->monitorParas.tsMonitorInterval * 1000) { + if (t - pClient->lastCheckTime > pInst->serverCfg.monitorParas.tsMonitorInterval * 1000) { pClient->lastCheckTime = t; } else { continue; @@ -686,7 +685,7 @@ static void monitorSendAllSlowLog() { static void monitorSendAllSlowLogFromTempDir(int64_t clusterId) { SAppInstInfo* pInst = getAppInstByClusterId((int64_t)clusterId); - if (pInst == NULL || !pInst->monitorParas.tsEnableMonitor) { + if (pInst == NULL || !pInst->serverCfg.monitorParas.tsEnableMonitor) { tscInfo("[monitor] monitor is disabled, skip send slow log"); return; } @@ -932,4 +931,101 @@ int32_t monitorPutData2MonitorQueue(MonitorSlowLogData data) { taosFreeQitem(slowLogData); } return 0; -} \ No newline at end of file +} + +int32_t reportCB(void* param, SDataBuf* pMsg, int32_t code) { + taosMemoryFree(pMsg->pData); + taosMemoryFree(pMsg->pEpSet); + tscDebug("[del report]delete reportCB code:%d", code); + return 0; +} + +int32_t senAuditInfo(STscObj* pTscObj, void* pReq, int32_t len, uint64_t requestId) { + SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (sendInfo == NULL) { + tscError("[del report]failed to allocate memory for sendInfo"); + return terrno; + } + + sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = len, .handle = NULL}; + + sendInfo->requestId = requestId; + sendInfo->requestObjRefId = 0; + sendInfo->param = NULL; + sendInfo->fp = reportCB; + sendInfo->msgType = TDMT_MND_AUDIT; + + SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); + + int32_t code = asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); + if (code != 0) { + tscError("[del report]failed to send msg to server, code:%d", code); + taosMemoryFree(sendInfo); + return code; + } + return TSDB_CODE_SUCCESS; +} + +static void reportDeleteSql(SRequestObj* pRequest) { + SDeleteStmt* pStmt = (SDeleteStmt*)pRequest->pQuery->pRoot; + STscObj* pTscObj = pRequest->pTscObj; + + if (pTscObj == NULL || pTscObj->pAppInfo == NULL) { + tscError("[del report]invalid tsc obj"); + return; + } + + if(pTscObj->pAppInfo->serverCfg.enableAuditDelete == 0) { + tscDebug("[del report]audit delete is disabled"); + return; + } + + if (pRequest->code != TSDB_CODE_SUCCESS) { + tscDebug("[del report]delete request result code:%d", pRequest->code); + return; + } + + if (nodeType(pStmt->pFromTable) != QUERY_NODE_REAL_TABLE) { + tscError("[del report]invalid from table node type:%d", nodeType(pStmt->pFromTable)); + return; + } + + SRealTableNode* pTable = (SRealTableNode*)pStmt->pFromTable; + SAuditReq req; + req.pSql = pRequest->sqlstr; + req.sqlLen = pRequest->sqlLen; + TAOS_UNUSED(tsnprintf(req.table, TSDB_TABLE_NAME_LEN, "%s", pTable->table.tableName)); + TAOS_UNUSED(tsnprintf(req.db, TSDB_DB_FNAME_LEN, "%s", pTable->table.dbName)); + TAOS_UNUSED(tsnprintf(req.operation, AUDIT_OPERATION_LEN, "delete")); + int32_t tlen = tSerializeSAuditReq(NULL, 0, &req); + void* pReq = taosMemoryCalloc(1, tlen); + if (pReq == NULL) { + tscError("[del report]failed to allocate memory for req"); + return; + } + + if (tSerializeSAuditReq(pReq, tlen, &req) < 0) { + tscError("[del report]failed to serialize req"); + taosMemoryFree(pReq); + return; + } + + int32_t code = senAuditInfo(pRequest->pTscObj, pReq, tlen, pRequest->requestId); + if (code != 0) { + tscError("[del report]failed to send audit info, code:%d", code); + taosMemoryFree(pReq); + return; + } + tscDebug("[del report]delete data, sql:%s", req.pSql); +} + +void clientOperateReport(SRequestObj* pRequest) { + if (pRequest == NULL || pRequest->pQuery == NULL || pRequest->pQuery->pRoot == NULL) { + tscError("[del report]invalid request"); + return; + } + + if (QUERY_NODE_DELETE_STMT == nodeType(pRequest->pQuery->pRoot)) { + reportDeleteSql(pRequest); + } +} diff --git a/source/client/src/clientMsgHandler.c b/source/client/src/clientMsgHandler.c index 6f557c57b1..9a723218ff 100644 --- a/source/client/src/clientMsgHandler.c +++ b/source/client/src/clientMsgHandler.c @@ -135,7 +135,8 @@ int32_t processConnectRsp(void* param, SDataBuf* pMsg, int32_t code) { // update the appInstInfo pTscObj->pAppInfo->clusterId = connectRsp.clusterId; - pTscObj->pAppInfo->monitorParas = connectRsp.monitorParas; + pTscObj->pAppInfo->serverCfg.monitorParas = connectRsp.monitorParas; + pTscObj->pAppInfo->serverCfg.enableAuditDelete = connectRsp.enableAuditDelete; tscDebug("[monitor] paras from connect rsp, clusterId:%" PRIx64 " monitorParas threshold:%d scope:%d", connectRsp.clusterId, connectRsp.monitorParas.tsSlowLogThreshold, connectRsp.monitorParas.tsSlowLogScope); lastClusterId = connectRsp.clusterId; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 90c7895d94..8602421ed0 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1695,7 +1695,7 @@ END: } void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) { - if (request->pTscObj->pAppInfo->monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) { + if (request->pTscObj->pAppInfo->serverCfg.monitorParas.tsSlowLogScope & SLOW_LOG_TYPE_INSERT) { int32_t len = 0; int32_t rlen = 0; char *p = NULL; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index e8fca0c01f..9c72e3b498 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -119,6 +119,7 @@ bool tsMonitorForceV2 = true; // audit bool tsEnableAudit = true; bool tsEnableAuditCreateTable = true; +bool tsEnableAuditDelete = true; int32_t tsAuditInterval = 5000; // telem @@ -779,6 +780,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddBool(pCfg, "monitorForceV2", tsMonitorForceV2, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "enableAuditDelete", tsEnableAuditDelete, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE)); @@ -1495,6 +1497,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditCreateTable"); tsEnableAuditCreateTable = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "enableAuditDelete"); + tsEnableAuditDelete = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditInterval"); tsAuditInterval = pItem->i32; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9b54da2c30..458badc764 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -567,6 +567,7 @@ int32_t tSerializeSClientHbBatchRsp(void *buf, int32_t bufLen, const SClientHbBa TAOS_CHECK_EXIT(tSerializeSClientHbRsp(&encoder, pRsp)); } TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pBatchRsp->monitorParas)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pBatchRsp->enableAuditDelete)); tEndEncode(&encoder); _exit: @@ -609,6 +610,12 @@ int32_t tDeserializeSClientHbBatchRsp(void *buf, int32_t bufLen, SClientHbBatchR TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pBatchRsp->monitorParas)); } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pBatchRsp->enableAuditDelete)); + } else { + pBatchRsp->enableAuditDelete = 0; + } + tEndDecode(&decoder); _exit: @@ -1813,6 +1820,60 @@ _exit: void tFreeSDropUserReq(SDropUserReq *pReq) { FREESQL(); } +int32_t tSerializeSAuditReq(void *buf, int32_t bufLen, SAuditReq *pReq) { + SEncoder encoder = {0}; + int32_t code = 0; + int32_t lino; + int32_t tlen; + tEncoderInit(&encoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartEncode(&encoder)); + + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->operation)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->db)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->table)); + TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->sqlLen)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->pSql)); + + tEndEncode(&encoder); + +_exit: + if (code) { + tlen = code; + } else { + tlen = encoder.pos; + } + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSAuditReq(void *buf, int32_t bufLen, SAuditReq *pReq) { + SDecoder decoder = {0}; + int32_t code = 0; + int32_t lino; + tDecoderInit(&decoder, buf, bufLen); + + TAOS_CHECK_EXIT(tStartDecode(&decoder)); + + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->operation)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->db)); + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->table)); + TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->sqlLen)); + if (pReq->sqlLen > 0) { + pReq->pSql = taosMemoryMalloc(pReq->sqlLen + 1); + if (pReq->pSql == NULL) { + TAOS_CHECK_EXIT(terrno); + } + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->pSql)); + } + tEndDecode(&decoder); +_exit: + tDecoderClear(&decoder); + return code; +} + +void tFreeSAuditReq(SAuditReq *pReq) { taosMemoryFreeClear(pReq->pSql); } + SIpWhiteList *cloneIpWhiteList(SIpWhiteList *pIpWhiteList) { if (pIpWhiteList == NULL) return NULL; @@ -6294,6 +6355,7 @@ int32_t tSerializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { TAOS_CHECK_EXIT(tEncodeI32(&encoder, pRsp->authVer)); TAOS_CHECK_EXIT(tEncodeI64(&encoder, pRsp->whiteListVer)); TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pRsp->monitorParas)); + TAOS_CHECK_EXIT(tEncodeI8(&encoder, pRsp->enableAuditDelete)); tEndEncode(&encoder); _exit: @@ -6345,6 +6407,11 @@ int32_t tDeserializeSConnectRsp(void *buf, int32_t bufLen, SConnectRsp *pRsp) { if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pRsp->monitorParas)); } + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pRsp->enableAuditDelete)); + } else { + pRsp->enableAuditDelete = 0; + } tEndDecode(&decoder); _exit: diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index bc6c7e55ba..0d804eadf0 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -212,6 +212,7 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_VIEW_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STATIS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_AUDIT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_COMPACT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG_CLUSTER, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 2a1689854b..24ae8382f9 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -86,6 +86,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq); static int32_t mndProcessNotifyReq(SRpcMsg *pReq); static int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq); static int32_t mndProcessStatisReq(SRpcMsg *pReq); +static int32_t mndProcessAuditReq(SRpcMsg *pReq); static int32_t mndProcessUpdateDnodeInfoReq(SRpcMsg *pReq); static int32_t mndProcessCreateEncryptKeyReq(SRpcMsg *pRsp); static int32_t mndProcessCreateEncryptKeyRsp(SRpcMsg *pRsp); @@ -125,6 +126,7 @@ int32_t mndInitDnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_SHOW_VARIABLES, mndProcessShowVariablesReq); mndSetMsgHandle(pMnode, TDMT_MND_RESTORE_DNODE, mndProcessRestoreDnodeReq); mndSetMsgHandle(pMnode, TDMT_MND_STATIS, mndProcessStatisReq); + mndSetMsgHandle(pMnode, TDMT_MND_AUDIT, mndProcessAuditReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_ENCRYPT_KEY, mndProcessCreateEncryptKeyReq); mndSetMsgHandle(pMnode, TDMT_DND_CREATE_ENCRYPT_KEY_RSP, mndProcessCreateEncryptKeyRsp); mndSetMsgHandle(pMnode, TDMT_MND_UPDATE_DNODE_INFO, mndProcessUpdateDnodeInfoReq); @@ -604,6 +606,24 @@ static int32_t mndProcessStatisReq(SRpcMsg *pReq) { return 0; } +static int32_t mndProcessAuditReq(SRpcMsg *pReq) { + mTrace("process audit req:%p", pReq); + if (tsEnableAudit && tsEnableAuditDelete) { + SMnode *pMnode = pReq->info.node; + SAuditReq auditReq = {0}; + + TAOS_CHECK_RETURN(tDeserializeSAuditReq(pReq->pCont, pReq->contLen, &auditReq)); + + mDebug("received audit req:%s, %s, %s, %s", auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql); + + auditAddRecord(pReq, pMnode->clusterId, auditReq.operation, auditReq.db, auditReq.table, auditReq.pSql, + auditReq.sqlLen); + + tFreeSAuditReq(&auditReq); + } + return 0; +} + static int32_t mndUpdateDnodeObj(SMnode *pMnode, SDnodeObj *pDnode) { int32_t code = 0, lino = 0; SDnodeInfoReq infoReq = {0}; diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 91df640bce..21aba8df10 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -305,6 +305,7 @@ static int32_t mndProcessConnectReq(SRpcMsg *pReq) { connectRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; connectRsp.monitorParas.tsSlowLogThreshold = tsSlowLogThreshold; connectRsp.monitorParas.tsSlowLogThresholdTest = tsSlowLogThresholdTest; + connectRsp.enableAuditDelete = tsEnableAuditDelete; tstrncpy(connectRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN); connectRsp.whiteListVer = pUser->ipWhiteListVer; @@ -709,6 +710,7 @@ static int32_t mndProcessHeartBeatReq(SRpcMsg *pReq) { tstrncpy(batchRsp.monitorParas.tsSlowLogExceptDb, tsSlowLogExceptDb, TSDB_DB_NAME_LEN); batchRsp.monitorParas.tsSlowLogMaxLen = tsSlowLogMaxLen; batchRsp.monitorParas.tsSlowLogScope = tsSlowLogScope; + batchRsp.enableAuditDelete = tsEnableAuditDelete; int32_t sz = taosArrayGetSize(batchReq.reqs); for (int i = 0; i < sz; i++) {