diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 58517a5db0..d7a81937aa 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -106,6 +106,7 @@ extern bool tsMonitorComp; // audit extern bool tsEnableAudit; extern bool tsEnableAuditCreateTable; +extern int32_t tsAuditInterval; // telem extern bool tsEnableTelem; diff --git a/include/libs/audit/audit.h b/include/libs/audit/audit.h index 85d462b96b..dd3df27866 100644 --- a/include/libs/audit/audit.h +++ b/include/libs/audit/audit.h @@ -23,13 +23,13 @@ #include "tjson.h" #include "tmsgcb.h" #include "trpc.h" -#include "mnode.h" #ifdef __cplusplus extern "C" { #endif #define AUDIT_DETAIL_MAX 65472 +#define AUDIT_OPERATION_LEN 20 typedef struct { const char *server; @@ -37,13 +37,28 @@ typedef struct { bool comp; } SAuditCfg; +typedef struct { + int64_t curTime; + char strClusterId[TSDB_CLUSTER_ID_LEN]; + char clientAddress[50]; + char user[TSDB_USER_LEN]; + char operation[AUDIT_OPERATION_LEN]; + char target1[TSDB_DB_NAME_LEN]; //put db name + char target2[TSDB_STREAM_NAME_LEN]; //put stb name, table name, topic name, user name, stream name, use max + char* detail; +} SAuditRecord; + int32_t auditInit(const SAuditCfg *pCfg); +void auditCleanup(); void auditSend(SJson *pJson); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len); +void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len); +void auditSendRecordsInBatch(); #ifdef __cplusplus } #endif -#endif /*_TD_MONITOR_H_*/ +#endif /*_TD_AUDIT_H_*/ diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 86db9180d2..23b8bd4b70 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -97,6 +97,7 @@ bool tsMonitorComp = false; // audit bool tsEnableAudit = true; bool tsEnableAuditCreateTable = true; +int32_t tsAuditInterval = 100; // telem #ifdef TD_ENTERPRISE @@ -671,6 +672,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1120,6 +1123,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsEnableAudit = cfgGetItem(pCfg, "audit")->bval; tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval; + tsAuditInterval = cfgGetItem(pCfg, "auditInterval")->i32; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 9e43c2af47..a0d16cc4ea 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -30,12 +30,14 @@ typedef struct SDnodeMgmt { TdThread statusThread; TdThread notifyThread; TdThread monitorThread; + TdThread auditThread; TdThread crashReportThread; SSingleWorker mgmtWorker; ProcessCreateNodeFp processCreateNodeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessDropNodeFp processDropNodeFp; SendMonitorReportFp sendMonitorReportFp; + SendAuditRecordsFp sendAuditRecordsFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; @@ -62,6 +64,7 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt); int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt); void dmStopNotifyThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); +int32_t dmStartAuditThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt); int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt); void dmStopCrashReportThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 4bd32cac20..960d4afd8b 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -29,6 +29,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if (dmStartMonitorThread(pMgmt) != 0) { return -1; } + if (dmStartAuditThread(pMgmt) != 0) { + return -1; + } if (dmStartCrashReportThread(pMgmt) != 0) { return -1; } @@ -60,6 +63,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; + pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 18da1d638c..94609a8055 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -99,6 +99,27 @@ static void *dmMonitorThreadFp(void *param) { return NULL; } +static void *dmAuditThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); + setThreadName("dnode-audit"); + + while (1) { + taosMsleep(100); + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + + int64_t curTime = taosGetTimestampMs(); + if (curTime < lastTime) lastTime = curTime; + float interval = curTime - lastTime; + if (interval >= tsAuditInterval) { + (*pMgmt->sendAuditRecordsFp)(); + lastTime = curTime; + } + } + + return NULL; +} + static void *dmCrashReportThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); @@ -215,6 +236,20 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { return 0; } +int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) { + dError("failed to create audit thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("dnode-audit", "initialized"); + return 0; +} + void dmStopMonitorThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->monitorThread)) { taosThreadJoin(pMgmt->monitorThread, NULL); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 36097438a2..83f9f13c82 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -124,6 +124,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); // dmMonitor.c void dmSendMonitorReport(); +void dmSendAuditRecords(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 6f13abcebc..2310b9216b 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -185,6 +185,7 @@ void dmCleanup() { if (dmCheckRepeatCleanup(pDnode) != 0) return; dmCleanupDnode(pDnode); monCleanup(); + auditCleanup(); syncCleanUp(); walCleanUp(); udfcClose(); @@ -390,6 +391,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processDropNodeFp = dmProcessDropNodeReq, .sendMonitorReportFp = dmSendMonitorReport, + .sendAuditRecordFp = auditSendRecordsInBatch, .getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getMnodeLoadsFp = dmGetMnodeLoads, diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index b3db7c3058..c42aa6a1ae 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "dmNodes.h" +#include "audit.h" static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { pInfo->protocol = 1; @@ -108,6 +109,11 @@ void dmSendMonitorReport() { monSendReport(); } +//Todo: put this in seperate file in the future +void dmSendAuditRecords() { + auditSendRecordsInBatch(); +} + void dmGetVnodeLoads(SMonVloadInfo *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 0a52c578a5..4769ef8538 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -86,6 +86,7 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); +typedef void (*SendAuditRecordsFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); @@ -120,6 +121,7 @@ typedef struct { ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessDropNodeFp processDropNodeFp; SendMonitorReportFp sendMonitorReportFp; + SendAuditRecordsFp sendAuditRecordFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8cbca403e3..64febb4c97 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1009,11 +1009,15 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, taosMemoryFreeClear(*key); } + taosStringBuilderAppendStringLen(&sb, "specailkey", strlen("specailkey")); + size_t len = 0; char* keyJoined = taosStringBuilderGetResult(&sb, &len); + vInfo("create table %s", keyJoined); + if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); @@ -1237,7 +1241,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in char *keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); diff --git a/source/libs/audit/inc/auditInt.h b/source/libs/audit/inc/auditInt.h index b6c6ec87e8..e5fed2e473 100644 --- a/source/libs/audit/inc/auditInt.h +++ b/source/libs/audit/inc/auditInt.h @@ -17,9 +17,12 @@ #define _TD_AUDIT_INT_H_ #include "audit.h" +#include "tarray.h" typedef struct { SAuditCfg cfg; + SArray *records; + TdThreadMutex lock; } SAudit; #endif /*_TD_AUDIT_INT_H_*/ diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index c408f0d87b..7616617ff0 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -14,6 +14,8 @@ */ #define _DEFAULT_SOURCE + +#include "tarray.h" #include "auditInt.h" #include "taoserror.h" #include "thttp.h" @@ -21,25 +23,56 @@ #include "tjson.h" #include "tglobal.h" #include "mnode.h" +#include "audit.h" SAudit tsAudit = {0}; char* tsAuditUri = "/audit"; +char* tsAuditBatchUri = "/audit-batch"; int32_t auditInit(const SAuditCfg *pCfg) { tsAudit.cfg = *pCfg; + tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *)); + taosThreadMutexInit(&tsAudit.lock, NULL); return 0; } +void auditCleanup() { + tsLogFp = NULL; + taosArrayDestroy(tsAudit.records); + tsAudit.records = NULL; + taosThreadMutexDestroy(&tsAudit.lock); +} + extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len); +extern void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len); +extern void auditSendRecordsInBatchImp(); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len) { auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len); } +void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len) { + auditAddRecordImp(pReq, clusterId, operation, target1, target2, detail, len); +} + +void auditSendRecordsInBatch(){ + auditSendRecordsInBatchImp(); +} + #ifndef TD_ENTERPRISE void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len) { } + +void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len) { +} + +void auditSendRecordsInBatchImp(){ + +} #endif