create steam

This commit is contained in:
dmchen 2023-11-16 01:49:31 +00:00
parent 82498a9b4e
commit cd6df48df5
3 changed files with 22 additions and 3 deletions

View File

@ -2355,6 +2355,8 @@ typedef struct {
int64_t deleteMark;
int8_t igUpdate;
int64_t lastTs;
char* createSQL;
int32_t createSQLLen;
} SCMCreateStreamReq;
typedef struct {

View File

@ -6878,6 +6878,11 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1;
if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1;
if (pReq->createSQLLen > 0 && pReq->sql != NULL){
if (tEncodeI32(&encoder, pReq->createSQLLen) < 0) return -1;
if (tEncodeBinary(&encoder, pReq->createSQL, pReq->createSQLLen) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
@ -6964,6 +6969,13 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1;
if(!tDecodeIsEnd(&decoder)){
if(tDecodeI32(&decoder, &pReq->createSQLLen) < 0) return -1;
if(pReq->createSQLLen > 0){
if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->createSQL, NULL) < 0) return -1;
}
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -7044,6 +7056,11 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosMemoryFreeClear(pReq->sql);
taosMemoryFreeClear(pReq->ast);
taosArrayDestroy(pReq->fillNullCols);
if(pReq->createSQL != NULL){
taosMemoryFree(pReq->createSQL);
}
pReq->createSQL = NULL;
}
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {

View File

@ -868,9 +868,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
// reuse this function for stream
if (createStreamReq.sql != NULL) {
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, createStreamReq.sql,
strlen(createStreamReq.sql));
if (createStreamReq.createSQL != NULL) {
auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, createStreamReq.createSQL,
createStreamReq.createSQLLen);
}
else{
char detail[1000] = {0};