diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 09fdd64c7e..e8f2555f17 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -756,24 +756,14 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz); - void* pBuf = NULL; - SSubmitReq2* pReq = NULL; - SArray* tagArray = NULL; - SArray* pVals = NULL; + void* pBuf = NULL; + SArray* tagArray = NULL; + SArray* pVals = NULL; if (!(tagArray = taosArrayInit(1, sizeof(STagVal)))) { goto _end; } - if (!(pReq = taosMemoryCalloc(1, sizeof(SSubmitReq2)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _end; - } - - if (!(pReq->aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { - goto _end; - } - for (int32_t i = 0; i < blockSz; i++) { SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i); if (pDataBlock->info.type == STREAM_DELETE_RESULT) { @@ -948,13 +938,17 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* taosArrayPush(tbData.aRowP, &pRow); } - taosArrayClear(pReq->aSubmitTbData); - taosArrayPush(pReq->aSubmitTbData, &tbData); + SSubmitReq2 submitReq = {0}; + if (!(submitReq.aSubmitTbData = taosArrayInit(1, sizeof(SSubmitTbData)))) { + goto _end; + } + + taosArrayPush(submitReq.aSubmitTbData, &tbData); // encode int32_t len; int32_t code; - tEncodeSize(tEncodeSSubmitReq2, pReq, len, code); + tEncodeSize(tEncodeSSubmitReq2, &submitReq, len, code); SEncoder encoder; len += sizeof(SMsgHead); pBuf = rpcMallocCont(len); @@ -964,7 +958,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* ((SMsgHead*)pBuf)->vgId = TD_VID(pVnode); ((SMsgHead*)pBuf)->contLen = htonl(len); tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len - sizeof(SMsgHead)); - if (tEncodeSSubmitReq2(&encoder, pReq) < 0) { + if (tEncodeSSubmitReq2(&encoder, &submitReq) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("failed to encode submit req since %s", terrstr()); tEncoderClear(&encoder); @@ -972,6 +966,7 @@ void tqSinkToTablePipeline2(SStreamTask* pTask, void* vnode, int64_t ver, void* continue; } tEncoderClear(&encoder); + tDestroySSubmitReq2(&submitReq, TSDB_MSG_FLG_ENCODE); SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, @@ -988,8 +983,6 @@ _end: taosArrayDestroy(tagArray); taosArrayDestroy(pVals); // TODO: change - tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); - taosMemoryFree(pReq); } #if 0