From 349e190120b736830b56b96514ee655b7e1e113b Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 9 Nov 2023 11:23:50 +0800 Subject: [PATCH] enh: rsma result --- source/dnode/vnode/src/inc/sma.h | 3 +- source/dnode/vnode/src/sma/smaEnv.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 90 +++++++++++--------------- 3 files changed, 39 insertions(+), 56 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index f45050bfec..29eaa0509a 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -149,6 +149,7 @@ struct SRSmaInfoItem { tmr_h tmrId; void *pStreamState; void *pStreamTask; // SStreamTask + SArray *pResList; }; struct SRSmaInfo { @@ -218,7 +219,7 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { int32_t smaPreClose(SSma *pSma); // rsma -void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); +void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index d47398bdff..dd12f2bca2 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -179,7 +179,7 @@ static void tRSmaInfoHashFreeNode(void *data) { if ((pItem = RSMA_INFO_ITEM((SRSmaInfo *)pRSmaInfo, 1)) && pItem->level) { taosHashRemove(smaMgmt.refHash, &pItem, POINTER_BYTES); } - tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo, true); + tdFreeRSmaInfo(pRSmaInfo->pSma, pRSmaInfo); } } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 92494553d0..73a0849ab2 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -45,7 +45,7 @@ static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, - int32_t execType, SArray **ppResList, int8_t *streamFlushed); + int32_t execType, int8_t *streamFlushed); static void tdRSmaFetchTrigger(void *param, void *tmrId); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); @@ -74,41 +74,39 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l * * @param pSma * @param pInfo - * @param isDeepFree Only stop tmrId and free pTSchema for deep free * @return void* */ -void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { +void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { if (pInfo) { - if (isDeepFree) { - for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - SRSmaInfoItem *pItem = &pInfo->items[i]; + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + SRSmaInfoItem *pItem = &pInfo->items[i]; - if (pItem->tmrId) { - smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, - pInfo->suid, i + 1); - taosTmrStopA(&pItem->tmrId); - } - - if (pItem->pStreamState) { - streamStateClose(pItem->pStreamState, false); - } - - if (pItem->pStreamTask) { - tFreeStreamTask(pItem->pStreamTask); - } - tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); + if (pItem->tmrId) { + smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pItem->tmrId, + pInfo->suid, i + 1); + taosTmrStopA(&pItem->tmrId); } - taosMemoryFreeClear(pInfo->pTSchema); + if (pItem->pStreamState) { + streamStateClose(pItem->pStreamState, false); + } - if (pInfo->queue) { - taosCloseQueue(pInfo->queue); - pInfo->queue = NULL; - } - if (pInfo->qall) { - taosFreeQall(pInfo->qall); - pInfo->qall = NULL; + if (pItem->pStreamTask) { + tFreeStreamTask(pItem->pStreamTask); } + taosArrayDestroy(pItem->pResList); + tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); + } + + taosMemoryFreeClear(pInfo->pTSchema); + + if (pInfo->queue) { + taosCloseQueue(pInfo->queue); + pInfo->queue = NULL; + } + if (pInfo->qall) { + taosFreeQall(pInfo->qall); + pInfo->qall = NULL; } taosMemoryFree(pInfo); @@ -311,6 +309,10 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat return TSDB_CODE_FAILED; } + if (!(pItem->pResList = taosArrayInit(1, POINTER_BYTES))) { + return TSDB_CODE_FAILED; + } + if (pItem->fetchResultVer < pItem->submitReqVer) { // fetch the data when reboot pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; @@ -406,7 +408,7 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con return TSDB_CODE_SUCCESS; _err: - tdFreeRSmaInfo(pSma, pRSmaInfo, true); + tdFreeRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_FAILED; } @@ -623,27 +625,14 @@ _end: } static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, SRSmaInfo *pInfo, - int32_t execType, SArray **ppResList, int8_t *streamFlushed) { + int32_t execType, int8_t *streamFlushed) { int32_t code = 0; int32_t lino = 0; SSDataBlock *output = NULL; - SArray *pResList = NULL; + SArray *pResList = pItem->pResList; STSchema *pTSchema = pInfo->pTSchema; int64_t suid = pInfo->suid; - if (!(*ppResList)) { - pResList = taosArrayInit(1, POINTER_BYTES); - if (pResList == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - TSDB_CHECK_CODE(code, lino, _exit); - } - *ppResList = pResList; - } else { - pResList = *ppResList; - } - - taosArrayClear(pResList); - while (1) { uint64_t ts; bool hasMore = false; @@ -812,7 +801,6 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t idx = level - 1; void *qTaskInfo = RSMA_INFO_QTASK(pInfo, idx); SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - SArray *pResList = NULL; if (!qTaskInfo) { smaDebug("vgId:%d, no qTaskInfo to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, @@ -845,9 +833,8 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, atomic_store_64(&pItem->submitReqVer, packData->ver); } - terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, &pResList, NULL); + terrno = tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo, STREAM_NORMAL, NULL); - taosArrayDestroy(pResList); return terrno ? TSDB_CODE_FAILED : TDB_CODE_SUCCESS; } @@ -1135,7 +1122,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t nTaskInfo = 0; SSma *pSma = pRSmaStat->pSma; SVnode *pVnode = pSma->pVnode; - SArray *pResList = NULL; if (taosHashGetSize(pInfoHash) <= 0) { return TSDB_CODE_SUCCESS; @@ -1178,7 +1164,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { if (pRSmaInfo->taskInfo[i] && (0 == pRSmaInfo->items[i].streamFlushed)) { int8_t streamFlushed = 0; code = tdRSmaExecAndSubmitResult(pSma, pRSmaInfo->taskInfo[i], &pRSmaInfo->items[i], pRSmaInfo, - STREAM_CHECKPOINT, &pResList, &streamFlushed); + STREAM_CHECKPOINT, &streamFlushed); if (code) { taosHashCancelIterate(pInfoHash, infoHash); TSDB_CHECK_CODE(code, lino, _exit); @@ -1265,7 +1251,6 @@ _checkpoint: } } while (0); _exit: - taosArrayDestroy(pResList); if (code) { smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } @@ -1391,7 +1376,6 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) { */ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - SArray *pResList = NULL; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); @@ -1422,7 +1406,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { if ((terrno = qSetSMAInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, &pResList, NULL) < 0) { + if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo, STREAM_GET_ALL, NULL) < 0) { atomic_store_32(&SMA_RSMA_STAT(pSma)->execStat, terrno); goto _err; } @@ -1437,10 +1421,8 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } _end: - taosArrayDestroy(pResList); return TSDB_CODE_SUCCESS; _err: - taosArrayDestroy(pResList); return TSDB_CODE_FAILED; }