Merge pull request #22948 from taosdata/feat/TD-26127-audit-sql

Feat/td 26127 audit sql
This commit is contained in:
wade zhang 2023-09-20 14:06:28 +08:00 committed by GitHub
commit c415bbc5f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 546 additions and 270 deletions

View File

@ -767,6 +767,8 @@ typedef struct {
char* pAst2;
int64_t deleteMark1;
int64_t deleteMark2;
int32_t sqlLen;
char* sql;
} SMCreateStbReq;
int32_t tSerializeSMCreateStbReq(void* buf, int32_t bufLen, SMCreateStbReq* pReq);
@ -787,10 +789,13 @@ typedef struct {
int8_t source; // 1-taosX or 0-taosClient
int8_t reserved[6];
tb_uid_t suid;
int32_t sqlLen;
char* sql;
} SMDropStbReq;
int32_t tSerializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
int32_t tDeserializeSMDropStbReq(void* buf, int32_t bufLen, SMDropStbReq* pReq);
void tFreeSMDropStbReq(SMDropStbReq *pReq);
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
@ -800,6 +805,8 @@ typedef struct {
int32_t ttl;
int32_t commentLen;
char* comment;
int32_t sqlLen;
char* sql;
} SMAlterStbReq;
int32_t tSerializeSMAlterStbReq(void* buf, int32_t bufLen, SMAlterStbReq* pReq);
@ -869,10 +876,13 @@ int32_t tDeserializeSCreateAcctReq(void* buf, int32_t bufLen, SCreateAcctReq* pR
typedef struct {
char user[TSDB_USER_LEN];
int32_t sqlLen;
char* sql;
} SDropUserReq, SDropAcctReq;
int32_t tSerializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
int32_t tDeserializeSDropUserReq(void* buf, int32_t bufLen, SDropUserReq* pReq);
void tFreeSDropUserReq(SDropUserReq *pReq);
typedef struct {
int8_t createType;
@ -881,10 +891,13 @@ typedef struct {
int8_t enable;
char user[TSDB_USER_LEN];
char pass[TSDB_USET_PASSWORD_LEN];
int32_t sqlLen;
char* sql;
} SCreateUserReq;
int32_t tSerializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
int32_t tDeserializeSCreateUserReq(void* buf, int32_t bufLen, SCreateUserReq* pReq);
void tFreeSCreateUserReq(SCreateUserReq *pReq);
typedef struct {
int8_t alterType;
@ -897,6 +910,8 @@ typedef struct {
char tabName[TSDB_TABLE_NAME_LEN];
char* tagCond;
int32_t tagCondLen;
int32_t sqlLen;
char* sql;
} SAlterUserReq;
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
@ -1059,6 +1074,8 @@ typedef struct {
int16_t hashPrefix;
int16_t hashSuffix;
int32_t tsdbPageSize;
int32_t sqlLen;
char* sql;
} SCreateDbReq;
int32_t tSerializeSCreateDbReq(void* buf, int32_t bufLen, SCreateDbReq* pReq);
@ -1084,18 +1101,24 @@ typedef struct {
int32_t minRows;
int32_t walRetentionPeriod;
int32_t walRetentionSize;
int32_t sqlLen;
char* sql;
} SAlterDbReq;
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
int32_t tDeserializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
void tFreeSAlterDbReq(SAlterDbReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
int8_t ignoreNotExists;
int32_t sqlLen;
char* sql;
} SDropDbReq;
int32_t tSerializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
int32_t tDeserializeSDropDbReq(void* buf, int32_t bufLen, SDropDbReq* pReq);
void tFreeSDropDbReq(SDropDbReq* pReq);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
@ -1289,10 +1312,13 @@ void tFreeSUserAuthBatchRsp(SUserAuthBatchRsp* pRsp);
typedef struct {
char db[TSDB_DB_FNAME_LEN];
STimeWindow timeRange;
int32_t sqlLen;
char* sql;
} SCompactDbReq;
int32_t tSerializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
int32_t tDeserializeSCompactDbReq(void* buf, int32_t bufLen, SCompactDbReq* pReq);
void tFreeSCompactDbReq(SCompactDbReq *pReq);
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
@ -1852,10 +1878,13 @@ void tFreeSExplainRsp(SExplainRsp* pRsp);
typedef struct {
char fqdn[TSDB_FQDN_LEN]; // end point, hostname:port
int32_t port;
int32_t sqlLen;
char* sql;
} SCreateDnodeReq;
int32_t tSerializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
int32_t tDeserializeSCreateDnodeReq(void* buf, int32_t bufLen, SCreateDnodeReq* pReq);
void tFreeSCreateDnodeReq(SCreateDnodeReq* pReq);
typedef struct {
int32_t dnodeId;
@ -1863,10 +1892,13 @@ typedef struct {
int32_t port;
int8_t force;
int8_t unsafe;
int32_t sqlLen;
char* sql;
} SDropDnodeReq;
int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
int32_t tDeserializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq);
void tFreeSDropDnodeReq(SDropDnodeReq* pReq);
enum {
RESTORE_TYPE__ALL = 1,
@ -1878,19 +1910,25 @@ enum {
typedef struct {
int32_t dnodeId;
int8_t restoreType;
int32_t sqlLen;
char* sql;
} SRestoreDnodeReq;
int32_t tSerializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
int32_t tDeserializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq);
void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq);
typedef struct {
int32_t dnodeId;
char config[TSDB_DNODE_CONFIG_LEN];
char value[TSDB_DNODE_VALUE_LEN];
int32_t sqlLen;
char* sql;
} SMCfgDnodeReq;
int32_t tSerializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
int32_t tDeserializeSMCfgDnodeReq(void* buf, int32_t bufLen, SMCfgDnodeReq* pReq);
void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq);
typedef struct {
char config[TSDB_DNODE_CONFIG_LEN];
@ -1902,12 +1940,15 @@ int32_t tDeserializeSDCfgDnodeReq(void* buf, int32_t bufLen, SDCfgDnodeReq* pReq
typedef struct {
int32_t dnodeId;
int32_t sqlLen;
char* sql;
} SMCreateMnodeReq, SMDropMnodeReq, SDDropMnodeReq, SMCreateQnodeReq, SMDropQnodeReq, SDCreateQnodeReq, SDDropQnodeReq,
SMCreateSnodeReq, SMDropSnodeReq, SDCreateSnodeReq, SDDropSnodeReq;
int32_t tSerializeSCreateDropMQSNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
int32_t tDeserializeSCreateDropMQSNodeReq(void* buf, int32_t bufLen, SMCreateQnodeReq* pReq);
void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq);
void tFreeSDDropQnodeReq(SDDropQnodeReq* pReq);
typedef struct {
int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA];
@ -1942,10 +1983,13 @@ int32_t tDeserializeSKillTransReq(void* buf, int32_t bufLen, SKillTransReq* pReq
typedef struct {
int32_t useless; // useless
int32_t sqlLen;
char* sql;
} SBalanceVgroupReq;
int32_t tSerializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq);
int32_t tDeserializeSBalanceVgroupReq(void* buf, int32_t bufLen, SBalanceVgroupReq* pReq);
void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq);
typedef struct {
int32_t vgId1;
@ -1960,17 +2004,23 @@ typedef struct {
int32_t dnodeId1;
int32_t dnodeId2;
int32_t dnodeId3;
int32_t sqlLen;
char* sql;
} SRedistributeVgroupReq;
int32_t tSerializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
int32_t tDeserializeSRedistributeVgroupReq(void* buf, int32_t bufLen, SRedistributeVgroupReq* pReq);
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq);
typedef struct {
int32_t useless;
int32_t sqlLen;
char* sql;
} SBalanceVgroupLeaderReq;
int32_t tSerializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
int32_t tDeserializeSBalanceVgroupLeaderReq(void* buf, int32_t bufLen, SBalanceVgroupLeaderReq* pReq);
void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq);
typedef struct {
int32_t vgId;
@ -2446,10 +2496,13 @@ typedef struct {
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
int8_t igNotExists;
int32_t sqlLen;
char* sql;
} SMDropTopicReq;
int32_t tSerializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
int32_t tDeserializeSMDropTopicReq(void* buf, int32_t bufLen, SMDropTopicReq* pReq);
void tFreeSMDropTopicReq(SMDropTopicReq *pReq);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];
@ -2545,6 +2598,8 @@ typedef struct SVCreateTbReq {
SSchemaWrapper schemaRow;
} ntb;
};
int32_t sqlLen;
char* sql;
} SVCreateTbReq;
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
@ -2555,7 +2610,8 @@ static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
if (NULL == req) {
return;
}
taosMemoryFreeClear(req->sql);
taosMemoryFreeClear(req->name);
taosMemoryFreeClear(req->comment);
if (req->type == TSDB_CHILD_TABLE) {
@ -3019,6 +3075,8 @@ typedef struct {
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
int32_t sqlLen;
char* sql;
} SMDropStreamReq;
typedef struct {
@ -3038,6 +3096,7 @@ typedef struct {
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
void tFreeSMDropStreamReq(SMDropStreamReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];

View File

@ -29,7 +29,7 @@
extern "C" {
#endif
#define AUDIT_DETAIL_MAX 16000
#define AUDIT_DETAIL_MAX 64000
typedef struct {
const char *server;
@ -39,7 +39,8 @@ typedef struct {
int32_t auditInit(const SAuditCfg *pCfg);
void auditSend(SJson *pJson);
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail);
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
#ifdef __cplusplus
}

View File

@ -377,6 +377,7 @@ _exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
taosMemoryFreeClear(pCreateReq->sql);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}

View File

@ -30,6 +30,32 @@
#include "tlog.h"
#define DECODESQL() \
do { \
if(!tDecodeIsEnd(&decoder)){ \
if(tDecodeI32(&decoder, &pReq->sqlLen) < 0) return -1; \
if(pReq->sqlLen > 0){ \
} \
} \
} while (0)
// if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->sql, NULL) < 0) return -1; \
#define ENCODESQL() \
do { \
if (pReq->sqlLen > 0 && pReq->sql != NULL){ \
if (tEncodeI32(&encoder, pReq->sqlLen) < 0) return -1; \
if (tEncodeBinary(&encoder, pReq->sql, pReq->sqlLen) < 0) return -1; \
} \
} while (0)
#define FREESQL() \
do { \
if(pReq->sql != NULL){ \
taosMemoryFree(pReq->sql); \
} \
pReq->sql = NULL; \
} while (0)
static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq);
static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq);
@ -561,6 +587,8 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq
if (tEncodeI64(&encoder, pReq->deleteMark1) < 0) return -1;
if (tEncodeI64(&encoder, pReq->deleteMark2) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -656,6 +684,8 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR
if (tDecodeI64(&decoder, &pReq->deleteMark1) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->deleteMark2) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -668,6 +698,7 @@ void tFreeSMCreateStbReq(SMCreateStbReq *pReq) {
taosMemoryFreeClear(pReq->pComment);
taosMemoryFreeClear(pReq->pAst1);
taosMemoryFreeClear(pReq->pAst2);
FREESQL();
}
int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
@ -682,6 +713,7 @@ int32_t tSerializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq) {
if (tEncodeI8(&encoder, pReq->reserved[i]) < 0) return -1;
}
if (tEncodeI64(&encoder, pReq->suid) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -702,12 +734,18 @@ int32_t tDeserializeSMDropStbReq(void *buf, int32_t bufLen, SMDropStbReq *pReq)
}
if (tDecodeI64(&decoder, &pReq->suid) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMDropStbReq(SMDropStbReq *pReq) {
FREESQL();
}
int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -727,6 +765,7 @@ int32_t tSerializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq)
if (pReq->commentLen > 0) {
if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1;
}
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -767,6 +806,8 @@ int32_t tDeserializeSMAlterStbReq(void *buf, int32_t bufLen, SMAlterStbReq *pReq
if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1;
}
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
@ -776,6 +817,7 @@ void tFreeSMAltertbReq(SMAlterStbReq *pReq) {
taosArrayDestroy(pReq->pFields);
pReq->pFields = NULL;
taosMemoryFreeClear(pReq->comment);
FREESQL();
}
int32_t tSerializeSEpSet(void *buf, int32_t bufLen, const SEpSet *pEpset) {
@ -1350,6 +1392,7 @@ int32_t tSerializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq) {
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1363,12 +1406,17 @@ int32_t tDeserializeSDropUserReq(void *buf, int32_t bufLen, SDropUserReq *pReq)
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSDropUserReq(SDropUserReq *pReq) {
FREESQL();
}
int32_t tSerializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1380,6 +1428,7 @@ int32_t tSerializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pReq
if (tEncodeI8(&encoder, pReq->enable) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->user) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->pass) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1398,12 +1447,17 @@ int32_t tDeserializeSCreateUserReq(void *buf, int32_t bufLen, SCreateUserReq *pR
if (tDecodeI8(&decoder, &pReq->enable) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->user) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->pass) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCreateUserReq(SCreateUserReq *pReq) {
FREESQL();
}
int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1422,6 +1476,7 @@ int32_t tSerializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq)
if (tEncodeCStr(&encoder, pReq->tabName) < 0) return -1;
}
if (tEncodeBinary(&encoder, pReq->tagCond, pReq->tagCondLen) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1451,13 +1506,17 @@ int32_t tDeserializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->tagCond, &tagCondLen) < 0) return -1;
pReq->tagCondLen = tagCondLen;
}
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSAlterUserReq(SAlterUserReq *pReq) { taosMemoryFreeClear(pReq->tagCond); }
void tFreeSAlterUserReq(SAlterUserReq *pReq) {
taosMemoryFreeClear(pReq->tagCond);
FREESQL();
}
int32_t tSerializeSGetUserAuthReq(void *buf, int32_t bufLen, SGetUserAuthReq *pReq) {
SEncoder encoder = {0};
@ -1744,6 +1803,7 @@ int32_t tSerializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQnode
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1757,12 +1817,21 @@ int32_t tDeserializeSCreateDropMQSNodeReq(void *buf, int32_t bufLen, SMCreateQno
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMCreateQnodeReq(SMCreateQnodeReq *pReq){
FREESQL();
}
void tFreeSDDropQnodeReq(SDDropQnodeReq* pReq) {
FREESQL();
}
int32_t tSerializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1773,6 +1842,7 @@ int32_t tSerializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq)
if (tEncodeI32(&encoder, pReq->port) < 0) return -1;
if (tEncodeI8(&encoder, pReq->force) < 0) return -1;
if (tEncodeI8(&encoder, pReq->unsafe) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1795,12 +1865,17 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq
pReq->unsafe = false;
}
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSDropDnodeReq(SDropDnodeReq *pReq) {
FREESQL();
}
int32_t tSerializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1808,6 +1883,7 @@ int32_t tSerializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeI8(&encoder, pReq->restoreType) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1822,12 +1898,17 @@ int32_t tDeserializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->restoreType) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSRestoreDnodeReq(SRestoreDnodeReq *pReq) {
FREESQL();
}
int32_t tSerializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1836,6 +1917,7 @@ int32_t tSerializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq)
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->config) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->value) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1851,12 +1933,17 @@ int32_t tDeserializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->config) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->value) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMCfgDnodeReq(SMCfgDnodeReq *pReq) {
FREESQL();
}
int32_t tSerializeSDCfgDnodeReq(void *buf, int32_t bufLen, SDCfgDnodeReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -1891,6 +1978,7 @@ int32_t tSerializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *pR
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->fqdn) < 0) return -1;
if (tEncodeI32(&encoder, pReq->port) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -1905,12 +1993,17 @@ int32_t tDeserializeSCreateDnodeReq(void *buf, int32_t bufLen, SCreateDnodeReq *
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->fqdn) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->port) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCreateDnodeReq(SCreateDnodeReq *pReq) {
FREESQL();
}
int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -2397,6 +2490,7 @@ int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
if (tEncodeI8(&encoder, pRetension->keepUnit) < 0) return -1;
}
if (tEncodeI32(&encoder, pReq->tsdbPageSize) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -2459,6 +2553,8 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
if (tDecodeI32(&decoder, &pReq->tsdbPageSize) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -2468,6 +2564,7 @@ int32_t tDeserializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq)
void tFreeSCreateDbReq(SCreateDbReq *pReq) {
taosArrayDestroy(pReq->pRetensions);
pReq->pRetensions = NULL;
FREESQL();
}
int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
@ -2496,6 +2593,7 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
// 2nd modification
if (tEncodeI32(&encoder, pReq->walRetentionPeriod) < 0) return -1;
if (tEncodeI32(&encoder, pReq->walRetentionSize) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -2539,12 +2637,17 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
pReq->walRetentionPeriod = -1;
pReq->walRetentionSize = -1;
}
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSAlterDbReq(SAlterDbReq *pReq) {
FREESQL();
}
int32_t tSerializeSDropDbReq(void *buf, int32_t bufLen, SDropDbReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -2552,6 +2655,7 @@ int32_t tSerializeSDropDbReq(void *buf, int32_t bufLen, SDropDbReq *pReq) {
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI8(&encoder, pReq->ignoreNotExists) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -2566,12 +2670,17 @@ int32_t tDeserializeSDropDbReq(void *buf, int32_t bufLen, SDropDbReq *pReq) {
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->ignoreNotExists) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSDropDbReq(SDropDbReq *pReq) {
FREESQL();
}
int32_t tSerializeSDropDbRsp(void *buf, int32_t bufLen, SDropDbRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -2826,6 +2935,7 @@ int32_t tSerializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq)
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->timeRange.skey) < 0) return -1;
if (tEncodeI64(&encoder, pReq->timeRange.ekey) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -2841,12 +2951,17 @@ int32_t tDeserializeSCompactDbReq(void *buf, int32_t bufLen, SCompactDbReq *pReq
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->timeRange.skey) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->timeRange.ekey) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSCompactDbReq(SCompactDbReq *pReq) {
FREESQL();
}
int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) {
if (tEncodeCStr(pEncoder, pRsp->db) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1;
@ -3991,6 +4106,7 @@ int32_t tSerializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pReq
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -4005,12 +4121,17 @@ int32_t tDeserializeSMDropTopicReq(void *buf, int32_t bufLen, SMDropTopicReq *pR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMDropTopicReq(SMDropTopicReq *pReq) {
FREESQL();
}
int32_t tSerializeSMDropCgroupReq(void *buf, int32_t bufLen, SMDropCgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -4884,6 +5005,7 @@ int32_t tSerializeSBalanceVgroupReq(void *buf, int32_t bufLen, SBalanceVgroupReq
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->useless) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -4897,18 +5019,24 @@ int32_t tDeserializeSBalanceVgroupReq(void *buf, int32_t bufLen, SBalanceVgroupR
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSBalanceVgroupReq(SBalanceVgroupReq *pReq) {
FREESQL();
}
int32_t tSerializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceVgroupLeaderReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->useless) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -4922,12 +5050,17 @@ int32_t tDeserializeSBalanceVgroupLeaderReq(void *buf, int32_t bufLen, SBalanceV
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->useless) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSBalanceVgroupLeaderReq(SBalanceVgroupLeaderReq *pReq) {
FREESQL();
}
int32_t tSerializeSMergeVgroupReq(void *buf, int32_t bufLen, SMergeVgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -4964,6 +5097,7 @@ int32_t tSerializeSRedistributeVgroupReq(void *buf, int32_t bufLen, SRedistribut
if (tEncodeI32(&encoder, pReq->dnodeId1) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId2) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId3) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -4980,12 +5114,17 @@ int32_t tDeserializeSRedistributeVgroupReq(void *buf, int32_t bufLen, SRedistrib
if (tDecodeI32(&decoder, &pReq->dnodeId1) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId2) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId3) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSRedistributeVgroupReq(SRedistributeVgroupReq *pReq) {
FREESQL();
}
int32_t tSerializeSSplitVgroupReq(void *buf, int32_t bufLen, SSplitVgroupReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -6539,6 +6678,8 @@ int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamR
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
ENCODESQL();
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -6554,12 +6695,18 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
DECODESQL();
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSMDropStreamReq(SMDropStreamReq *pReq) {
FREESQL();
}
int32_t tSerializeSMRecoverStreamReq(void *buf, int32_t bufLen, const SMRecoverStreamReq *pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
@ -6701,6 +6848,11 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
} else {
ASSERT(0);
}
//ENCODESQL
if(pReq->sqlLen > 0 && pReq->sql != NULL) {
if (tEncodeI32(pCoder, pReq->sqlLen) < 0) return -1;
if (tEncodeBinary(pCoder, pReq->sql, pReq->sqlLen) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
@ -6744,6 +6896,14 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
ASSERT(0);
}
//DECODESQL
if(!tDecodeIsEnd(pCoder)){
if(tDecodeI32(pCoder, &pReq->sqlLen) < 0) return -1;
if(pReq->sqlLen > 0){
if (tDecodeBinaryAlloc(pCoder, (void**)&pReq->sql, NULL) < 0) return -1;
}
}
tEndDecode(pCoder);
return 0;
}
@ -6765,6 +6925,11 @@ void tDestroySVCreateTbReq(SVCreateTbReq *pReq, int32_t flags) {
if (pReq->ntb.schemaRow.pSchema) taosMemoryFree(pReq->ntb.schemaRow.pSchema);
}
}
if(pReq->sql != NULL){
taosMemoryFree(pReq->sql);
}
pReq->sql = NULL;
}
int tEncodeSVCreateTbBatchReq(SEncoder *pCoder, const SVCreateTbBatchReq *pReq) {

View File

@ -80,15 +80,18 @@ int32_t mmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dGError("failed to drop mnode since %s", terrstr());
tFreeSMCreateQnodeReq(&dropReq);
return -1;
}
SMnodeOpt option = {.deploy = false};
if (mmWriteFile(pInput->path, &option) != 0) {
dGError("failed to write mnode file since %s", terrstr());
tFreeSMCreateQnodeReq(&dropReq);
return -1;
}
tFreeSMCreateQnodeReq(&dropReq);
return 0;
}

View File

@ -39,15 +39,18 @@ int32_t qmProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create qnode since %s", terrstr());
tFreeSMCreateQnodeReq(&createReq);
return -1;
}
bool deployed = true;
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr());
tFreeSMCreateQnodeReq(&createReq);
return -1;
}
tFreeSMCreateQnodeReq(&createReq);
return 0;
}
@ -61,15 +64,18 @@ int32_t qmProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop qnode since %s", terrstr());
tFreeSMCreateQnodeReq(&dropReq);
return -1;
}
bool deployed = false;
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write qnode file since %s", terrstr());
tFreeSMCreateQnodeReq(&dropReq);
return -1;
}
tFreeSMCreateQnodeReq(&dropReq);
return 0;
}

View File

@ -28,15 +28,18 @@ int32_t smProcessCreateReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
if (pInput->pData->dnodeId != 0 && createReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to create snode since %s", terrstr());
tFreeSMCreateQnodeReq(&createReq);
return -1;
}
bool deployed = true;
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write snode file since %s", terrstr());
tFreeSMCreateQnodeReq(&createReq);
return -1;
}
tFreeSMCreateQnodeReq(&createReq);
return 0;
}
@ -50,15 +53,18 @@ int32_t smProcessDropReq(const SMgmtInputOpt *pInput, SRpcMsg *pMsg) {
if (pInput->pData->dnodeId != 0 && dropReq.dnodeId != pInput->pData->dnodeId) {
terrno = TSDB_CODE_INVALID_OPTION;
dError("failed to drop snode since %s", terrstr());
tFreeSMCreateQnodeReq(&dropReq);
return -1;
}
bool deployed = false;
if (dmWriteFile(pInput->path, pInput->name, deployed) != 0) {
dError("failed to write snode file since %s", terrstr());
tFreeSMCreateQnodeReq(&dropReq);
return -1;
}
tFreeSMCreateQnodeReq(&dropReq);
return 0;
}

View File

@ -751,45 +751,11 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
code = mndCreateDb(pMnode, pReq, &createReq, pUser);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[3000] = {0};
char tmp[100] = {0};
mndBuildAuditDetailInt32(detail, tmp, "buffer:%d", createReq.buffer);
mndBuildAuditDetailInt32(detail, tmp, "cacheLast:%d", createReq.cacheLast);
mndBuildAuditDetailInt32(detail, tmp, "cacheLastSize:%d", createReq.cacheLastSize);
mndBuildAuditDetailInt32(detail, tmp, "compression:%d", createReq.compression);
mndBuildAuditDetailInt32(detail, tmp, "daysPerFile:%d", createReq.daysPerFile);
mndBuildAuditDetailInt32(detail, tmp, "daysToKeep0:%d", createReq.daysToKeep0);
mndBuildAuditDetailInt32(detail, tmp, "daysToKeep1:%d", createReq.daysToKeep1);
mndBuildAuditDetailInt32(detail, tmp, "daysToKeep2:%d", createReq.daysToKeep2);
mndBuildAuditDetailInt32(detail, tmp, "hashPrefix:%d", createReq.hashPrefix);
mndBuildAuditDetailInt32(detail, tmp, "hashSuffix:%d", createReq.hashSuffix);
mndBuildAuditDetailInt32(detail, tmp, "ignoreExist:%d", createReq.ignoreExist);
mndBuildAuditDetailInt32(detail, tmp, "maxRows:%d", createReq.maxRows);
mndBuildAuditDetailInt32(detail, tmp, "minRows:%d", createReq.minRows);
mndBuildAuditDetailInt32(detail, tmp, "numOfRetensions:%d", createReq.numOfRetensions);
mndBuildAuditDetailInt32(detail, tmp, "numOfStables:%d", createReq.numOfStables);
mndBuildAuditDetailInt32(detail, tmp, "numOfVgroups:%d", createReq.numOfVgroups);
mndBuildAuditDetailInt32(detail, tmp, "pages:%d", createReq.pages);
mndBuildAuditDetailInt32(detail, tmp, "pageSize:%d", createReq.pageSize);
mndBuildAuditDetailInt32(detail, tmp, "precision:%d", createReq.precision);
mndBuildAuditDetailInt32(detail, tmp, "replications:%d", createReq.replications);
mndBuildAuditDetailInt32(detail, tmp, "schemaless:%d", createReq.schemaless);
mndBuildAuditDetailInt32(detail, tmp, "sstTrigger:%d", createReq.sstTrigger);
mndBuildAuditDetailInt32(detail, tmp, "strict:%d", createReq.strict);
mndBuildAuditDetailInt32(detail, tmp, "tsdbPageSize:%d", createReq.tsdbPageSize);
mndBuildAuditDetailInt32(detail, tmp, "walFsyncPeriod:%d", createReq.walFsyncPeriod);
mndBuildAuditDetailInt32(detail, tmp, "walLevel:%d", createReq.walLevel);
mndBuildAuditDetailInt32(detail, tmp, "walRetentionPeriod:%d", createReq.walRetentionPeriod);
mndBuildAuditDetailInt32(detail, tmp, "walRetentionSize:%" PRId64, createReq.walRetentionSize);
mndBuildAuditDetailInt32(detail, tmp, "walRollPeriod:%d", createReq.walRollPeriod);
mndBuildAuditDetailInt32(detail, tmp, "walSegmentSize:%" PRId64, createReq.walSegmentSize);
SName name = {0};
tNameFromString(&name, createReq.db, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "createDB", name.dbname, "", detail);
auditRecord(pReq, pMnode->clusterId, "createDB", name.dbname, "", createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1033,29 +999,10 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
}
char detail[3000] = {0};
char tmp[100] = {0};
mndBuildAuditDetailInt32(detail, tmp, "buffer:%d", alterReq.buffer);
mndBuildAuditDetailInt32(detail, tmp, "cacheLast:%d", alterReq.cacheLast);
mndBuildAuditDetailInt32(detail, tmp, "cacheLastSize:%d", alterReq.cacheLastSize);
mndBuildAuditDetailInt32(detail, tmp, "daysPerFile:%d", alterReq.daysPerFile);
mndBuildAuditDetailInt32(detail, tmp, "daysToKeep0:%d", alterReq.daysToKeep0);
mndBuildAuditDetailInt32(detail, tmp, "daysToKeep1:%d", alterReq.daysToKeep1);
mndBuildAuditDetailInt32(detail, tmp, "daysToKeep2:%d", alterReq.daysToKeep2);
mndBuildAuditDetailInt32(detail, tmp, "minRows:%d", alterReq.minRows);
mndBuildAuditDetailInt32(detail, tmp, "pages:%d", alterReq.pages);
mndBuildAuditDetailInt32(detail, tmp, "pageSize:%d", alterReq.pageSize);
mndBuildAuditDetailInt32(detail, tmp, "replications:%d", alterReq.replications);
mndBuildAuditDetailInt32(detail, tmp, "sstTrigger:%d", alterReq.sstTrigger);
mndBuildAuditDetailInt32(detail, tmp, "strict:%d", alterReq.strict);
mndBuildAuditDetailInt32(detail, tmp, "walFsyncPeriod:%d", alterReq.walFsyncPeriod);
mndBuildAuditDetailInt32(detail, tmp, "walRetentionSize:%d", alterReq.walRetentionSize);
SName name = {0};
tNameFromString(&name, alterReq.db, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "alterDB", name.dbname, "", detail);
auditRecord(pReq, pMnode->clusterId, "alterDB", name.dbname, "", alterReq.sql, alterReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1065,6 +1012,7 @@ _OVER:
mndReleaseDb(pMnode, pDb);
taosArrayDestroy(dbObj.cfg.pRetensions);
tFreeSAlterDbReq(&alterReq);
terrno = code;
return code;
@ -1346,13 +1294,10 @@ static int32_t mndProcessDropDbReq(SRpcMsg *pReq) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
char detail[1000] = {0};
sprintf(detail, "ignoreNotExists:%d", dropReq.ignoreNotExists);
SName name = {0};
tNameFromString(&name, dropReq.db, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "dropDB", name.dbname, "", detail);
auditRecord(pReq, pMnode->clusterId, "dropDB", name.dbname, "", dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1360,6 +1305,7 @@ _OVER:
}
mndReleaseDb(pMnode, pDb);
tFreeSDropDbReq(&dropReq);
return code;
}

View File

@ -913,7 +913,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
char obj[200] = {0};
sprintf(obj, "%s:%d", createReq.fqdn, createReq.port);
auditRecord(pReq, pMnode->clusterId, "createDnode", obj, "", "");
auditRecord(pReq, pMnode->clusterId, "createDnode", obj, "", createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -921,6 +921,7 @@ _OVER:
}
mndReleaseDnode(pMnode, pDnode);
tFreeSCreateDnodeReq(&createReq);
return code;
}
@ -1065,13 +1066,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
char obj1[30] = {0};
sprintf(obj1, "%d", dropReq.dnodeId);
//char obj2[150] = {0};
//sprintf(obj2, "%s:%d", dropReq.fqdn, dropReq.port);
char detail[100] = {0};
sprintf(detail, "force:%d, unsafe:%d", dropReq.force, dropReq.unsafe);
auditRecord(pReq, pMnode->clusterId, "dropDnode", obj1, "", detail);
auditRecord(pReq, pMnode->clusterId, "dropDnode", obj1, "", dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1082,6 +1077,7 @@ _OVER:
mndReleaseMnode(pMnode, pMObj);
mndReleaseQnode(pMnode, pQObj);
mndReleaseSnode(pMnode, pSObj);
tFreeSDropDnodeReq(&dropReq);
return code;
}
@ -1102,6 +1098,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
mInfo("dnode:%d, start to config, option:%s, value:%s", cfgReq.dnodeId, cfgReq.config, cfgReq.value);
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_DNODE) != 0) {
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1112,6 +1109,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (' ' != cfgReq.config[7] && 0 != cfgReq.config[7]) {
mError("dnode:%d, failed to config monitor since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1123,6 +1121,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (flag < 0 || flag > 2) {
mError("dnode:%d, failed to config monitor since value:%d", cfgReq.dnodeId, flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1137,6 +1136,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (flag < 0 || flag > 23) {
mError("dnode:%d, failed to config keepTimeOffset since value:%d. Valid range: [0, 23]", cfgReq.dnodeId, flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1152,6 +1152,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
mError("dnode:%d, failed to config ttlPushInterval since value:%d. Valid range: [0, 100000]", cfgReq.dnodeId,
flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1167,6 +1168,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
mError("dnode:%d, failed to config ttlBatchDropNum since value:%d. Valid range: [0, %d]", cfgReq.dnodeId,
flag, INT32_MAX);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1182,6 +1184,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (flag < 0 || flag > 4096) {
mError("dnode:%d, failed to config supportVnodes since value:%d. Valid range: [0, 4096]", cfgReq.dnodeId, flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
if (flag == 0) {
@ -1197,6 +1200,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (' ' != cfgReq.config[index] && 0 != cfgReq.config[index]) {
mError("dnode:%d, failed to config activeCode since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
int32_t vlen = strlen(cfgReq.value);
@ -1206,6 +1210,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
mError("dnode:%d, failed to config activeCode since invalid vlen:%d. conf:%s, val:%s", cfgReq.dnodeId, vlen,
cfgReq.config, cfgReq.value);
terrno = TSDB_CODE_INVALID_OPTION;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1214,8 +1219,10 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (mndConfigDnode(pMnode, pReq, &cfgReq, opt) != 0) {
mError("dnode:%d, failed to config activeCode since %s", cfgReq.dnodeId, terrstr());
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
tFreeSMCfgDnodeReq(&cfgReq);
return 0;
#endif
} else {
@ -1228,6 +1235,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (' ' != cfgReq.config[optLen] && 0 != cfgReq.config[optLen]) {
mError("dnode:%d, failed to config since invalid conf:%s", cfgReq.dnodeId, cfgReq.config);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1239,6 +1247,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (flag < 0 || flag > 255) {
mError("dnode:%d, failed to config %s since value:%d", cfgReq.dnodeId, optName, flag);
terrno = TSDB_CODE_INVALID_CFG;
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
@ -1250,6 +1259,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
if (!findOpt) {
terrno = TSDB_CODE_INVALID_CFG;
mError("dnode:%d, failed to config since %s", cfgReq.dnodeId, terrstr());
tFreeSMCfgDnodeReq(&cfgReq);
return -1;
}
}
@ -1257,10 +1267,9 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
char obj[50] = {0};
sprintf(obj, "%d", cfgReq.dnodeId);
char detail[500] = {0};
sprintf(detail, "config:%s, value:%s", cfgReq.config, cfgReq.value);
auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", detail);
tFreeSMCfgDnodeReq(&cfgReq);
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;

View File

@ -656,7 +656,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
char obj[40] = {0};
sprintf(obj, "%d", createReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "createMnode", obj, "", "");
auditRecord(pReq, pMnode->clusterId, "createMnode", obj, "", createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -665,6 +665,7 @@ _OVER:
mndReleaseMnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode);
tFreeSMCreateQnodeReq(&createReq);
return code;
}
@ -797,7 +798,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
char obj[40] = {0};
sprintf(obj, "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropMnode", obj, "", "");
auditRecord(pReq, pMnode->clusterId, "dropMnode", obj, "", dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -805,6 +806,7 @@ _OVER:
}
mndReleaseMnode(pMnode, pObj);
tFreeSMCreateQnodeReq(&dropReq);
return code;
}

View File

@ -316,7 +316,7 @@ _CONNECT:
sprintf(detail, "connType:%d, db:%s, pid:%d, startTime:%" PRId64 ", sVer:%s, app:%s",
connReq.connType, connReq.db, connReq.pid, connReq.startTime, connReq.sVer, connReq.app);
auditRecord(pReq, pMnode->clusterId, "login", connReq.user, obj, detail);
auditRecord(pReq, pMnode->clusterId, "login", connReq.user, obj, detail, strlen(detail));
_OVER:

View File

@ -310,7 +310,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
char obj[33] = {0};
sprintf(obj, "%d", createReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "createQnode", obj, "", "");
auditRecord(pReq, pMnode->clusterId, "createQnode", obj, "", createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
@ -318,6 +318,7 @@ _OVER:
mndReleaseQnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode);
tFreeSMCreateQnodeReq(&createReq);
return code;
}
@ -423,7 +424,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
char obj[33] = {0};
sprintf(obj, "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropQnode", obj, "", "");
auditRecord(pReq, pMnode->clusterId, "dropQnode", obj, "", dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -431,6 +432,7 @@ _OVER:
}
mndReleaseQnode(pMnode, pObj);
tFreeSMCreateQnodeReq(&dropReq);
return code;
}

View File

@ -316,6 +316,7 @@ _OVER:
mndReleaseSnode(pMnode, pObj);
mndReleaseDnode(pMnode, pDnode);
tFreeSMCreateQnodeReq(&createReq);
return code;
}
@ -425,6 +426,7 @@ _OVER:
}
mndReleaseSnode(pMnode, pObj);
tFreeSMCreateQnodeReq(&dropReq);
return code;
}

View File

@ -1081,80 +1081,6 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq
return TSDB_CODE_SUCCESS;
}
static char *mndAuditFieldTypeStr(int32_t type) {
switch (type) {
case TSDB_DATA_TYPE_NULL:
return "null";
case TSDB_DATA_TYPE_BOOL:
return "bool";
case TSDB_DATA_TYPE_TINYINT:
return "tinyint";
case TSDB_DATA_TYPE_SMALLINT:
return "smallint";
case TSDB_DATA_TYPE_INT:
return "int";
case TSDB_DATA_TYPE_BIGINT:
return "bigint";
case TSDB_DATA_TYPE_FLOAT:
return "float";
case TSDB_DATA_TYPE_DOUBLE:
return "double";
case TSDB_DATA_TYPE_VARCHAR:
return "varchar";
case TSDB_DATA_TYPE_TIMESTAMP:
return "timestamp";
case TSDB_DATA_TYPE_NCHAR:
return "nchar";
case TSDB_DATA_TYPE_UTINYINT:
return "utinyint";
case TSDB_DATA_TYPE_USMALLINT:
return "usmallint";
case TSDB_DATA_TYPE_UINT:
return "uint";
case TSDB_DATA_TYPE_UBIGINT:
return "ubigint";
case TSDB_DATA_TYPE_JSON:
return "json";
case TSDB_DATA_TYPE_VARBINARY:
return "varbinary";
case TSDB_DATA_TYPE_DECIMAL:
return "decimal";
case TSDB_DATA_TYPE_BLOB:
return "blob";
case TSDB_DATA_TYPE_MEDIUMBLOB:
return "mediumblob";
case TSDB_DATA_TYPE_GEOMETRY:
return "geometry";
default:
return "error";
}
}
static void mndAuditFieldStr(char *detail, SArray *arr, int32_t len, int32_t max) {
int32_t detialLen = strlen(detail);
int32_t fieldLen = 0;
for (int32_t i = 0; i < len; ++i) {
SField *pField = taosArrayGet(arr, i);
char field[TSDB_COL_NAME_LEN + 20] = {0};
fieldLen = strlen(", ");
if (detialLen > 0 && detialLen < max - fieldLen - 1) {
strcat(detail, ", ");
detialLen += fieldLen;
} else {
break;
}
sprintf(field, "%s:%s", pField->name, mndAuditFieldTypeStr(pField->type));
fieldLen = strlen(field);
if (detialLen < max - fieldLen - 1) {
strcat(detail, field);
detialLen += fieldLen;
} else {
break;
}
}
}
static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
int32_t code = -1;
@ -1263,26 +1189,10 @@ static int32_t mndProcessCreateStbReq(SRpcMsg *pReq) {
}
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[AUDIT_DETAIL_MAX] = {0};
sprintf(detail,
"colVer:%d, delay1:%" PRId64 ", delay2:%" PRId64 ", deleteMark1:%" PRId64
", "
"deleteMark2:%" PRId64
", igExists:%d, numOfColumns:%d, numOfFuncs:%d, numOfTags:%d, "
"source:%d, suid:%" PRId64
", tagVer:%d, ttl:%d, "
"watermark1:%" PRId64 ", watermark2:%" PRId64,
createReq.colVer, createReq.delay1, createReq.delay2, createReq.deleteMark1, createReq.deleteMark2,
createReq.igExists, createReq.numOfColumns, createReq.numOfFuncs, createReq.numOfTags, createReq.source,
createReq.suid, createReq.tagVer, createReq.ttl, createReq.watermark1, createReq.watermark2);
mndAuditFieldStr(detail, createReq.pColumns, createReq.numOfColumns, AUDIT_DETAIL_MAX);
mndAuditFieldStr(detail, createReq.pTags, createReq.numOfTags, AUDIT_DETAIL_MAX);
SName name = {0};
tNameFromString(&name, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, detail);
auditRecord(pReq, pMnode->clusterId, "createStb", name.dbname, name.tname, createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -2352,13 +2262,10 @@ static int32_t mndProcessAlterStbReq(SRpcMsg *pReq) {
code = mndAlterStb(pMnode, pReq, &alterReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[2000] = {0};
sprintf(detail, "alterType:%d, numOfFields:%d, ttl:%d", alterReq.alterType, alterReq.numOfFields, alterReq.ttl);
SName name = {0};
tNameFromString(&name, alterReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, name.tname, detail);
auditRecord(pReq, pMnode->clusterId, "alterStb", name.dbname, alterReq.name, alterReq.sql, alterReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -2622,13 +2529,10 @@ static int32_t mndProcessDropStbReq(SRpcMsg *pReq) {
code = mndDropStb(pMnode, pReq, pDb, pStb);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[2000] = {0};
sprintf(detail, "igNotExists:%d, source:%d", dropReq.igNotExists, dropReq.source);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, name.tname, detail);
auditRecord(pReq, pMnode->clusterId, "dropStb", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -2637,6 +2541,7 @@ _OVER:
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
tFreeSMDropStbReq(&dropReq);
return code;
}

View File

@ -863,22 +863,15 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[2000] = {0};
sprintf(detail,
"checkpointFreq:%" PRId64 ", createStb:%d, deleteMark:%" PRId64
", fillHistory:%d, igExists:%d, igExpired:%d, igUpdate:%d, lastTs:%" PRId64 ", maxDelay:%" PRId64
", numOfTags:%d, sourceDB:%s, targetStbFullName:%s, triggerType:%d, watermark:%" PRId64,
createStreamReq.checkpointFreq, createStreamReq.createStb, createStreamReq.deleteMark,
createStreamReq.fillHistory, createStreamReq.igExists, createStreamReq.igExpired, createStreamReq.igUpdate,
createStreamReq.lastTs, createStreamReq.maxDelay, createStreamReq.numOfTags, createStreamReq.sourceDB,
createStreamReq.targetStbFullName, createStreamReq.triggerType, createStreamReq.watermark);
SName name = {0};
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, "", detail);
//TODO
if (createStreamReq.sql != NULL) {
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, "",
createStreamReq.sql, strlen(createStreamReq.sql));
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
@ -1261,15 +1254,18 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (dropReq.igNotExists) {
mInfo("stream:%s, not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
tFreeSMDropStreamReq(&dropReq);
return -1;
}
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
return -1;
}
@ -1277,6 +1273,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
return -1;
}
@ -1286,6 +1283,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
return -1;
}
@ -1294,6 +1292,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
return -1;
}
@ -1301,6 +1300,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
return -1;
}
@ -1308,20 +1308,21 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
return -1;
}
removeStreamTasksInBuf(pStream, &execNodeList);
char detail[100] = {0};
sprintf(detail, "igNotExists:%d", dropReq.igNotExists);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail);
//reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", dropReq.sql, dropReq.sqlLen);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
return TSDB_CODE_ACTION_IN_PROGRESS;
}

View File

@ -635,16 +635,6 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
}
char detail[4000] = {0};
char sql[3000] = {0};
strncpy(sql, createTopicReq.sql, 2999);
SName tableName = {0};
tNameFromString(&tableName, createTopicReq.subStbName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(detail, "igExists:%d, subStbName:%s, subType:%d, withMeta:%d, sql:%s",
createTopicReq.igExists, tableName.tname, createTopicReq.subType, createTopicReq.withMeta, sql);
SName dbname = {0};
tNameFromString(&dbname, createTopicReq.subDbName, T_NAME_ACCT | T_NAME_DB);
@ -652,7 +642,8 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for topic
auditRecord(pReq, pMnode->clusterId, "createTopic", topicName.dbname, dbname.dbname, detail);
auditRecord(pReq, pMnode->clusterId, "createTopic", topicName.dbname, dbname.dbname,
createTopicReq.sql, strlen(createTopicReq.sql));
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -703,10 +694,12 @@ static int32_t mndProcessDropTopicReq(SRpcMsg *pReq) {
if (pTopic == NULL) {
if (dropReq.igNotExists) {
mInfo("topic:%s, not exist, ignore not exist is set", dropReq.name);
tFreeSMDropTopicReq(&dropReq);
return 0;
} else {
terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST;
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
tFreeSMDropTopicReq(&dropReq);
return -1;
}
}
@ -847,17 +840,17 @@ end:
mndTransDrop(pTrans);
if (code != 0) {
mError("topic:%s, failed to drop since %s", dropReq.name, terrstr());
tFreeSMDropTopicReq(&dropReq);
return code;
}
char detail[100] = {0};
sprintf(detail, "igNotExists:%d", dropReq.igNotExists);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for topic
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, "", detail);
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, "", dropReq.sql, dropReq.sqlLen);
tFreeSMDropTopicReq(&dropReq);
return TSDB_CODE_ACTION_IN_PROGRESS;
}

View File

@ -656,11 +656,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[1000] = {0};
sprintf(detail, "createType:%d, enable:%d, superUser:%d, sysInfo:%d",
createReq.createType, createReq.enable, createReq.superUser, createReq.sysInfo);
auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", detail);
auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -669,6 +665,7 @@ _OVER:
mndReleaseUser(pMnode, pUser);
mndReleaseUser(pMnode, pOperUser);
tFreeSCreateUserReq(&createReq);
return code;
}
@ -1038,20 +1035,17 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
code = mndAlterUser(pMnode, pUser, &newUser, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
char detail[1000] = {0};
sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:",
mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo, alterReq.tabName);
if(alterReq.alterType == TSDB_ALTER_USER_PASSWD){
char detail[1000] = {0};
sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:xxx",
mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo,
alterReq.tabName);
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail);
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail, strlen(detail));
}
else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER ||
alterReq.alterType == TSDB_ALTER_USER_ENABLE ||
alterReq.alterType == TSDB_ALTER_USER_SYSINFO){
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail);
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", alterReq.sql, alterReq.sqlLen);
}
else if(alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB||
alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB||
@ -1062,24 +1056,30 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
if (strcmp(alterReq.objname, "1.*") != 0){
SName name = {0};
tNameFromString(&name, alterReq.objname, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, name.dbname, detail);
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, name.dbname,
alterReq.sql, alterReq.sqlLen);
}else{
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, "*", detail);
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, "*",
alterReq.sql, alterReq.sqlLen);
}
}
else if(alterReq.alterType == TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC){
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, alterReq.objname, detail);
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, alterReq.objname,
alterReq.sql, alterReq.sqlLen);
}
else if(alterReq.alterType == TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC){
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, alterReq.objname, detail);
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, alterReq.objname,
alterReq.sql, alterReq.sqlLen);
}
else{
if (strcmp(alterReq.objname, "1.*") != 0){
SName name = {0};
tNameFromString(&name, alterReq.objname, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, name.dbname, detail);
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, name.dbname,
alterReq.sql, alterReq.sqlLen);
}else{
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, "*", detail);
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, "*",
alterReq.sql, alterReq.sqlLen);
}
}
@ -1152,7 +1152,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
code = mndDropUser(pMnode, pReq, pUser);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
auditRecord(pReq, pMnode->clusterId, "dropUser", dropReq.user, "", "");
auditRecord(pReq, pMnode->clusterId, "dropUser", dropReq.user, "", dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1160,6 +1160,7 @@ _OVER:
}
mndReleaseUser(pMnode, pUser);
tFreeSDropUserReq(&dropReq);
return code;
}

View File

@ -2175,11 +2175,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
char obj[33] = {0};
sprintf(obj, "%d", req.vgId);
char detail[1000] = {0};
sprintf(detail, "dnodeId1:%d, dnodeId2:%d, dnodeId3:%d",
req.dnodeId1, req.dnodeId2, req.dnodeId3);
auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", obj, "", detail);
auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", obj, "", req.sql, req.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -2195,6 +2191,7 @@ _OVER:
mndReleaseDnode(pMnode, pOld3);
mndReleaseVgroup(pMnode, pVgroup);
mndReleaseDb(pMnode, pDb);
tFreeSRedistributeVgroupReq(&req);
return code;
}
@ -2991,7 +2988,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
code = mndBalanceVgroup(pMnode, pReq, pArray);
}
auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", "");
auditRecord(pReq, pMnode->clusterId, "balanceVgroup", "", "", req.sql, req.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -2999,6 +2996,7 @@ _OVER:
}
taosArrayDestroy(pArray);
tFreeSBalanceVgroupReq(&req);
return code;
}

View File

@ -102,6 +102,7 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) {
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
taosMemoryFreeClear(pCreateReq->sql);
if (pCreateReq->type == TSDB_CHILD_TABLE) {
taosArrayDestroy(pCreateReq->ctb.tagName);
}

View File

@ -942,16 +942,14 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
taosArrayPush(rsp.pArray, &cRsp);
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
if(pCreateReq->sqlLen > 0){ //skip auto create table, not set sql when auto create table
int32_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
char detail[1000] = {0};
sprintf(detail, "btime:%" PRId64 ", flags:%d, ttl:%d, type:%d",
pCreateReq->btime, pCreateReq->flags, pCreateReq->ttl, pCreateReq->type);
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, clusterId, "createTable", name.dbname, pCreateReq->name, detail);
auditRecord(pReq, clusterId, "createTable", name.dbname, pCreateReq->name, pCreateReq->sql, pCreateReq->sqlLen);
}
}
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
@ -976,6 +974,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
_exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFree(pCreateReq->sql);
taosMemoryFree(pCreateReq->comment);
taosArrayDestroy(pCreateReq->ctb.tagName);
}

View File

@ -30,14 +30,17 @@ int32_t auditInit(const SAuditCfg *pCfg) {
return 0;
}
extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail);
extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len);
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail) {
auditRecordImp(pReq, clusterId, operation, target1, target2, detail);
void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
//auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len);
auditRecordImp(pReq, clusterId, operation, target1, target2, detail, 0);
}
#ifndef TD_ENTERPRISE
void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail) {
void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2,
char *detail, int32_t len) {
}
#endif

View File

@ -4674,9 +4674,133 @@ static int32_t checkCreateDatabase(STranslateContext* pCxt, SCreateDatabaseStmt*
return checkDatabaseOptions(pCxt, pStmt->dbName, pStmt->pOptions);
}
#define FILL_CMD_SQL(sql, sqlLen, pCmdReq, CMD_TYPE, genericCmd) \
CMD_TYPE* pCmdReq = genericCmd; \
char* cmdSql = taosMemoryMalloc(sqlLen); \
if (cmdSql == NULL) { \
return TSDB_CODE_OUT_OF_MEMORY; \
} \
memcpy(cmdSql, sql, sqlLen); \
pCmdReq->sqlLen = sqlLen; \
pCmdReq->sql = cmdSql; \
static int32_t fillCmdSql(STranslateContext* pCxt, int16_t msgType, void* pReq) {
const char* sql = pCxt->pParseCxt->pSql;
size_t sqlLen = pCxt->pParseCxt->sqlLen;
switch (msgType) {
case TDMT_MND_CREATE_DB: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SCreateDbReq, pReq);
break;
}
case TDMT_MND_ALTER_DB: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SAlterDbReq, pReq);
break;
}
case TDMT_MND_DROP_DB: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SDropDbReq, pReq);
break;
}
case TDMT_MND_COMPACT_DB: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SCompactDbReq, pReq);
break;
}
case TDMT_MND_TMQ_DROP_TOPIC: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropTopicReq, pReq);
break;
}
case TDMT_MND_BALANCE_VGROUP_LEADER: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SBalanceVgroupLeaderReq, pReq);
break;
}
case TDMT_MND_BALANCE_VGROUP: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SBalanceVgroupReq, pReq);
break;
}
case TDMT_MND_REDISTRIBUTE_VGROUP: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SRedistributeVgroupReq, pReq);
break;
}
case TDMT_MND_CREATE_STB: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCreateStbReq, pReq);
break;
}
case TDMT_MND_DROP_STB: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropStbReq, pReq);
break;
}
case TDMT_MND_ALTER_STB: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMAlterStbReq, pReq);
break;
}
case TDMT_MND_DROP_USER: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SDropUserReq, pReq);
break;
}
case TDMT_MND_CREATE_USER: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SCreateUserReq, pReq);
break;
}
case TDMT_MND_ALTER_USER: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SAlterUserReq, pReq);
break;
}
case TDMT_MND_CREATE_QNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCreateQnodeReq, pReq);
break;
}
case TDMT_MND_DROP_QNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropQnodeReq, pReq);
break;
}
case TDMT_MND_CREATE_MNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCreateMnodeReq, pReq);
break;
}
case TDMT_MND_DROP_MNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropMnodeReq, pReq);
break;
}
case TDMT_MND_CREATE_DNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SCreateDnodeReq, pReq);
break;
}
case TDMT_MND_DROP_DNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SDropDnodeReq, pReq);
break;
}
case TDMT_MND_RESTORE_DNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SRestoreDnodeReq, pReq);
break;
}
case TDMT_MND_CONFIG_DNODE: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMCfgDnodeReq, pReq);
break;
}
case TDMT_MND_DROP_STREAM: {
FILL_CMD_SQL(sql, sqlLen, pCmdReq, SMDropStreamReq, pReq);
break;
}
default: {
break;
}
}
return TSDB_CODE_SUCCESS;
}
typedef int32_t (*FSerializeFunc)(void* pBuf, int32_t bufLen, void* pReq);
static int32_t buildCmdMsg(STranslateContext* pCxt, int16_t msgType, FSerializeFunc func, void* pReq) {
fillCmdSql(pCxt, msgType, pReq);
pCxt->pCmdMsg = taosMemoryMalloc(sizeof(SCmdMsgInfo));
if (NULL == pCxt->pCmdMsg) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -4713,7 +4837,9 @@ static int32_t translateDropDatabase(STranslateContext* pCxt, SDropDatabaseStmt*
tNameGetFullDbName(&name, dropReq.db);
dropReq.ignoreNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_DROP_DB, (FSerializeFunc)tSerializeSDropDbReq, &dropReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_DB, (FSerializeFunc)tSerializeSDropDbReq, &dropReq);
tFreeSDropDbReq(&dropReq);
return code;
}
static void buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStmt, SAlterDbReq* pReq) {
@ -4749,7 +4875,9 @@ static int32_t translateAlterDatabase(STranslateContext* pCxt, SAlterDatabaseStm
SAlterDbReq alterReq = {0};
buildAlterDbReq(pCxt, pStmt, &alterReq);
return buildCmdMsg(pCxt, TDMT_MND_ALTER_DB, (FSerializeFunc)tSerializeSAlterDbReq, &alterReq);
code = buildCmdMsg(pCxt, TDMT_MND_ALTER_DB, (FSerializeFunc)tSerializeSAlterDbReq, &alterReq);
tFreeSAlterDbReq(&alterReq);
return code;
}
static int32_t translateTrimDatabase(STranslateContext* pCxt, STrimDatabaseStmt* pStmt) {
@ -5466,6 +5594,7 @@ static int32_t doTranslateDropSuperTable(STranslateContext* pCxt, const SName* p
tNameExtractFullName(pTableName, dropReq.name);
dropReq.igNotExists = ignoreNotExists;
code = buildCmdMsg(pCxt, TDMT_MND_DROP_STB, (FSerializeFunc)tSerializeSMDropStbReq, &dropReq);
tFreeSMDropStbReq(&dropReq);
}
return code;
}
@ -5745,7 +5874,9 @@ static int32_t translateCreateUser(STranslateContext* pCxt, SCreateUserStmt* pSt
createReq.enable = 1;
strcpy(createReq.pass, pStmt->password);
return buildCmdMsg(pCxt, TDMT_MND_CREATE_USER, (FSerializeFunc)tSerializeSCreateUserReq, &createReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_USER, (FSerializeFunc)tSerializeSCreateUserReq, &createReq);
tFreeSCreateUserReq(&createReq);
return code;
}
static int32_t translateAlterUser(STranslateContext* pCxt, SAlterUserStmt* pStmt) {
@ -5760,14 +5891,18 @@ static int32_t translateAlterUser(STranslateContext* pCxt, SAlterUserStmt* pStmt
snprintf(alterReq.objname, sizeof(alterReq.objname), "%s", pCxt->pParseCxt->db);
}
return buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &alterReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &alterReq);
tFreeSAlterUserReq(&alterReq);
return code;
}
static int32_t translateDropUser(STranslateContext* pCxt, SDropUserStmt* pStmt) {
SDropUserReq dropReq = {0};
strcpy(dropReq.user, pStmt->userName);
return buildCmdMsg(pCxt, TDMT_MND_DROP_USER, (FSerializeFunc)tSerializeSDropUserReq, &dropReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_USER, (FSerializeFunc)tSerializeSDropUserReq, &dropReq);
tFreeSDropUserReq(&dropReq);
return code;
}
static int32_t translateCreateDnode(STranslateContext* pCxt, SCreateDnodeStmt* pStmt) {
@ -5775,7 +5910,9 @@ static int32_t translateCreateDnode(STranslateContext* pCxt, SCreateDnodeStmt* p
strcpy(createReq.fqdn, pStmt->fqdn);
createReq.port = pStmt->port;
return buildCmdMsg(pCxt, TDMT_MND_CREATE_DNODE, (FSerializeFunc)tSerializeSCreateDnodeReq, &createReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CREATE_DNODE, (FSerializeFunc)tSerializeSCreateDnodeReq, &createReq);
tFreeSCreateDnodeReq(&createReq);
return code;
}
static int32_t translateDropDnode(STranslateContext* pCxt, SDropDnodeStmt* pStmt) {
@ -5786,7 +5923,9 @@ static int32_t translateDropDnode(STranslateContext* pCxt, SDropDnodeStmt* pStmt
dropReq.force = pStmt->force;
dropReq.unsafe = pStmt->unsafe;
return buildCmdMsg(pCxt, TDMT_MND_DROP_DNODE, (FSerializeFunc)tSerializeSDropDnodeReq, &dropReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_DNODE, (FSerializeFunc)tSerializeSDropDnodeReq, &dropReq);
tFreeSDropDnodeReq(&dropReq);
return code;
}
static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pStmt) {
@ -5795,7 +5934,9 @@ static int32_t translateAlterDnode(STranslateContext* pCxt, SAlterDnodeStmt* pSt
strcpy(cfgReq.config, pStmt->config);
strcpy(cfgReq.value, pStmt->value);
return buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_CONFIG_DNODE, (FSerializeFunc)tSerializeSMCfgDnodeReq, &cfgReq);
tFreeSMCfgDnodeReq(&cfgReq);
return code;
}
static int32_t translateRestoreDnode(STranslateContext* pCxt, SRestoreComponentNodeStmt* pStmt) {
@ -5817,7 +5958,10 @@ static int32_t translateRestoreDnode(STranslateContext* pCxt, SRestoreComponentN
default:
return -1;
}
return buildCmdMsg(pCxt, TDMT_MND_RESTORE_DNODE, (FSerializeFunc)tSerializeSRestoreDnodeReq, &restoreReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_RESTORE_DNODE, (FSerializeFunc)tSerializeSRestoreDnodeReq, &restoreReq);
tFreeSRestoreDnodeReq(&restoreReq);
return code;
}
static int32_t getSmaIndexDstVgId(STranslateContext* pCxt, const char* pDbName, const char* pTableName,
@ -6087,8 +6231,10 @@ static int16_t getCreateComponentNodeMsgType(ENodeType type) {
static int32_t translateCreateComponentNode(STranslateContext* pCxt, SCreateComponentNodeStmt* pStmt) {
SMCreateQnodeReq createReq = {.dnodeId = pStmt->dnodeId};
return buildCmdMsg(pCxt, getCreateComponentNodeMsgType(nodeType(pStmt)),
int32_t code = buildCmdMsg(pCxt, getCreateComponentNodeMsgType(nodeType(pStmt)),
(FSerializeFunc)tSerializeSCreateDropMQSNodeReq, &createReq);
tFreeSMCreateQnodeReq(&createReq);
return code;
}
static int16_t getDropComponentNodeMsgType(ENodeType type) {
@ -6109,8 +6255,10 @@ static int16_t getDropComponentNodeMsgType(ENodeType type) {
static int32_t translateDropComponentNode(STranslateContext* pCxt, SDropComponentNodeStmt* pStmt) {
SDDropQnodeReq dropReq = {.dnodeId = pStmt->dnodeId};
return buildCmdMsg(pCxt, getDropComponentNodeMsgType(nodeType(pStmt)),
int32_t code = buildCmdMsg(pCxt, getDropComponentNodeMsgType(nodeType(pStmt)),
(FSerializeFunc)tSerializeSCreateDropMQSNodeReq, &dropReq);
tFreeSDDropQnodeReq(&dropReq);
return code;
}
static int32_t checkTopicQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
@ -6299,7 +6447,9 @@ static int32_t translateDropTopic(STranslateContext* pCxt, SDropTopicStmt* pStmt
snprintf(dropReq.name, sizeof(dropReq.name), "%d.%s", pCxt->pParseCxt->acctId, pStmt->topicName);
dropReq.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_TMQ_DROP_TOPIC, (FSerializeFunc)tSerializeSMDropTopicReq, &dropReq);
tFreeSMDropTopicReq(&dropReq);
return code;
}
static int32_t translateDropCGroup(STranslateContext* pCxt, SDropCGroupStmt* pStmt) {
@ -6367,6 +6517,7 @@ static int32_t translateCompact(STranslateContext* pCxt, SCompactDatabaseStmt* p
if (TSDB_CODE_SUCCESS == code) {
code = buildCmdMsg(pCxt, TDMT_MND_COMPACT_DB, (FSerializeFunc)tSerializeSCompactDbReq, &compactReq);
}
tFreeSCompactDbReq(&compactReq);
return code;
}
@ -7226,7 +7377,9 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
tNameGetFullDbName(&name, dropReq.name);
dropReq.igNotExists = pStmt->ignoreNotExists;
return buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
tFreeSMDropStreamReq(&dropReq);
return code;
}
static int32_t translatePauseStream(STranslateContext* pCxt, SPauseStreamStmt* pStmt) {
@ -7421,17 +7574,23 @@ static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
strcpy(req.user, pStmt->userName);
sprintf(req.objname, "%d.%s", pCxt->pParseCxt->acctId, pStmt->objName);
sprintf(req.tabName, "%s", pStmt->tabName);
return buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &req);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &req);
tFreeSAlterUserReq(&req);
return code;
}
static int32_t translateBalanceVgroup(STranslateContext* pCxt, SBalanceVgroupStmt* pStmt) {
SBalanceVgroupReq req = {0};
return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSBalanceVgroupReq, &req);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP, (FSerializeFunc)tSerializeSBalanceVgroupReq, &req);
tFreeSBalanceVgroupReq(&req);
return code;
}
static int32_t translateBalanceVgroupLeader(STranslateContext* pCxt, SBalanceVgroupLeaderStmt* pStmt) {
SBalanceVgroupLeaderReq req = {0};
return buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req);
int32_t code = buildCmdMsg(pCxt, TDMT_MND_BALANCE_VGROUP_LEADER, (FSerializeFunc)tSerializeSBalanceVgroupLeaderReq, &req);
tFreeSBalanceVgroupLeaderReq(&req);
return code;
}
static int32_t translateMergeVgroup(STranslateContext* pCxt, SMergeVgroupStmt* pStmt) {
@ -7475,6 +7634,7 @@ static int32_t translateRedistributeVgroup(STranslateContext* pCxt, SRedistribut
req.dnodeId3 = pStmt->dnodeId3;
code = buildCmdMsg(pCxt, TDMT_MND_REDISTRIBUTE_VGROUP, (FSerializeFunc)tSerializeSRedistributeVgroupReq, &req);
}
tFreeSRedistributeVgroupReq(&req);
return code;
}
@ -8199,7 +8359,7 @@ typedef struct SVgroupCreateTableBatch {
} SVgroupCreateTableBatch;
static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pVgroupInfo,
SVgroupCreateTableBatch* pBatch) {
SVgroupCreateTableBatch* pBatch, STranslateContext* pCxt) {
char dbFName[TSDB_DB_FNAME_LEN] = {0};
SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
strcpy(name.dbname, pStmt->dbName);
@ -8209,6 +8369,14 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt*
req.type = TD_NORMAL_TABLE;
req.name = taosStrdup(pStmt->tableName);
req.ttl = pStmt->pOptions->ttl;
req.sqlLen = pCxt->pParseCxt->sqlLen;
if (req.sqlLen > 0) {
req.sql = taosMemoryMalloc(pCxt->pParseCxt->sqlLen);
if (NULL == req.sql) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memcpy(req.sql, pCxt->pParseCxt->pSql, req.sqlLen);
}
if (pStmt->pOptions->commentNull == false) {
req.comment = taosStrdup(pStmt->pOptions->comment);
if (NULL == req.comment) {
@ -8313,14 +8481,14 @@ static void destroyCreateTbReqArray(SArray* pArray) {
}
static int32_t buildCreateTableDataBlock(int32_t acctId, const SCreateTableStmt* pStmt, const SVgroupInfo* pInfo,
SArray** pBufArray) {
SArray** pBufArray, STranslateContext* pCxt) {
*pBufArray = taosArrayInit(1, POINTER_BYTES);
if (NULL == *pBufArray) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SVgroupCreateTableBatch tbatch = {0};
int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch);
int32_t code = buildNormalTableBatchReq(acctId, pStmt, pInfo, &tbatch, pCxt);
if (TSDB_CODE_SUCCESS == code) {
code = serializeVgroupCreateTableBatch(&tbatch, *pBufArray);
}
@ -8347,7 +8515,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
}
SArray* pBufArray = NULL;
if (TSDB_CODE_SUCCESS == code) {
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray);
code = buildCreateTableDataBlock(pCxt->pParseCxt->acctId, pStmt, &info, &pBufArray, pCxt);
}
if (TSDB_CODE_SUCCESS == code) {
code = rewriteToVnodeModifyOpStmt(pQuery, pBufArray);
@ -8361,7 +8529,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) {
static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt,
const STag* pTag, uint64_t suid, const char* sTableNmae, SVgroupInfo* pVgInfo,
SArray* tagName, uint8_t tagNum) {
SArray* tagName, uint8_t tagNum, STranslateContext* pCxt) {
// char dbFName[TSDB_DB_FNAME_LEN] = {0};
// SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId};
// strcpy(name.dbname, pStmt->dbName);
@ -8371,6 +8539,11 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S
req.type = TD_CHILD_TABLE;
req.name = taosStrdup(pStmt->tableName);
req.ttl = pStmt->pOptions->ttl;
req.sqlLen = pCxt->pParseCxt->sqlLen;
if (req.sqlLen > 0) {
req.sql = taosMemoryMalloc(pCxt->pParseCxt->sqlLen);
memcpy(req.sql, pCxt->pParseCxt->pSql, req.sqlLen);
}
if (pStmt->pOptions->commentNull == false) {
req.comment = taosStrdup(pStmt->pOptions->comment);
req.commentLen = strlen(pStmt->pOptions->comment);
@ -8647,7 +8820,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla
}
if (TSDB_CODE_SUCCESS == code) {
addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid,
pStmt->useTableName, &info, tagName, pSuperTableMeta->tableInfo.numOfTags);
pStmt->useTableName, &info, tagName, pSuperTableMeta->tableInfo.numOfTags, pCxt);
} else {
taosMemoryFree(pTag);
}