diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index a8883babce..9931462e5f 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -143,7 +143,7 @@ struct SRSmaInfoItem { int8_t level : 4; int8_t fetchLevel : 4; int8_t triggerStat; - uint16_t nSkipped; + uint16_t nScanned; int32_t maxDelay; // ms tmr_h tmrId; }; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index d5260b8374..ec8fcb2932 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,7 +19,7 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (900000) // ms +#define RSMA_FETCH_DELAY_MAX (120000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms @@ -1712,21 +1712,23 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { continue; } - int64_t curMs = taosGetTimestampMs(); - if ((pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); - } else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { - ++pItem->nSkipped; - smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - continue; + if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch executed", + SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); } else { - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + int64_t curMs = taosGetTimestampMs(); + if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) { + smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); // restore the active stat + continue; + } else { + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + } } - pItem->nSkipped = 0; + pItem->nScanned = 0; if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; @@ -1737,12 +1739,12 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } tdCleanupStreamInputDataBlock(taskInfo); - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch finished", + SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); } else { - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch not executed as fetch level is %" PRIi8, - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); + SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel); } } @@ -1832,7 +1834,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { bool occupied = (batchMax <= 1); if (batchMax > 1) { batchMax = 100 / batchMax; - batchMax = MAX(batchMax, 4); + batchMax = TMAX(batchMax, 4); } while (occupied || (++batchCnt < batchMax)) { // greedy mode taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock