From b0789d1a868f43297f82dcbdfeb62f2d8b541b48 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 2 Dec 2022 04:37:16 +0800 Subject: [PATCH] chore: rsma logic optimization --- source/dnode/vnode/src/sma/smaRollup.c | 39 ++++++++++++++------------ 1 file changed, 21 insertions(+), 18 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 1a2bd568d5..c92621776c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -21,6 +21,8 @@ #define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms +#define RSMA_NEED_FETCH(r) (RSMA_INFO_ITEM((r), 0)->fetchLevel || RSMA_INFO_ITEM((r), 1)->fetchLevel) + SSmaMgmt smaMgmt = { .inited = 0, .rsetId = -1, @@ -827,6 +829,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, return TSDB_CODE_SUCCESS; } if (!pInfo->pTSchema) { + terrno = TSDB_CODE_INVALID_PTR; smaWarn("vgId:%d, no schema to execute rsma %" PRIi8 " task for suid:%" PRIu64, SMA_VID(pSma), level, pInfo->suid); return TSDB_CODE_FAILED; } @@ -841,7 +844,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, tdRsmaPrintSubmitReq(pSma, pReq); } #endif - if (qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType) < 0) { + if ((terrno = qSetSMAInput(qTaskInfo, pMsg, msgSize, inputType)) < 0) { smaError("vgId:%d, rsma %" PRIi8 " qSetStreamInput failed since %s", SMA_VID(pSma), level, tstrerror(terrno)); return TSDB_CODE_FAILED; } @@ -1438,7 +1441,7 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA taosGetQitem(qall, (void **)&msg); if (msg) { if (!taosArrayPush(pSubmitArr, &msg)) { - tdFreeRSmaSubmitItems(pSubmitArr); + terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } } else { @@ -1450,7 +1453,6 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA if (size > 0) { for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); goto _err; } } @@ -1458,6 +1460,9 @@ static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SA } return TSDB_CODE_SUCCESS; _err: + smaError("vgId:%d, batch exec for suid:%" PRIi64 " execType:%d size:%d failed since %s", SMA_VID(pSma), pInfo->suid, + type, (int32_t)taosArrayGetSize(pSubmitArr), terrstr()); + tdFreeRSmaSubmitItems(pSubmitArr); while (1) { void *msg = NULL; taosGetQitem(qall, (void **)&msg); @@ -1504,8 +1509,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { while ((pIter = taosHashIterate(infoHash, pIter))) { SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { - if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || - RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + if ((taosQueueItemSize(pInfo->queue) > 0) || RSMA_NEED_FETCH(pInfo)) { int32_t batchCnt = -1; int32_t batchMax = taosHashGetSize(infoHash) / tsNumOfVnodeRsmaThreads; bool occupied = (batchMax <= 1); @@ -1521,13 +1525,20 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi32, SMA_VID(pSma), qallItemSize, type); } - if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + if (RSMA_NEED_FETCH(pInfo)) { int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); if (oldStat == 0 || ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); ASSERT(oldVal >= 0); - tdRSmaFetchAllResult(pSma, pInfo); + + int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)); + if (curStat == 1) { + smaDebug("vgId:%d, fetch all not exec as commit stat is %" PRIi8, SMA_VID(pSma), curStat); + } else { + tdRSmaFetchAllResult(pSma, pInfo); + } + if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); } @@ -1537,17 +1548,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (qallItemSize > 0) { atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); continue; - } else if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - if (atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)) == 0) { - continue; - } - for (int32_t j = 0; j < TSDB_RETENTION_L2; ++j) { - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, j); - if (pItem->fetchLevel) { - pItem->fetchLevel = 0; - taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } - } + } + if (RSMA_NEED_FETCH(pInfo)) { + continue; } break;