fix/TS-4251

This commit is contained in:
dmchen 2023-12-06 07:48:40 +00:00
parent 8e1970be49
commit dc9408ed5f
13 changed files with 117 additions and 4 deletions

View File

@ -106,6 +106,7 @@ extern bool tsMonitorComp;
// audit // audit
extern bool tsEnableAudit; extern bool tsEnableAudit;
extern bool tsEnableAuditCreateTable; extern bool tsEnableAuditCreateTable;
extern int32_t tsAuditInterval;
// telem // telem
extern bool tsEnableTelem; extern bool tsEnableTelem;

View File

@ -23,13 +23,13 @@
#include "tjson.h" #include "tjson.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "trpc.h" #include "trpc.h"
#include "mnode.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define AUDIT_DETAIL_MAX 65472 #define AUDIT_DETAIL_MAX 65472
#define AUDIT_OPERATION_LEN 20
typedef struct { typedef struct {
const char *server; const char *server;
@ -37,13 +37,28 @@ typedef struct {
bool comp; bool comp;
} SAuditCfg; } SAuditCfg;
typedef struct {
int64_t curTime;
char strClusterId[TSDB_CLUSTER_ID_LEN];
char clientAddress[50];
char user[TSDB_USER_LEN];
char operation[AUDIT_OPERATION_LEN];
char target1[TSDB_DB_NAME_LEN]; //put db name
char target2[TSDB_STREAM_NAME_LEN]; //put stb name, table name, topic name, user name, stream name, use max
char* detail;
} SAuditRecord;
int32_t auditInit(const SAuditCfg *pCfg); int32_t auditInit(const SAuditCfg *pCfg);
void auditCleanup();
void auditSend(SJson *pJson); void auditSend(SJson *pJson);
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len); char *detail, int32_t len);
void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
void auditSendRecordsInBatch();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
#endif /*_TD_MONITOR_H_*/ #endif /*_TD_AUDIT_H_*/

View File

@ -97,6 +97,7 @@ bool tsMonitorComp = false;
// audit // audit
bool tsEnableAudit = true; bool tsEnableAudit = true;
bool tsEnableAuditCreateTable = true; bool tsEnableAuditCreateTable = true;
int32_t tsAuditInterval = 100;
// telem // telem
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
@ -671,6 +672,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0)
return -1;
if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1;
@ -1120,6 +1123,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsEnableAudit = cfgGetItem(pCfg, "audit")->bval; tsEnableAudit = cfgGetItem(pCfg, "audit")->bval;
tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval; tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval;
tsAuditInterval = cfgGetItem(pCfg, "auditInterval")->i32;
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;

View File

@ -30,12 +30,14 @@ typedef struct SDnodeMgmt {
TdThread statusThread; TdThread statusThread;
TdThread notifyThread; TdThread notifyThread;
TdThread monitorThread; TdThread monitorThread;
TdThread auditThread;
TdThread crashReportThread; TdThread crashReportThread;
SSingleWorker mgmtWorker; SSingleWorker mgmtWorker;
ProcessCreateNodeFp processCreateNodeFp; ProcessCreateNodeFp processCreateNodeFp;
ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordsFp;
GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp; GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;
@ -62,6 +64,7 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt);
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt); int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
void dmStopNotifyThread(SDnodeMgmt *pMgmt); void dmStopNotifyThread(SDnodeMgmt *pMgmt);
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt);
void dmStopMonitorThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt);
int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt); int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt);
void dmStopCrashReportThread(SDnodeMgmt *pMgmt); void dmStopCrashReportThread(SDnodeMgmt *pMgmt);

View File

@ -29,6 +29,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if (dmStartMonitorThread(pMgmt) != 0) { if (dmStartMonitorThread(pMgmt) != 0) {
return -1; return -1;
} }
if (dmStartAuditThread(pMgmt) != 0) {
return -1;
}
if (dmStartCrashReportThread(pMgmt) != 0) { if (dmStartCrashReportThread(pMgmt) != 0) {
return -1; return -1;
} }
@ -60,6 +63,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp; pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp;
pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp;
pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp;
pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp;
pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp;
pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp;
pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp;

View File

@ -99,6 +99,27 @@ static void *dmMonitorThreadFp(void *param) {
return NULL; return NULL;
} }
static void *dmAuditThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-audit");
while (1) {
taosMsleep(100);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs();
if (curTime < lastTime) lastTime = curTime;
float interval = curTime - lastTime;
if (interval >= tsAuditInterval) {
(*pMgmt->sendAuditRecordsFp)();
lastTime = curTime;
}
}
return NULL;
}
static void *dmCrashReportThreadFp(void *param) { static void *dmCrashReportThreadFp(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
@ -215,6 +236,20 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) {
return 0; return 0;
} }
int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) {
TdThreadAttr thAttr;
taosThreadAttrInit(&thAttr);
taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) {
dError("failed to create audit thread since %s", strerror(errno));
return -1;
}
taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-audit", "initialized");
return 0;
}
void dmStopMonitorThread(SDnodeMgmt *pMgmt) { void dmStopMonitorThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->monitorThread)) { if (taosCheckPthreadValid(pMgmt->monitorThread)) {
taosThreadJoin(pMgmt->monitorThread, NULL); taosThreadJoin(pMgmt->monitorThread, NULL);

View File

@ -124,6 +124,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg);
// dmMonitor.c // dmMonitor.c
void dmSendMonitorReport(); void dmSendMonitorReport();
void dmSendAuditRecords();
void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoads(SMonVloadInfo *pInfo);
void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo);
void dmGetMnodeLoads(SMonMloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo);

View File

@ -185,6 +185,7 @@ void dmCleanup() {
if (dmCheckRepeatCleanup(pDnode) != 0) return; if (dmCheckRepeatCleanup(pDnode) != 0) return;
dmCleanupDnode(pDnode); dmCleanupDnode(pDnode);
monCleanup(); monCleanup();
auditCleanup();
syncCleanUp(); syncCleanUp();
walCleanUp(); walCleanUp();
udfcClose(); udfcClose();
@ -390,6 +391,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) {
.processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq,
.processDropNodeFp = dmProcessDropNodeReq, .processDropNodeFp = dmProcessDropNodeReq,
.sendMonitorReportFp = dmSendMonitorReport, .sendMonitorReportFp = dmSendMonitorReport,
.sendAuditRecordFp = auditSendRecordsInBatch,
.getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsFp = dmGetVnodeLoads,
.getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite,
.getMnodeLoadsFp = dmGetMnodeLoads, .getMnodeLoadsFp = dmGetMnodeLoads,

View File

@ -16,6 +16,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "dmMgmt.h" #include "dmMgmt.h"
#include "dmNodes.h" #include "dmNodes.h"
#include "audit.h"
static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) {
pInfo->protocol = 1; pInfo->protocol = 1;
@ -108,6 +109,11 @@ void dmSendMonitorReport() {
monSendReport(); monSendReport();
} }
//Todo: put this in seperate file in the future
void dmSendAuditRecords() {
auditSendRecordsInBatch();
}
void dmGetVnodeLoads(SMonVloadInfo *pInfo) { void dmGetVnodeLoads(SMonVloadInfo *pInfo) {
SDnode *pDnode = dmInstance(); SDnode *pDnode = dmInstance();
SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE];

View File

@ -86,6 +86,7 @@ typedef enum {
typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg);
typedef void (*SendMonitorReportFp)(); typedef void (*SendMonitorReportFp)();
typedef void (*SendAuditRecordsFp)();
typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo);
typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo);
typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo);
@ -120,6 +121,7 @@ typedef struct {
ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp;
ProcessDropNodeFp processDropNodeFp; ProcessDropNodeFp processDropNodeFp;
SendMonitorReportFp sendMonitorReportFp; SendMonitorReportFp sendMonitorReportFp;
SendAuditRecordsFp sendAuditRecordFp;
GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsFp;
GetVnodeLoadsFp getVnodeLoadsLiteFp; GetVnodeLoadsFp getVnodeLoadsLiteFp;
GetMnodeLoadsFp getMnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp;

View File

@ -1009,11 +1009,15 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
taosMemoryFreeClear(*key); taosMemoryFreeClear(*key);
} }
taosStringBuilderAppendStringLen(&sb, "specailkey", strlen("specailkey"));
size_t len = 0; size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len); char* keyJoined = taosStringBuilderGetResult(&sb, &len);
vInfo("create table %s", keyJoined);
if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){
auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len);
} }
taosStringBuilderDestroy(&sb); taosStringBuilderDestroy(&sb);
@ -1237,7 +1241,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
char *keyJoined = taosStringBuilderGetResult(&sb, &len); char *keyJoined = taosStringBuilderGetResult(&sb, &len);
if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){
auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len);
} }
taosStringBuilderDestroy(&sb); taosStringBuilderDestroy(&sb);

View File

@ -17,9 +17,12 @@
#define _TD_AUDIT_INT_H_ #define _TD_AUDIT_INT_H_
#include "audit.h" #include "audit.h"
#include "tarray.h"
typedef struct { typedef struct {
SAuditCfg cfg; SAuditCfg cfg;
SArray *records;
TdThreadMutex lock;
} SAudit; } SAudit;
#endif /*_TD_AUDIT_INT_H_*/ #endif /*_TD_AUDIT_INT_H_*/

View File

@ -14,6 +14,8 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tarray.h"
#include "auditInt.h" #include "auditInt.h"
#include "taoserror.h" #include "taoserror.h"
#include "thttp.h" #include "thttp.h"
@ -21,25 +23,56 @@
#include "tjson.h" #include "tjson.h"
#include "tglobal.h" #include "tglobal.h"
#include "mnode.h" #include "mnode.h"
#include "audit.h"
SAudit tsAudit = {0}; SAudit tsAudit = {0};
char* tsAuditUri = "/audit"; char* tsAuditUri = "/audit";
char* tsAuditBatchUri = "/audit-batch";
int32_t auditInit(const SAuditCfg *pCfg) { int32_t auditInit(const SAuditCfg *pCfg) {
tsAudit.cfg = *pCfg; tsAudit.cfg = *pCfg;
tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *));
taosThreadMutexInit(&tsAudit.lock, NULL);
return 0; return 0;
} }
void auditCleanup() {
tsLogFp = NULL;
taosArrayDestroy(tsAudit.records);
tsAudit.records = NULL;
taosThreadMutexDestroy(&tsAudit.lock);
}
extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len); char *detail, int32_t len);
extern void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
extern void auditSendRecordsInBatchImp();
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) { char *detail, int32_t len) {
auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len); auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
} }
void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
auditAddRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
}
void auditSendRecordsInBatch(){
auditSendRecordsInBatchImp();
}
#ifndef TD_ENTERPRISE #ifndef TD_ENTERPRISE
void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) { char *detail, int32_t len) {
} }
void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
}
void auditSendRecordsInBatchImp(){
}
#endif #endif