diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index dfd78d9dca..466c300d97 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -162,7 +162,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { vTrace("message in fetch queue is processing"); - char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + char * msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); switch (pMsg->msgType) { case TDMT_VND_FETCH: @@ -184,8 +184,12 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) { case TDMT_VND_TASK_PIPE_EXEC: case TDMT_VND_TASK_MERGE_EXEC: return tqProcessTaskExec(pVnode->pTq, msgstr, msgLen, 0); - case TDMT_VND_STREAM_TRIGGER: - return tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); + case TDMT_VND_STREAM_TRIGGER: { + // refactor, avoid double free + int code = tqProcessStreamTrigger(pVnode->pTq, pMsg->pCont, pMsg->contLen, 0); + pMsg->pCont = NULL; + return code; + } case TDMT_VND_QUERY_HEARTBEAT: return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg); default: @@ -328,12 +332,12 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, SDecoder decoder = {0}; int rcode = 0; SVCreateTbBatchReq req = {0}; - SVCreateTbReq *pCreateReq; + SVCreateTbReq * pCreateReq; SVCreateTbBatchRsp rsp = {0}; SVCreateTbRsp cRsp = {0}; char tbName[TSDB_TABLE_FNAME_LEN]; - STbUidStore *pStore = NULL; - SArray *tbUids = NULL; + STbUidStore * pStore = NULL; + SArray * tbUids = NULL; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -517,7 +521,7 @@ static int vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq, in SDecoder decoder = {0}; SEncoder encoder = {0}; int ret; - SArray *tbUids = NULL; + SArray * tbUids = NULL; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->pCont = NULL; @@ -572,9 +576,9 @@ _exit: static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSubmitMsgIter *msgIter, const char *tags) { SSubmitBlkIter blkIter = {0}; - STSchema *pSchema = NULL; + STSchema * pSchema = NULL; tb_uid_t suid = 0; - STSRow *row = NULL; + STSRow * row = NULL; tInitSubmitBlkIter(msgIter, pBlock, &blkIter); if (blkIter.row == NULL) return 0; @@ -605,8 +609,8 @@ static int vnodeDebugPrintSingleSubmitMsg(SMeta *pMeta, SSubmitBlk *pBlock, SSub static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char *tags) { ASSERT(pMsg != NULL); SSubmitMsgIter msgIter = {0}; - SMeta *pMeta = pVnode->pMeta; - SSubmitBlk *pBlock = NULL; + SMeta * pMeta = pVnode->pMeta; + SSubmitBlk * pBlock = NULL; if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { @@ -620,10 +624,10 @@ static int vnodeDebugPrintSubmitMsg(SVnode *pVnode, SSubmitReq *pMsg, const char } static int vnodeProcessSubmitReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { - SSubmitReq *pSubmitReq = (SSubmitReq *)pReq; + SSubmitReq * pSubmitReq = (SSubmitReq *)pReq; SSubmitRsp submitRsp = {0}; SSubmitMsgIter msgIter = {0}; - SSubmitBlk *pBlock; + SSubmitBlk * pBlock; SSubmitRsp rsp = {0}; SVCreateTbReq createTbReq = {0}; SDecoder decoder = {0};