From cd6df48df5ffdd76f2a262deca38a6e53326d8ed Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 16 Nov 2023 01:49:31 +0000 Subject: [PATCH] create steam --- include/common/tmsg.h | 2 ++ source/common/src/tmsg.c | 17 +++++++++++++++++ source/dnode/mnode/impl/src/mndStream.c | 6 +++--- 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d08b424e9c..678fbbd38e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2355,6 +2355,8 @@ typedef struct { int64_t deleteMark; int8_t igUpdate; int64_t lastTs; + char* createSQL; + int32_t createSQLLen; } SCMCreateStreamReq; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c67b9e5e68..03ff32ddc8 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -6878,6 +6878,11 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeI8(&encoder, pReq->igUpdate) < 0) return -1; if (tEncodeI64(&encoder, pReq->lastTs) < 0) return -1; + if (pReq->createSQLLen > 0 && pReq->sql != NULL){ + if (tEncodeI32(&encoder, pReq->createSQLLen) < 0) return -1; + if (tEncodeBinary(&encoder, pReq->createSQL, pReq->createSQLLen) < 0) return -1; + } + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -6964,6 +6969,13 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea if (tDecodeI8(&decoder, &pReq->igUpdate) < 0) return -1; if (tDecodeI64(&decoder, &pReq->lastTs) < 0) return -1; + if(!tDecodeIsEnd(&decoder)){ + if(tDecodeI32(&decoder, &pReq->createSQLLen) < 0) return -1; + if(pReq->createSQLLen > 0){ + if (tDecodeBinaryAlloc(&decoder, (void **)&pReq->createSQL, NULL) < 0) return -1; + } + } + tEndDecode(&decoder); tDecoderClear(&decoder); @@ -7044,6 +7056,11 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->ast); taosArrayDestroy(pReq->fillNullCols); + + if(pReq->createSQL != NULL){ + taosMemoryFree(pReq->createSQL); + } + pReq->createSQL = NULL; } int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4695ad5166..5bdef54333 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -868,9 +868,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); // reuse this function for stream - if (createStreamReq.sql != NULL) { - auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, createStreamReq.sql, - strlen(createStreamReq.sql)); + if (createStreamReq.createSQL != NULL) { + auditRecord(pReq, pMnode->clusterId, "createStream", dbname.dbname, name.dbname, createStreamReq.createSQL, + createStreamReq.createSQLLen); } else{ char detail[1000] = {0};