From 441ce21677594ff3562b4abb2a263549f1f16308 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 19 Aug 2022 17:19:50 +0800 Subject: [PATCH 01/10] enh: rsma batch process --- include/common/tmsg.h | 25 -- include/common/tmsgdef.h | 2 +- include/util/tqueue.h | 1 + source/dnode/mnode/impl/src/mndStb.c | 2 + source/dnode/vnode/src/inc/sma.h | 27 +- source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/sma/smaCommit.c | 46 +-- source/dnode/vnode/src/sma/smaEnv.c | 13 +- source/dnode/vnode/src/sma/smaRollup.c | 462 +++++++++++------------- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 - source/libs/executor/src/executorimpl.c | 1 + source/util/src/tqueue.c | 3 +- 12 files changed, 253 insertions(+), 332 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f870bd161f..8f199c72f7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2667,31 +2667,6 @@ typedef struct { int32_t padding; } SRSmaExecMsg; -typedef struct { - int64_t suid; - int8_t level; -} SRSmaFetchMsg; - -static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) { - if (tStartEncode(pCoder) < 0) return -1; - - if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; - if (tEncodeI8(pCoder, pReq->level) < 0) return -1; - - tEndEncode(pCoder); - return 0; -} - -static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) { - if (tStartDecode(pCoder) < 0) return -1; - - if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->level) < 0) return -1; - - tEndDecode(pCoder); - return 0; -} - typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 16d5965759..e2bb3e2ae1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -201,7 +201,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) - TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 0f4f1db9ee..da409a90bb 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -76,6 +76,7 @@ void taosFreeQall(STaosQall *qall); int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); int32_t taosGetQitem(STaosQall *qall, void **ppItem); void taosResetQitems(STaosQall *qall); +int32_t taosQallItemSize(STaosQall *qall); STaosQset *taosOpenQset(); void taosCloseQset(STaosQset *qset); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e0f2b83160..ebec3d5ea6 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -442,6 +442,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt if (req.rollup) { req.rsmaParam.maxdelay[0] = pStb->maxdelay[0]; req.rsmaParam.maxdelay[1] = pStb->maxdelay[1]; + req.rsmaParam.watermark[0] = pStb->watermark[0]; + req.rsmaParam.watermark[1] = pStb->watermark[1]; if (pStb->ast1Len > 0) { if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c43772062e..989d24295e 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -90,14 +90,14 @@ struct SRSmaStat { SSma *pSma; int64_t commitAppliedVer; // vnode applied version for async commit int64_t refId; // shared by fetch tasks - volatile int64_t qBufSize; // queue buffer size + volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) + volatile int8_t nExecutor; // [1, max(half of query threads, 4)] int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing - int8_t execStat; // 0 not in exec , 1 in exec SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SHashObj *infoHash; // key: suid, value: SRSmaInfo - SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2 + tsem_t notEmpty; // has items in queue buffer }; struct SSmaStat { @@ -111,26 +111,29 @@ struct SSmaStat { #define SMA_STAT_TSMA(s) (&(s)->tsmaStat) #define SMA_STAT_RSMA(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->infoHash) -#define RSMA_FETCH_HASH(r) ((r)->fetchHash) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_REF_ID(r) ((r)->refId) #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - int8_t level; - int8_t triggerStat; - uint16_t interval; // second - int32_t maxDelay; - tmr_h tmrId; + int8_t level; + int8_t triggerStat; + uint8_t nSkipped; // number of skipped to fetch data from all active window + int8_t fetchLevel; + int32_t maxDelay; // ms + int64_t lastFetch; // ms + tmr_h tmrId; }; struct SRSmaInfo { STSchema *pTSchema; int64_t suid; - int64_t refId; // refId of SRSmaStat - uint64_t delFlag : 1; - uint64_t lastReceived : 63; // second + int64_t refId; // refId of SRSmaStat + int64_t lastRecv; // ms + int8_t delFlag; + int8_t assigned; // 0 idle, 1 assgined for exec + int16_t padding; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7ac1cc4f0e..63d228ec8b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -198,7 +198,6 @@ int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); -int32_t smaProcessFetch(SSma* pSma, void* pMsg); int32_t smaProcessExec(SSma* pSma, void* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 8b92475035..7b63056897 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -321,10 +321,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { @@ -338,30 +338,25 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { * 1) This is high cost task and should not put in asyncPreCommit originally. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. */ - nLoops = 0; - smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1); - if (old == 0) break; - if (++nLoops > 1000) { - sched_yield(); - nLoops = 0; - smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - } - } - - smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) { - atomic_store_8(&pRSmaStat->execStat, 0); return TSDB_CODE_FAILED; } + smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + nLoops = 0; + while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + +#if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(RSMA_INFO_HASH(pRSmaStat)); @@ -376,11 +371,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); } - atomic_store_64(&pRSmaStat->qBufSize, 0); - atomic_store_8(&pRSmaStat->execStat, 0); // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - + // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); +#endif // step 5: others pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; @@ -426,7 +419,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // step 1: merge qTaskInfo and iQTaskInfo // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWLockLatch(SMA_ENV_LOCK(pEnv)); void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); while (pIter) { @@ -480,10 +473,9 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); } taosArrayDestroy(rsmaDeleted); - // TODO: remove suid in files? // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // step 2: cleanup outdated qtaskinfo files tdCleanupQTaskInfoFiles(pSma, pRSmaStat); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index f51aad22bd..d39efc748e 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); pRSmaStat->pSma = (SSma *)pSma; atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); + tsem_init(&pRSmaStat->notEmpty, 0, 0); // init smaMgmt smaInit(); @@ -230,12 +231,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (!RSMA_INFO_HASH(pRSmaStat)) { return TSDB_CODE_FAILED; } - - RSMA_FETCH_HASH(pRSmaStat) = taosHashInit( - RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - if (!RSMA_FETCH_HASH(pRSmaStat)) { - return TSDB_CODE_FAILED; - } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { @@ -267,6 +262,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat); // step 1: set rsma trigger stat cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); + tsem_destroy(&(pStat->notEmpty)); // step 2: destroy the rsma info and associated fetch tasks if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { @@ -279,10 +275,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 3: destroy the rsma fetch hash - taosHashCleanup(RSMA_FETCH_HASH(pStat)); - - // step 4: wait all triggered fetch tasks finished + // step 3: wait all triggered fetch tasks finished int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 9b3b0cb63d..e8018c0f33 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,10 +15,14 @@ #include "sma.h" -#define RSMA_QTASKINFO_BUFSIZE (32768) -#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid -#define RSMA_QTASKEXEC_BUFSIZE (1048576) -#define RSMA_SUBMIT_BATCH_SIZE (1024) +#define RSMA_QTASKINFO_BUFSIZE (32768) // size +#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid +#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt +#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt +#define RSMA_EXECUTOR_MAX (4) // cnt +#define RSMA_FETCH_DELAY_MAX (1800000) // ms +#define RSMA_FETCH_SKIP_MAX (1000) // cnt +#define RSMA_FETCH_ACTIVE_MAX (1800) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -40,11 +44,10 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr); +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); -static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); @@ -668,7 +671,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm } else { smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } -#if 0 +#if 1 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); @@ -729,22 +732,24 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu taosWriteQitem(pInfo->queue, qItem); + pInfo->lastRecv = taosGetTimestampMs(); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen); + + tsem_post(&(pRSmaStat->notEmpty)); + + int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1); // smoothing consume - int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE; + int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE; if (n > 1) { if (n > 10) { n = 10; } - taosMsleep(n << 4); - if (n > 2) { + taosMsleep(n << 3); + if (n > 5) { smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), - taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); - } else { - smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), - taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); + taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3); } } @@ -840,25 +845,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - taosRLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRLockLatch(SMA_ENV_LOCK(pEnv)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } if (!pRSmaInfo->taskInfo[0]) { if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } } tdRefRSmaInfo(pSma, pRSmaInfo); - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(pRSmaInfo->suid == suid); return pRSmaInfo; } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } @@ -910,22 +915,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp static int32_t tdRSmaExecCheck(SSma *pSma) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize); - if (bufSize < RSMA_QTASKEXEC_BUFSIZE) { - smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize, - RSMA_QTASKEXEC_BUFSIZE); + if (atomic_load_8(&pRSmaStat->nExecutor) >= TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { return TSDB_CODE_SUCCESS; } - if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) { - smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma), - bufSize); - return TSDB_CODE_SUCCESS; - } - - smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize); - SRSmaExecMsg fetchMsg; int32_t contLen = sizeof(SMsgHead); void *pBuf = rpcMallocCont(0 + contLen); @@ -949,7 +943,6 @@ static int32_t tdRSmaExecCheck(SSma *pSma) { return TSDB_CODE_SUCCESS; _err: - atomic_store_8(&pRSmaStat->execStat, 0); return TSDB_CODE_FAILED; } @@ -959,7 +952,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { // only applicable when rsma env exists return TSDB_CODE_SUCCESS; } - + STbUidStore uidStore = {0}; SRetention *pRetention = SMA_RETENTION(pSma); if (!RETENTION_VALID(pRetention + 1)) { // return directly if retention level 1 is invalid @@ -967,25 +960,35 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - STbUidStore uidStore = {0}; - tdFetchSubmitReqSuids(pMsg, &uidStore); + if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) { + goto _err; + } if (uidStore.suid != 0) { - tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid); + if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) { + goto _err; + } void *pIter = taosHashIterate(uidStore.uidHash, NULL); while (pIter) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid); + if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) { + goto _err; + } pIter = taosHashIterate(uidStore.uidHash, pIter); } - tdUidStoreDestory(&uidStore); - - tdRSmaExecCheck(pSma); + if (tdRSmaExecCheck(pSma) < 0) { + goto _err; + } } } + tdUidStoreDestory(&uidStore); return TSDB_CODE_SUCCESS; +_err: + tdUidStoreDestory(&uidStore); + smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr()); + return TSDB_CODE_FAILED; } /** @@ -1521,8 +1524,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { - smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", - smaMgmt.rsetId, pRSmaInfo->refId); + smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + pRSmaInfo->refId); return; } @@ -1553,7 +1556,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process - tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); + pItem->fetchLevel = pItem->level; + SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid); + SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1); + ASSERT(qItem->level == pItem->level); + ASSERT(qItem->fetchLevel == pItem->fetchLevel); + tsem_post(&(pStat->notEmpty)); + smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, + pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", @@ -1568,8 +1578,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; default: { - smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; } @@ -1578,135 +1588,21 @@ _end: tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } -/** - * @brief put rsma fetch msg to fetch queue - * - * @param pSma - * @param pInfo - * @param level - * @return int32_t - */ -static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { - SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; - int32_t ret = 0; - int32_t contLen = 0; - SEncoder encoder = {0}; - tEncodeSize(tEncodeSRSmaFetchMsg, &fetchMsg, contLen, ret); - if (ret < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tEncoderClear(&encoder); - goto _err; - } - - void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead)); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen); - if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tEncoderClear(&encoder); - } - tEncoderClear(&encoder); - - ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); - ((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead); - - SRpcMsg rpcMsg = { - .code = 0, - .msgType = TDMT_VND_FETCH_RSMA, - .pCont = pBuf, - .contLen = contLen + sizeof(SMsgHead), - }; - - if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { - smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", - SMA_VID(pSma), pInfo->suid, level, terrstr()); - goto _err; - } - - smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), - pInfo->suid, level); - - return TSDB_CODE_SUCCESS; -_err: - return TSDB_CODE_FAILED; -} - -/** - * @brief fetch rsma data of level 2/3 and submit - * - * @param pSma - * @param pMsg - * @return int32_t - */ -int32_t smaProcessFetch(SSma *pSma, void *pMsg) { - SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; - SRSmaFetchMsg req = {0}; - SDecoder decoder = {0}; - void *pBuf = NULL; - SRSmaStat *pRSmaStat = NULL; - if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { - terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; - goto _err; - } - - pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); - - tDecoderInit(&decoder, pBuf, pRpcMsg->contLen); - if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _err; - } - - pRSmaStat = SMA_RSMA_STAT(pSma); - - if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) { - SArray *pSubmitArr = NULL; - if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pRSmaStat->execStat, 0); - goto _err; - } - tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr); - atomic_store_8(&pRSmaStat->execStat, 0); - taosArrayDestroy(pSubmitArr); - } else { - int8_t level = req.level; - int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid)); - if (val) { - level |= (*val); - } - ASSERT(level >= 1 && level <= 3); - taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level)); - } - - tDecoderClear(&decoder); - smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, - req.level); - return TSDB_CODE_SUCCESS; -_err: - tDecoderClear(&decoder); - smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); - return TSDB_CODE_FAILED; -} - static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { taosFreeQitem(*(void **)taosArrayGet(pItems, i)); } } -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) { - SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid); - if (!pInfo) { - return TSDB_CODE_SUCCESS; - } - +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { // step 1: consume submit req +#if 0 int64_t qMemSize = 0; if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) { taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize); + atomic_fetch_sub_64(&pRSmaStat->nBufItems, taosQallItemSize(pInfo->qall)); taosArrayClear(pSubmitArr); @@ -1736,25 +1632,52 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SAr tdFreeRSmaSubmitItems(pSubmitArr); } } - - // step 2: fetch rsma result +#endif + // step 2: fetch rsma result(consider the efficiency and functionality) SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (level & i) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); + if (pItem->fetchLevel) { + pItem->fetchLevel = 0; qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); if (!taskInfo) { continue; } + + int64_t curMs = taosGetTimestampMs(); + if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + pItem->nSkipped = 0; + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else { + if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { + ++pItem->nSkipped; + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + continue; + } else { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + } + } + + pItem->lastFetch = curMs; + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) { + if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { tdCleanupStreamInputDataBlock(taskInfo); goto _err; } tdCleanupStreamInputDataBlock(taskInfo); + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else { + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 + " maxDelay:%d, fetch not executed as fetchLevel is %" PRIi8, + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); } } @@ -1766,6 +1689,45 @@ _err: return TSDB_CODE_FAILED; } +static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { + taosArrayClear(pSubmitArr); + while (1) { + void *msg = NULL; + taosGetQitem(qall, (void **)&msg); + if (msg) { + if (taosArrayPush(pSubmitArr, &msg) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } else { + break; + } + } + + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } + tdFreeRSmaSubmitItems(pSubmitArr); + } + return TSDB_CODE_SUCCESS; +_err: + while (1) { + void *msg = NULL; + taosGetQitem(qall, (void **)&msg); + if (msg) { + taosFreeQitem(msg); + } else { + break; + } + } + return TSDB_CODE_FAILED; +} + /** * @brief * @@ -1777,7 +1739,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; - SArray *pSubmitQArr = NULL; SArray *pSubmitArr = NULL; bool isFetchAll = false; @@ -1786,106 +1747,95 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { goto _err; } - if (type == RSMA_EXEC_OVERFLOW) { - taosRLockLatch(SMA_ENV_LOCK(pEnv)); - if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - return TSDB_CODE_SUCCESS; - } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - } - - if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) { + if (!(pSubmitArr = + taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - // step 1: rsma exec - consume data in buffer queue for all suids - SRSmaExecQItem qItem = {0}; - void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock - if (type == RSMA_EXEC_OVERFLOW) { - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - if (taosQueueItemSize(pInfo->queue)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock - qItem.qall = &pInfo->qall; - qItem.pRSmaInfo = pIter; - taosArrayPush(pSubmitQArr, &qItem); - } - ASSERT(taosQueueItemSize(pInfo->queue) == 0); - pIter = taosHashIterate(infoHash, pIter); - } - } else if (type == RSMA_EXEC_COMMIT) { - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - if (taosQueueItemSize(pInfo->iQueue)) { - taosReadAllQitems(pInfo->iQueue, pInfo->iQall); - qItem.qall = &pInfo->iQall; - qItem.pRSmaInfo = pIter; - taosArrayPush(pSubmitQArr, &qItem); - } - ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); - pIter = taosHashIterate(infoHash, pIter); - } - } else { - ASSERT(0); - } - atomic_store_64(&pRSmaStat->qBufSize, 0); - - int32_t qSize = taosArrayGetSize(pSubmitQArr); - for (int32_t i = 0; i < qSize; ++i) { - SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i); - while (1) { - void *msg = NULL; - taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg); - if (msg) { - if (taosArrayPush(pSubmitArr, &msg) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } + int32_t nIdle = 0; + while (true) { + if (++nIdle > 100) { + if (atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1) > 1) { + // free the exec thread if without SubmitReq + break; } else { + // keep at least 1 exec thread only if without SubmitReq in case of no query thread to use when busy again + atomic_add_fetch_8(&pRSmaStat->nExecutor, 1); + nIdle = 0; + } + } + // step 1: rsma exec - consume data in buffer queue for all suids + if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) { + void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + int64_t itemSize = 0; + if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || + RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + smaDebug("vgId:%d queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); + if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + int32_t qallItemSize = taosQallItemSize(pInfo->qall); + if (qallItemSize > 0) { + tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + } + + tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + if (qallItemSize > 0) { + // subtract the item size after the task finished, commit should wait for all items be consumed + atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + nIdle = 0; + } + ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); + } + } + pIter = taosHashIterate(infoHash, pIter); + } + if (type == RSMA_EXEC_COMMIT) { break; } } +#if 0 + else if (type == RSMA_EXEC_COMMIT) { + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->iQueue)) { + if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { + taosReadAllQitems(pInfo->iQueue, pInfo->iQall); // queue has mutex lock + int32_t qallItemSize = taosQallItemSize(pInfo->iQall); + if (qallItemSize > 0) { + atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + nIdle = 0; - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; + // batch exec + tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + } + + // tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); + } } + ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); + pIter = taosHashIterate(infoHash, pIter); } - tdFreeRSmaSubmitItems(pSubmitArr); - taosArrayClear(pSubmitArr); + break; + } +#endif + else { + ASSERT(0); } - } - // step 2: rsma fetch - consume data in buffer queue for suids triggered by timer - if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) { - goto _end; - } - pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL); - if (pIter) { - tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); - while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) { - tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); + if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { + tsem_wait(&pRSmaStat->notEmpty); } - } + } // end of while(true) _end: taosArrayDestroy(pSubmitArr); - taosArrayDestroy(pSubmitQArr); return TSDB_CODE_SUCCESS; _err: taosArrayDestroy(pSubmitArr); - taosArrayDestroy(pSubmitQArr); return TSDB_CODE_FAILED; } @@ -1905,15 +1855,21 @@ int32_t smaProcessExec(SSma *pSma, void *pMsg) { goto _err; } smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { - goto _err; + + int8_t nOld = atomic_fetch_add_8(&pRSmaStat->nExecutor, 1); + + if (nOld < TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { + if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { + goto _err; + } + } else { + atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); } - atomic_store_8(&pRSmaStat->execStat, 0); smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); return TSDB_CODE_SUCCESS; _err: - atomic_store_8(&pRSmaStat->execStat, 0); + atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(), terrstr()); return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c73d2ccfd5..7a8d168f4f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); - case TDMT_VND_FETCH_RSMA: - return smaProcessFetch(pVnode->pSma, pMsg); case TDMT_VND_EXEC_RSMA: return smaProcessExec(pVnode->pSma, pMsg); default: diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d15dc99122..a432196b01 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3131,6 +3131,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { initResultRow(resultRow); pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; + // releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId)); } if (offset != length) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 50beba8a9b..eb70002680 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -298,7 +298,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { return num; } -void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } +void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } +int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; } STaosQset *taosOpenQset() { STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1); From a9fcc12c338e9af562213e2a8fc54909a99a7e7d Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Fri, 19 Aug 2022 20:08:09 +0800 Subject: [PATCH 02/10] enh: rsma batch process --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 6 +++- source/dnode/vnode/src/inc/sma.h | 3 +- source/dnode/vnode/src/inc/vnodeInt.h | 3 ++ source/dnode/vnode/src/sma/smaOpen.c | 11 ++++++ source/dnode/vnode/src/sma/smaRollup.c | 36 +++++++++++-------- source/dnode/vnode/src/vnd/vnodeOpen.c | 3 ++ .../tsim/sma/rsmaPersistenceRecovery.sim | 6 ++++ 7 files changed, 52 insertions(+), 16 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 1f981cc9e0..4987aa42e1 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -87,10 +87,14 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (pVnode->refCount > 0) taosMsleep(10); dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId); + while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); - while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); + while (!taosQueueEmpty(pVnode->pQueryQ)) { + taosMsleep(10); + dInfo("prop:vgId:%d, query queue size is %d", pVnode->vgId, taosQueueItemSize(pVnode->pQueryQ)); + } while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 989d24295e..d32d67e29c 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -32,7 +32,8 @@ extern "C" { #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -#define RSMA_TASK_INFO_HASH_SLOT 8 +#define RSMA_TASK_INFO_HASH_SLOT (8) +#define RSMA_EXECUTOR_MAX (4) typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 63d228ec8b..3361b63fd8 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -189,6 +189,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem int32_t smaInit(); void smaCleanUp(); int32_t smaOpen(SVnode* pVnode); +int32_t smaPreClose(SSma* pSma); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma); @@ -322,10 +323,12 @@ struct SVnode { TdThreadMutex lock; bool blocked; bool restored; + bool inClose; tsem_t syncSem; SQHandle* pQuery; }; + #define TD_VID(PVNODE) ((PVNODE)->config.vgId) #define VND_TSDB(vnd) ((vnd)->pTsdb) diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 235fb1f941..7128ecb94a 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -146,6 +146,17 @@ int32_t smaClose(SSma *pSma) { return 0; } +int32_t smaPreClose(SSma *pSma) { + if (pSma && VND_IS_RSMA(pSma->pVnode)) { + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { + tsem_post(&(pRSmaStat->notEmpty)); + } + smaInfo("prop:vgId:%d post notEmtpy", SMA_VID(pSma)); + } + return 0; +} + /** * @brief rsma env restore * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e8018c0f33..049c2fef9d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,7 +19,6 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_EXECUTOR_MAX (4) // cnt #define RSMA_FETCH_DELAY_MAX (1800000) // ms #define RSMA_FETCH_SKIP_MAX (1000) // cnt #define RSMA_FETCH_ACTIVE_MAX (1800) // ms @@ -671,7 +670,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm } else { smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } -#if 1 +#if 0 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); @@ -1736,6 +1735,7 @@ _err: * @return int32_t */ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { + SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; @@ -1753,18 +1753,9 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { goto _err; } - int32_t nIdle = 0; + bool isBusy = false; while (true) { - if (++nIdle > 100) { - if (atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1) > 1) { - // free the exec thread if without SubmitReq - break; - } else { - // keep at least 1 exec thread only if without SubmitReq in case of no query thread to use when busy again - atomic_add_fetch_8(&pRSmaStat->nExecutor, 1); - nIdle = 0; - } - } + isBusy = false; // step 1: rsma exec - consume data in buffer queue for all suids if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) { void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock @@ -1785,7 +1776,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (qallItemSize > 0) { // subtract the item size after the task finished, commit should wait for all items be consumed atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); - nIdle = 0; + isBusy = true; } ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); } @@ -1826,8 +1817,25 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ASSERT(0); } + smaInfo("prop:vgId:%d loop end check", SMA_VID(pSma)); if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { + if (pVnode->inClose) { + smaInfo("prop:vgId:%d loop end check - inClose and break", SMA_VID(pSma)); + break; + } + smaInfo("prop:vgId:%d loop end check - wait for notEmpty", SMA_VID(pSma)); tsem_wait(&pRSmaStat->notEmpty); + smaInfo("prop:vgId:%d loop end check - received notEmpty", SMA_VID(pSma)); + if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { + smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, + atomic_load_64(&pRSmaStat->nBufItems)); + break; + } else { + smaInfo("prop:vgId:%d loop end check - continue - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), + pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); + } + } else { + smaInfo("prop:vgId:%d loop end check - continue to run", SMA_VID(pSma)); } } // end of while(true) diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index a4fd984fb7..dcfbd33b90 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -87,6 +87,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); pVnode->blocked = false; + pVnode->inClose = false; tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); @@ -181,6 +182,8 @@ _err: void vnodePreClose(SVnode *pVnode) { if (pVnode) { syncLeaderTransfer(pVnode->sync); + pVnode->inClose = true; + smaPreClose(pVnode->pSma); } } diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index f53cd45d48..faff48b61c 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -35,6 +35,7 @@ sleep 7000 print =============== select * from retention level 2 from memory sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file rows $rows > 2 return -1 @@ -51,6 +52,7 @@ endi print =============== select * from retention level 1 from memory sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file rows $rows > 2 return -1 @@ -89,6 +91,7 @@ system sh/exec.sh -n dnode1 -s start print =============== select * from retention level 2 from file sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file rows $rows > 2 return -1 @@ -104,6 +107,7 @@ endi print =============== select * from retention level 1 from file sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file rows $rows > 2 return -1 @@ -141,6 +145,7 @@ sleep 7000 print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file/mem rows $rows > 2 return -1 @@ -163,6 +168,7 @@ endi print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file/mem rows $rows > 2 return -1 From 50e7e033bc1d1017e6d88d112270518dc915e97a Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 20 Aug 2022 18:56:48 +0800 Subject: [PATCH 03/10] enh: rsma batch process --- source/dnode/vnode/src/sma/smaCommit.c | 21 +++++---- source/dnode/vnode/src/sma/smaRollup.c | 60 ++++++++++++-------------- source/dnode/vnode/src/sma/smaUtil.c | 3 ++ 3 files changed, 43 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 7b63056897..2c6e5f5ca1 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -351,7 +351,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { nLoops = 0; } } - smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { + return TSDB_CODE_FAILED; + } + smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); #if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall @@ -391,13 +395,14 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { if (!pSmaEnv) { return TSDB_CODE_SUCCESS; } - +#if 0 SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); // perform persist task for qTaskInfo operator if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { return TSDB_CODE_FAILED; } +#endif return TSDB_CODE_SUCCESS; } @@ -421,8 +426,8 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // lock // taosWLockLatch(SMA_ENV_LOCK(pEnv)); - void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) { tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { @@ -440,14 +445,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { SMA_VID(pSma), refVal, *pSuid); } - pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); continue; } - +#if 0 if (pRSmaInfo->taskInfo[0]) { if (pRSmaInfo->iTaskInfo[0]) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pRSmaInfo->iTaskInfo[0]; - tdFreeRSmaInfo(pSma, pRSmaInfo, true); + tdFreeRSmaInfo(pSma, pRSmaInfo, false); pRSmaInfo->iTaskInfo[0] = NULL; } } else { @@ -456,8 +460,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); - - pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); +#endif } for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 049c2fef9d..6414f822c0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,9 +19,9 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (1800000) // ms -#define RSMA_FETCH_SKIP_MAX (1000) // cnt -#define RSMA_FETCH_ACTIVE_MAX (1800) // ms +#define RSMA_FETCH_DELAY_MAX (180000) // ms +#define RSMA_FETCH_SKIP_MAX (10) // cnt +#define RSMA_FETCH_ACTIVE_MAX (180) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -163,8 +163,8 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { pInfo->iQall = NULL; } - taosMemoryFree(pInfo); - } + taosMemoryFree(pInfo); + } return NULL; } @@ -968,13 +968,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { goto _err; } - void *pIter = taosHashIterate(uidStore.uidHash, NULL); - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) { goto _err; } - pIter = taosHashIterate(uidStore.uidHash, pIter); } if (tdRSmaExecCheck(pSma) < 0) { @@ -1418,7 +1417,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { +#if 0 qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i); +#endif + qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); if (!taskInfo) { smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; @@ -1644,24 +1646,27 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm } int64_t curMs = taosGetTimestampMs(); - if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { - pItem->nSkipped = 0; - smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); - } else { - if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { - ++pItem->nSkipped; - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - continue; - } else { - smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - } - } + // if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + // pItem->nSkipped = 0; + // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", + // SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + // } else { + // if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { + // ++pItem->nSkipped; + // smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + // continue; + // } else { + // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + // } + // } pItem->lastFetch = curMs; + // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } @@ -1817,25 +1822,16 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { ASSERT(0); } - smaInfo("prop:vgId:%d loop end check", SMA_VID(pSma)); if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { if (pVnode->inClose) { - smaInfo("prop:vgId:%d loop end check - inClose and break", SMA_VID(pSma)); break; } - smaInfo("prop:vgId:%d loop end check - wait for notEmpty", SMA_VID(pSma)); tsem_wait(&pRSmaStat->notEmpty); - smaInfo("prop:vgId:%d loop end check - received notEmpty", SMA_VID(pSma)); if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); break; - } else { - smaInfo("prop:vgId:%d loop end check - continue - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), - pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); } - } else { - smaInfo("prop:vgId:%d loop end check - continue to run", SMA_VID(pSma)); } } // end of while(true) diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index da70222485..d771797963 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -375,6 +375,9 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (!pInfo->iTaskInfo[i]) { + continue; + } if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { goto _err; } From f9e0aad82761db775bc0c1774be58a16fea15e99 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 20 Aug 2022 23:28:48 +0800 Subject: [PATCH 04/10] enh: rsma batch process --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 5 +---- source/dnode/vnode/src/sma/smaCommit.c | 4 ++-- source/dnode/vnode/src/sma/smaOpen.c | 1 - source/dnode/vnode/src/sma/smaRollup.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeCommit.c | 27 ++++++++++++++++-------- 5 files changed, 23 insertions(+), 18 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 4987aa42e1..a309aaf1ca 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -91,10 +91,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); - while (!taosQueueEmpty(pVnode->pQueryQ)) { - taosMsleep(10); - dInfo("prop:vgId:%d, query queue size is %d", pVnode->vgId, taosQueueItemSize(pVnode->pQueryQ)); - } + while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10); dTrace("vgId:%d, vnode queue is empty", pVnode->vgId); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 2c6e5f5ca1..ac9afdb11f 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -316,6 +316,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 1); + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + ASSERT(pRSmaStat->commitAppliedVer > 0); // step 2: wait all triggered fetch tasks finished int32_t nLoops = 0; @@ -378,8 +380,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // unlock // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); #endif - // step 5: others - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 7128ecb94a..b2344bc0ec 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -152,7 +152,6 @@ int32_t smaPreClose(SSma *pSma) { for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { tsem_post(&(pRSmaStat->notEmpty)); } - smaInfo("prop:vgId:%d post notEmtpy", SMA_VID(pSma)); } return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 6414f822c0..5ef5bd0aea 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1769,7 +1769,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { int64_t itemSize = 0; if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { - smaDebug("vgId:%d queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); + smaDebug("vgId:%d, queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock int32_t qallItemSize = taosQallItemSize(pInfo->qall); @@ -1828,7 +1828,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } tsem_wait(&pRSmaStat->notEmpty); if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { - smaInfo("prop:vgId:%d loop end check - break - inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, + smaInfo("vgId:%d, exec task end, inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, atomic_load_64(&pRSmaStat->nBufItems)); break; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 64f223b974..8c73499229 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); - // preCommit - // smaSyncPreCommit(pVnode->pSma); - smaAsyncPreCommit(pVnode->pSma); - - vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; - pVnode->state.commitTerm = pVnode->state.applyTerm; // save info @@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); + // preCommit + // smaSyncPreCommit(pVnode->pSma); + if(smaAsyncPreCommit(pVnode->pSma) < 0){ + ASSERT(0); + return -1; + } + + vnodeBufPoolUnRef(pVnode->inUse); + pVnode->inUse = NULL; + // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { ASSERT(0); @@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { - smaAsyncCommit(pVnode->pSma); + if (smaAsyncCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { ASSERT(0); @@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) { // postCommit // smaSyncPostCommit(pVnode->pSma); - smaAsyncPostCommit(pVnode->pSma); + if (smaAsyncPostCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } // apply the commit (TODO) walEndSnapshot(pVnode->pWal); From 0de7f3e3a7b29ddd2f2fc08a74481b9074eb3ca5 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sat, 20 Aug 2022 23:46:12 +0800 Subject: [PATCH 05/10] enh: rsma code optimization --- source/dnode/vnode/src/inc/sma.h | 1 - source/dnode/vnode/src/sma/smaRollup.c | 39 +++++++++++--------------- 2 files changed, 16 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index d32d67e29c..5ba22f6db8 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -123,7 +123,6 @@ struct SRSmaInfoItem { uint8_t nSkipped; // number of skipped to fetch data from all active window int8_t fetchLevel; int32_t maxDelay; // ms - int64_t lastFetch; // ms tmr_h tmrId; }; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 5ef5bd0aea..e2d7a588b0 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,9 +19,8 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (180000) // ms -#define RSMA_FETCH_SKIP_MAX (10) // cnt -#define RSMA_FETCH_ACTIVE_MAX (180) // ms +#define RSMA_FETCH_DELAY_MAX (900000) // ms +#define RSMA_FETCH_ACTIVE_MAX (1800) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -1646,26 +1645,20 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm } int64_t curMs = taosGetTimestampMs(); - // if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { - // pItem->nSkipped = 0; - // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", - // SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); - // } else { - // if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { - // ++pItem->nSkipped; - // smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", - // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - // continue; - // } else { - // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - // } - // } + if ((pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { + ++pItem->nSkipped; + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + continue; + } else { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + } - pItem->lastFetch = curMs; - - // smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - // SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + pItem->nSkipped = 0; if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; @@ -1680,7 +1673,7 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); } else { smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 - " maxDelay:%d, fetch not executed as fetchLevel is %" PRIi8, + " maxDelay:%d, fetch not executed as fetch level is %" PRIi8, SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); } } From 8b3b6a8962384f18e355988368a7806baa4b9462 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 21 Aug 2022 01:07:21 +0800 Subject: [PATCH 06/10] enh: code optimization and memory leak --- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 1 - source/dnode/vnode/src/inc/sma.h | 14 +++++++------- source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/sma/smaCommit.c | 14 +++++++------- source/dnode/vnode/src/sma/smaEnv.c | 6 +++--- source/dnode/vnode/src/sma/smaOpen.c | 8 ++++++-- source/dnode/vnode/src/sma/smaRollup.c | 4 +++- source/libs/tdb/src/db/tdbBtree.c | 3 +++ 8 files changed, 29 insertions(+), 22 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index a309aaf1ca..1f981cc9e0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -87,7 +87,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { while (pVnode->refCount > 0) taosMsleep(10); dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId); - while (!taosQueueEmpty(pVnode->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pSyncQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pApplyQ)) taosMsleep(10); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 5ba22f6db8..50beb0054d 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -118,12 +118,12 @@ struct SSmaStat { #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - int8_t level; - int8_t triggerStat; - uint8_t nSkipped; // number of skipped to fetch data from all active window - int8_t fetchLevel; - int32_t maxDelay; // ms - tmr_h tmrId; + int8_t level : 4; + int8_t fetchLevel : 4; + int8_t triggerStat; + uint16_t nSkipped; // number of skipped to fetch data from all active window + int32_t maxDelay; // ms + tmr_h tmrId; }; struct SRSmaInfo { @@ -131,8 +131,8 @@ struct SRSmaInfo { int64_t suid; int64_t refId; // refId of SRSmaStat int64_t lastRecv; // ms - int8_t delFlag; int8_t assigned; // 0 idle, 1 assgined for exec + int8_t delFlag; int16_t padding; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 3361b63fd8..8c0ea84bee 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -328,7 +328,6 @@ struct SVnode { SQHandle* pQuery; }; - #define TD_VID(PVNODE) ((PVNODE)->config.vgId) #define VND_TSDB(vnd) ((vnd)->pTsdb) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index ac9afdb11f..ca5367f397 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -109,7 +109,7 @@ int32_t smaBegin(SSma *pSma) { /** * @brief pre-commit for rollup sma(sync commit). * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. - * 2) wait all triggered fetch tasks finished + * 2) wait for all triggered fetch tasks to finish * 3) perform persist task for qTaskInfo * * @param pSma @@ -127,14 +127,14 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { // step 1: set rsma stat paused atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - // step 2: wait all triggered fetch tasks finished + // step 2: wait for all triggered fetch tasks to finish int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { @@ -319,14 +319,14 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; ASSERT(pRSmaStat->commitAppliedVer > 0); - // step 2: wait all triggered fetch tasks finished + // step 2: wait for all triggered fetch tasks to finish int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma commit, fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma commit, fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index d39efc748e..e3b83f9955 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -275,14 +275,14 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 3: wait all triggered fetch tasks finished + // step 3: wait for all triggered fetch tasks to finish int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index b2344bc0ec..e2710b26e3 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -148,9 +148,13 @@ int32_t smaClose(SSma *pSma) { int32_t smaPreClose(SSma *pSma) { if (pSma && VND_IS_RSMA(pSma->pVnode)) { - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); + SSmaEnv *pEnv = NULL; + SRSmaStat *pStat = NULL; + if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv))) { + return 0; + } for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { - tsem_post(&(pRSmaStat->notEmpty)); + tsem_post(&(pStat->notEmpty)); } } return 0; diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index e2d7a588b0..72f779eaab 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1524,7 +1524,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { - smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, pRSmaInfo->refId); return; } @@ -1557,10 +1557,12 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process pItem->fetchLevel = pItem->level; +#if 0 SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid); SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1); ASSERT(qItem->level == pItem->level); ASSERT(qItem->fetchLevel == pItem->fetchLevel); +#endif tsem_post(&(pStat->notEmpty)); smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, pRSmaInfo->suid); diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 7a44edb12c..559b79fd39 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -782,6 +782,9 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx pBt); tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0); tdbOsFree(pNewCell); + if (TDB_CELLDECODER_FREE_VAL(&cd)) { + tdbFree(cd.pVal); + } } // move to next new page From 77bfd400cbd8f3464de5452491f699d93780f3c2 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 21 Aug 2022 01:10:18 +0800 Subject: [PATCH 07/10] enh: code optimization --- source/dnode/vnode/src/inc/sma.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 50beb0054d..b36c8eeac3 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -121,7 +121,7 @@ struct SRSmaInfoItem { int8_t level : 4; int8_t fetchLevel : 4; int8_t triggerStat; - uint16_t nSkipped; // number of skipped to fetch data from all active window + uint16_t nSkipped; int32_t maxDelay; // ms tmr_h tmrId; }; From 4b3d98916855c13d72b73c148ade995600efed26 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 21 Aug 2022 01:19:32 +0800 Subject: [PATCH 08/10] other: naming optimization --- source/dnode/vnode/src/sma/smaRollup.c | 67 +++++++------------------- 1 file changed, 18 insertions(+), 49 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 72f779eaab..959a33d8ce 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -42,9 +42,9 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); -static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid); +static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); @@ -636,8 +636,8 @@ _end: return code; } -static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid) { +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid) { SArray *pResList = taosArrayInit(1, POINTER_BYTES); if (pResList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -815,7 +815,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); + tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); return TSDB_CODE_SUCCESS; } @@ -1596,46 +1596,15 @@ static void tdFreeRSmaSubmitItems(SArray *pItems) { } } -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { - // step 1: consume submit req -#if 0 - int64_t qMemSize = 0; - if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock - - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - atomic_fetch_sub_64(&pRSmaStat->nBufItems, taosQallItemSize(pInfo->qall)); - - taosArrayClear(pSubmitArr); - - while (1) { - void *msg = NULL; - taosGetQitem(pInfo->qall, (void **)&msg); - if (msg) { - if (taosArrayPush(pSubmitArr, &msg) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } - } else { - break; - } - } - - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) < - 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } - } - - tdFreeRSmaSubmitItems(pSubmitArr); - } - } -#endif - // step 2: fetch rsma result(consider the efficiency and functionality) +/** + * @brief fetch rsma result(consider the efficiency and functionality) + * + * @param pSma + * @param pInfo + * @param pSubmitArr + * @return int32_t + */ +static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); @@ -1665,7 +1634,7 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubm if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { + if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { tdCleanupStreamInputDataBlock(taskInfo); goto _err; } @@ -1772,7 +1741,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); } - tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); if (qallItemSize > 0) { // subtract the item size after the task finished, commit should wait for all items be consumed atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); @@ -1803,7 +1772,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); } - // tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + // tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); } } From 65e1cce6dfbc3b94bc18c757775c5c1dad0192d9 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 21 Aug 2022 01:22:56 +0800 Subject: [PATCH 09/10] enh: rsma code optimization --- source/dnode/vnode/src/sma/smaRollup.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 959a33d8ce..448b8ab508 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -162,8 +162,8 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { pInfo->iQall = NULL; } - taosMemoryFree(pInfo); - } + taosMemoryFree(pInfo); + } return NULL; } @@ -1524,8 +1524,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { - smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, - pRSmaInfo->refId); + smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", + smaMgmt.rsetId, pRSmaInfo->refId); return; } @@ -1741,7 +1741,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); } - tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); + if (type == RSMA_EXEC_OVERFLOW) { + tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); + } + if (qallItemSize > 0) { // subtract the item size after the task finished, commit should wait for all items be consumed atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); From e0317964f096b69ddcea5a1bc19f31165ff5e842 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 21 Aug 2022 14:56:20 +0800 Subject: [PATCH 10/10] fix: adjust max executor for rsma --- source/dnode/vnode/src/inc/sma.h | 2 +- source/libs/tdb/src/db/tdbBtree.c | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index b36c8eeac3..ca77042bb2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -33,7 +33,7 @@ extern "C" { // clang-format on #define RSMA_TASK_INFO_HASH_SLOT (8) -#define RSMA_EXECUTOR_MAX (4) +#define RSMA_EXECUTOR_MAX (1) typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; diff --git a/source/libs/tdb/src/db/tdbBtree.c b/source/libs/tdb/src/db/tdbBtree.c index 559b79fd39..7a44edb12c 100644 --- a/source/libs/tdb/src/db/tdbBtree.c +++ b/source/libs/tdb/src/db/tdbBtree.c @@ -782,9 +782,6 @@ static int tdbBtreeBalanceNonRoot(SBTree *pBt, SPage *pParent, int idx, TXN *pTx pBt); tdbPageInsertCell(pParent, sIdx++, pNewCell, szNewCell, 0); tdbOsFree(pNewCell); - if (TDB_CELLDECODER_FREE_VAL(&cd)) { - tdbFree(cd.pVal); - } } // move to next new page