From 4b3d98916855c13d72b73c148ade995600efed26 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 21 Aug 2022 01:19:32 +0800 Subject: [PATCH] other: naming optimization --- source/dnode/vnode/src/sma/smaRollup.c | 67 +++++++------------------- 1 file changed, 18 insertions(+), 49 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 72f779eaab..959a33d8ce 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -42,9 +42,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); -static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid); +static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); @@ -636,8 +636,8 @@ _end: return code; } -static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid) { +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid) { SArray *pResList = taosArrayInit(1, POINTER_BYTES); if (pResList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -815,7 +815,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); + tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); return TSDB_CODE_SUCCESS; } @@ -1596,46 +1596,15 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) { } } -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { - // step 1: consume submit req -#if 0 - int64_t qMemSize = 0; - if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock - - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - atomic_fetch_sub_64(&pRSmaStat->nBufItems, taosQallItemSize(pInfo->qall)); - - taosArrayClear(pSubmitArr); - - while (1) { - void *msg = NULL; - taosGetQitem(pInfo->qall, (void **)&msg); - if (msg) { - if (taosArrayPush(pSubmitArr, &msg) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } - } else { - break; - } - } - - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) < - 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } - } - - tdFreeRSmaSubmitItems(pSubmitArr); - } - } -#endif - // step 2: fetch rsma result(consider the efficiency and functionality) +/** + * @brief fetch rsma result(consider the efficiency and functionality) + * + * @param pSma + * @param pInfo + * @param pSubmitArr + * @return int32_t + */ +static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); @@ -1665,7 +1634,7 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { + if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { tdCleanupStreamInputDataBlock(taskInfo); goto _err; } @@ -1772,7 +1741,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); } - tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); if (qallItemSize > 0) { // subtract the item size after the task finished, commit should wait for all items be consumed atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); @@ -1803,7 +1772,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); } - // tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + // tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); } }