From 096e7054f0f2a200ac863b7ff5a34519f3270bf7 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 8 Dec 2022 14:16:57 +0800 Subject: [PATCH] refact: tsma/rsma process --- source/dnode/vnode/src/sma/smaRollup.c | 14 +++++++++----- source/libs/executor/src/executor.c | 9 ++++++--- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8fa0f05a63..7de24467f8 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -750,12 +750,14 @@ _err: */ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t len, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid) { - void *qItem = taosAllocateQitem(len, DEF_QITEM); + int32_t size = sizeof(int32_t) + len; + void *qItem = taosAllocateQitem(size, DEF_QITEM); if (!qItem) { return TSDB_CODE_FAILED; } - memcpy(qItem, pMsg, len); + *(int32_t *)qItem = len; + memcpy(POINTER_SHIFT(qItem, sizeof(int32_t)), pMsg, len); taosWriteQitem(pInfo->queue, qItem); @@ -1367,7 +1369,8 @@ _end: static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { - taosFreeQitem(*(void **)taosArrayGet(pItems, i)); + SPackedData *packData = taosArrayGet(pItems, i); + taosFreeQitem(POINTER_SHIFT(packData->msgStr, -sizeof(int32_t))); } taosArrayClear(pItems); } @@ -1436,7 +1439,8 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA void *msg = NULL; taosGetQitem(qall, (void **)&msg); if (msg) { - if (!taosArrayPush(pSubmitArr, &msg)) { + SPackedData packData = {.msgLen = *(int32_t *)msg, .msgStr = POINTER_SHIFT(msg, sizeof(int32_t))}; + if (!taosArrayPush(pSubmitArr, &packData)) { tdFreeRSmaSubmitItems(pSubmitArr); goto _err; } @@ -1491,7 +1495,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } if (!(pSubmitArr = - taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) { + taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0c6ecaf343..3e0c5a9901 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -51,8 +51,8 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf if (type == STREAM_INPUT__MERGED_SUBMIT) { for (int32_t i = 0; i < numOfBlocks; i++) { - SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); - taosArrayPush(pInfo->pBlockLists, &pReq); + SPackedData* pReq = POINTER_SHIFT(input, i * sizeof(SPackedData)); + taosArrayPush(pInfo->pBlockLists, pReq); } pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; } else if (type == STREAM_INPUT__DATA_SUBMIT) { @@ -61,7 +61,10 @@ static int32_t doSetSMABlock(SOperatorInfo* pOperator, void* input, size_t numOf } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; - taosArrayPush(pInfo->pBlockLists, &pDataBlock); + SPackedData tmp = { + .pDataBlock = pDataBlock, + }; + taosArrayPush(pInfo->pBlockLists, &tmp); } pInfo->blockType = STREAM_INPUT__DATA_BLOCK; }