From 0de7f3e3a7b29ddd2f2fc08a74481b9074eb3ca5 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 20 Aug 2022 23:46:12 +0800 Subject: [PATCH] enh: rsma code optimization --- source/dnode/vnode/src/inc/sma.h | 1 - source/dnode/vnode/src/sma/smaRollup.c | 39 +++++++++++--------------- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index d32d67e29c..5ba22f6db8 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -123,7 +123,6 @@ struct SRSmaInfoItem { uint8_t nSkipped; // number of skipped to fetch data from all active window int8_t fetchLevel; int32_t maxDelay; // ms - int64_t lastFetch; // ms tmr_h tmrId; }; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 5ef5bd0aea..e2d7a588b0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,9 +19,8 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (180000) // ms -#define RSMA_FETCH_SKIP_MAX (10) // cnt -#define RSMA_FETCH_ACTIVE_MAX (180) // ms +#define RSMA_FETCH_DELAY_MAX (900000) // ms +#define RSMA_FETCH_ACTIVE_MAX (1800) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -1646,26 +1645,20 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm } int64_t curMs = taosGetTimestampMs(); - // if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { - // pItem->nSkipped = 0; - // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", - // SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); - // } else { - // if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { - // ++pItem->nSkipped; - // smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", - // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - // continue; - // } else { - // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - // } - // } + if ((pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { + ++pItem->nSkipped; + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + continue; + } else { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + } - pItem->lastFetch = curMs; - - // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + pItem->nSkipped = 0; if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; @@ -1680,7 +1673,7 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); } else { smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 - " maxDelay:%d, fetch not executed as fetchLevel is %" PRIi8, + " maxDelay:%d, fetch not executed as fetch level is %" PRIi8, SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); } }