diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f870bd161f..8f199c72f7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2667,31 +2667,6 @@ typedef struct { int32_t padding; } SRSmaExecMsg; -typedef struct { - int64_t suid; - int8_t level; -} SRSmaFetchMsg; - -static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) { - if (tStartEncode(pCoder) < 0) return -1; - - if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; - if (tEncodeI8(pCoder, pReq->level) < 0) return -1; - - tEndEncode(pCoder); - return 0; -} - -static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) { - if (tStartDecode(pCoder) < 0) return -1; - - if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->level) < 0) return -1; - - tEndDecode(pCoder); - return 0; -} - typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 16d5965759..e2bb3e2ae1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -201,7 +201,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) - TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 0f4f1db9ee..da409a90bb 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -76,6 +76,7 @@ void taosFreeQall(STaosQall *qall); int32_t taosReadAllQitems(STaosQueue 
*queue, STaosQall *qall); int32_t taosGetQitem(STaosQall *qall, void **ppItem); void taosResetQitems(STaosQall *qall); +int32_t taosQallItemSize(STaosQall *qall); STaosQset *taosOpenQset(); void taosCloseQset(STaosQset *qset); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e0f2b83160..ebec3d5ea6 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -442,6 +442,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt if (req.rollup) { req.rsmaParam.maxdelay[0] = pStb->maxdelay[0]; req.rsmaParam.maxdelay[1] = pStb->maxdelay[1]; + req.rsmaParam.watermark[0] = pStb->watermark[0]; + req.rsmaParam.watermark[1] = pStb->watermark[1]; if (pStb->ast1Len > 0) { if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c43772062e..ca77042bb2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -32,7 +32,8 @@ extern "C" { #define smaTrace(...) do { if (smaDebugFlag & DEBUG_TRACE) { taosPrintLog("SMA ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) // clang-format on -#define RSMA_TASK_INFO_HASH_SLOT 8 +#define RSMA_TASK_INFO_HASH_SLOT (8) +#define RSMA_EXECUTOR_MAX (1) typedef struct SSmaEnv SSmaEnv; typedef struct SSmaStat SSmaStat; @@ -90,14 +91,14 @@ struct SRSmaStat { SSma *pSma; int64_t commitAppliedVer; // vnode applied version for async commit int64_t refId; // shared by fetch tasks - volatile int64_t qBufSize; // queue buffer size + volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. 
qtaskinfo) + volatile int8_t nExecutor; // executor count; tdRSmaExecCheck stops at min(RSMA_EXECUTOR_MAX, half of query threads) int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing - int8_t execStat; // 0 not in exec , 1 in exec SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SHashObj *infoHash; // key: suid, value: SRSmaInfo - SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2 + tsem_t notEmpty; // has items in queue buffer }; struct SSmaStat { @@ -111,26 +112,28 @@ struct SSmaStat { #define SMA_STAT_TSMA(s) (&(s)->tsmaStat) #define SMA_STAT_RSMA(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->infoHash) -#define RSMA_FETCH_HASH(r) ((r)->fetchHash) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_REF_ID(r) ((r)->refId) #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - int8_t level; + int8_t level : 4; + int8_t fetchLevel : 4; int8_t triggerStat; - uint16_t interval; // second - int32_t maxDelay; + uint16_t nSkipped; + int32_t maxDelay; // ms tmr_h tmrId; }; struct SRSmaInfo { STSchema *pTSchema; int64_t suid; - int64_t refId; // refId of SRSmaStat - uint64_t delFlag : 1; - uint64_t lastReceived : 63; // second + int64_t refId; // refId of SRSmaStat + int64_t lastRecv; // ms + int8_t assigned; // 0 idle, 1 assigned for exec + int8_t delFlag; + int16_t padding; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8bc82928ed..39c5f3873e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -189,6 +189,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem int32_t smaInit(); void smaCleanUp(); int32_t smaOpen(SVnode* pVnode); +int32_t smaPreClose(SSma* pSma); int32_t smaClose(SSma* pSma); int32_t smaBegin(SSma* pSma); int32_t 
smaSyncPreCommit(SSma* pSma); @@ -198,7 +199,6 @@ int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); -int32_t smaProcessFetch(SSma* pSma, void* pMsg); int32_t smaProcessExec(SSma* pSma, void* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); @@ -323,6 +323,7 @@ struct SVnode { TdThreadMutex lock; bool blocked; bool restored; + bool inClose; tsem_t syncSem; SQHandle* pQuery; }; diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 8b92475035..ca5367f397 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -109,7 +109,7 @@ int32_t smaBegin(SSma *pSma) { /** * @brief pre-commit for rollup sma(sync commit). * 1) set trigger stat of rsma timer TASK_TRIGGER_STAT_PAUSED. - * 2) wait all triggered fetch tasks finished + * 2) wait for all triggered fetch tasks to finish * 3) perform persist task for qTaskInfo * * @param pSma @@ -127,14 +127,14 @@ static int32_t tdProcessRSmaSyncPreCommitImpl(SSma *pSma) { // step 1: set rsma stat paused atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); - // step 2: wait all triggered fetch tasks finished + // step 2: wait for all triggered fetch tasks to finish int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { @@ -316,15 +316,17 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 
1); + pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + ASSERT(pRSmaStat->commitAppliedVer > 0); - // step 2: wait all triggered fetch tasks finished + // step 2: wait for all triggered fetch tasks to finish int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks are not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { @@ -338,30 +340,29 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { * 1) This is high cost task and should not put in asyncPreCommit originally. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. */ - nLoops = 0; - smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1); - if (old == 0) break; - if (++nLoops > 1000) { - sched_yield(); - nLoops = 0; - smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - } - } - - smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) { - atomic_store_8(&pRSmaStat->execStat, 0); return TSDB_CODE_FAILED; } + smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + nLoops = 0; + while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { + 
return TSDB_CODE_FAILED; + } + smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + +#if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(RSMA_INFO_HASH(pRSmaStat)); @@ -376,13 +377,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); } - atomic_store_64(&pRSmaStat->qBufSize, 0); - atomic_store_8(&pRSmaStat->execStat, 0); // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - - // step 5: others - pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; + // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); +#endif return TSDB_CODE_SUCCESS; } @@ -398,13 +395,14 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { if (!pSmaEnv) { return TSDB_CODE_SUCCESS; } - +#if 0 SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); // perform persist task for qTaskInfo operator if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { return TSDB_CODE_FAILED; } +#endif return TSDB_CODE_SUCCESS; } @@ -426,10 +424,10 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // step 1: merge qTaskInfo and iQTaskInfo // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWLockLatch(SMA_ENV_LOCK(pEnv)); - void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); - while (pIter) { + void *pIter = NULL; + while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) { tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; if (RSMA_INFO_IS_DEL(pRSmaInfo)) { @@ -447,14 +445,13 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { SMA_VID(pSma), refVal, *pSuid); } - pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); continue; } - +#if 0 if (pRSmaInfo->taskInfo[0]) { if (pRSmaInfo->iTaskInfo[0]) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo 
**)pRSmaInfo->iTaskInfo[0]; - tdFreeRSmaInfo(pSma, pRSmaInfo, true); + tdFreeRSmaInfo(pSma, pRSmaInfo, false); pRSmaInfo->iTaskInfo[0] = NULL; } } else { @@ -463,8 +460,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); - - pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); +#endif } for (int32_t i = 0; i < taosArrayGetSize(rsmaDeleted); ++i) { @@ -480,10 +476,9 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); } taosArrayDestroy(rsmaDeleted); - // TODO: remove suid in files? // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // step 2: cleanup outdated qtaskinfo files tdCleanupQTaskInfoFiles(pSma, pRSmaStat); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index f51aad22bd..e3b83f9955 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); pRSmaStat->pSma = (SSma *)pSma; atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); + tsem_init(&pRSmaStat->notEmpty, 0, 0); // init smaMgmt smaInit(); @@ -230,12 +231,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (!RSMA_INFO_HASH(pRSmaStat)) { return TSDB_CODE_FAILED; } - - RSMA_FETCH_HASH(pRSmaStat) = taosHashInit( - RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - if (!RSMA_FETCH_HASH(pRSmaStat)) { - return TSDB_CODE_FAILED; - } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { @@ -267,6 +262,7 @@ static void tdDestroyRSmaStat(void 
*pRSmaStat) { smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat); // step 1: set rsma trigger stat cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); + tsem_destroy(&(pStat->notEmpty)); // step 2: destroy the rsma info and associated fetch tasks if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { @@ -279,17 +275,14 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 3: destroy the rsma fetch hash - taosHashCleanup(RSMA_FETCH_HASH(pStat)); - - // step 4: wait all triggered fetch tasks finished + // step 3: wait for all triggered fetch tasks to finish int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 235fb1f941..e2710b26e3 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -146,6 +146,20 @@ int32_t smaClose(SSma *pSma) { return 0; } +int32_t smaPreClose(SSma *pSma) { + if (pSma && VND_IS_RSMA(pSma->pVnode)) { + SSmaEnv *pEnv = NULL; + SRSmaStat *pStat = NULL; + if (!(pEnv = SMA_RSMA_ENV(pSma)) || !(pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv))) { + return 0; + } + for (int32_t i = 0; i < RSMA_EXECUTOR_MAX; ++i) { + tsem_post(&(pStat->notEmpty)); + } + } + return 0; +} + /** * @brief rsma env restore * diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 9b3b0cb63d..448b8ab508 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,10 +15,12 @@ #include "sma.h" -#define 
RSMA_QTASKINFO_BUFSIZE (32768) -#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid -#define RSMA_QTASKEXEC_BUFSIZE (1048576) -#define RSMA_SUBMIT_BATCH_SIZE (1024) +#define RSMA_QTASKINFO_BUFSIZE (32768) // size +#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid +#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt +#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt +#define RSMA_FETCH_DELAY_MAX (900000) // ms +#define RSMA_FETCH_ACTIVE_MAX (1800) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -40,11 +42,10 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr); -static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid); +static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, + int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); -static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); @@ -635,8 +636,8 @@ _end: return code; } -static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, - int64_t suid) { +static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem 
*pItem, STSchema *pTSchema, + int64_t suid) { SArray *pResList = taosArrayInit(1, POINTER_BYTES); if (pResList == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -729,22 +730,24 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu taosWriteQitem(pInfo->queue, qItem); + pInfo->lastRecv = taosGetTimestampMs(); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen); + + tsem_post(&(pRSmaStat->notEmpty)); + + int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1); // smoothing consume - int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE; + int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE; if (n > 1) { if (n > 10) { n = 10; } - taosMsleep(n << 4); - if (n > 2) { + taosMsleep(n << 3); + if (n > 5) { smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), - taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); - } else { - smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), - taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); + taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3); } } @@ -812,7 +815,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, } SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, idx); - tdRSmaFetchAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); + tdRSmaExecAndSubmitResult(pSma, qTaskInfo, pItem, pInfo->pTSchema, pInfo->suid); return TSDB_CODE_SUCCESS; } @@ -840,25 +843,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - taosRLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRLockLatch(SMA_ENV_LOCK(pEnv)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // 
taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } if (!pRSmaInfo->taskInfo[0]) { if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } } tdRefRSmaInfo(pSma, pRSmaInfo); - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(pRSmaInfo->suid == suid); return pRSmaInfo; } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } @@ -910,22 +913,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp static int32_t tdRSmaExecCheck(SSma *pSma) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize); - if (bufSize < RSMA_QTASKEXEC_BUFSIZE) { - smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize, - RSMA_QTASKEXEC_BUFSIZE); + if (atomic_load_8(&pRSmaStat->nExecutor) >= TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { return TSDB_CODE_SUCCESS; } - if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) { - smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma), - bufSize); - return TSDB_CODE_SUCCESS; - } - - smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize); - SRSmaExecMsg fetchMsg; int32_t contLen = sizeof(SMsgHead); void *pBuf = rpcMallocCont(0 + contLen); @@ -949,7 +941,6 @@ static int32_t tdRSmaExecCheck(SSma *pSma) { return TSDB_CODE_SUCCESS; _err: - atomic_store_8(&pRSmaStat->execStat, 0); return TSDB_CODE_FAILED; } @@ -959,7 +950,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { // only applicable when rsma env exists return TSDB_CODE_SUCCESS; } - + STbUidStore uidStore = {0}; SRetention *pRetention = SMA_RETENTION(pSma); if (!RETENTION_VALID(pRetention + 1)) { // return directly if retention level 1 is invalid @@ 
-967,25 +958,34 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - STbUidStore uidStore = {0}; - tdFetchSubmitReqSuids(pMsg, &uidStore); + if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) { + goto _err; + } if (uidStore.suid != 0) { - tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid); - - void *pIter = taosHashIterate(uidStore.uidHash, NULL); - while (pIter) { - tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid); - pIter = taosHashIterate(uidStore.uidHash, pIter); + if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) { + goto _err; } - tdUidStoreDestory(&uidStore); + void *pIter = NULL; + while ((pIter = taosHashIterate(uidStore.uidHash, pIter))) { + tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) { + goto _err; + } + } - tdRSmaExecCheck(pSma); + if (tdRSmaExecCheck(pSma) < 0) { + goto _err; + } } } + tdUidStoreDestory(&uidStore); return TSDB_CODE_SUCCESS; +_err: + tdUidStoreDestory(&uidStore); + smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr()); + return TSDB_CODE_FAILED; } /** @@ -1416,7 +1416,10 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { } for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { +#if 0 qTaskInfo_t taskInfo = RSMA_INFO_IQTASK(pRSmaInfo, i); +#endif + qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); if (!taskInfo) { smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d qTaskInfo is NULL", vid, pRSmaInfo->suid, i + 1); continue; @@ -1553,7 +1556,16 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process - tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); + pItem->fetchLevel = 
pItem->level; +#if 0 + SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid); + SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1); + ASSERT(qItem->level == pItem->level); + ASSERT(qItem->fetchLevel == pItem->fetchLevel); +#endif + tsem_post(&(pStat->notEmpty)); + smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, + pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", @@ -1568,8 +1580,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; default: { - smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; } @@ -1578,183 +1590,62 @@ _end: tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } -/** - * @brief put rsma fetch msg to fetch queue - * - * @param pSma - * @param pInfo - * @param level - * @return int32_t - */ -static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { - SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; - int32_t ret = 0; - int32_t contLen = 0; - SEncoder encoder = {0}; - tEncodeSize(tEncodeSRSmaFetchMsg, &fetchMsg, contLen, ret); - if (ret < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tEncoderClear(&encoder); - goto _err; - } - - void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead)); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen); - if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tEncoderClear(&encoder); - } - tEncoderClear(&encoder); - - ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); - ((SMsgHead *)pBuf)->contLen = contLen + 
sizeof(SMsgHead); - - SRpcMsg rpcMsg = { - .code = 0, - .msgType = TDMT_VND_FETCH_RSMA, - .pCont = pBuf, - .contLen = contLen + sizeof(SMsgHead), - }; - - if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { - smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", - SMA_VID(pSma), pInfo->suid, level, terrstr()); - goto _err; - } - - smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), - pInfo->suid, level); - - return TSDB_CODE_SUCCESS; -_err: - return TSDB_CODE_FAILED; -} - -/** - * @brief fetch rsma data of level 2/3 and submit - * - * @param pSma - * @param pMsg - * @return int32_t - */ -int32_t smaProcessFetch(SSma *pSma, void *pMsg) { - SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; - SRSmaFetchMsg req = {0}; - SDecoder decoder = {0}; - void *pBuf = NULL; - SRSmaStat *pRSmaStat = NULL; - if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { - terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; - goto _err; - } - - pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); - - tDecoderInit(&decoder, pBuf, pRpcMsg->contLen); - if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _err; - } - - pRSmaStat = SMA_RSMA_STAT(pSma); - - if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) { - SArray *pSubmitArr = NULL; - if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pRSmaStat->execStat, 0); - goto _err; - } - tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr); - atomic_store_8(&pRSmaStat->execStat, 0); - taosArrayDestroy(pSubmitArr); - } else { - int8_t level = req.level; - int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid)); - if (val) { - level |= (*val); - } - ASSERT(level >= 1 && level <= 3); - taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), 
&level, sizeof(level)); - } - - tDecoderClear(&decoder); - smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, - req.level); - return TSDB_CODE_SUCCESS; -_err: - tDecoderClear(&decoder); - smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); - return TSDB_CODE_FAILED; -} - static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { taosFreeQitem(*(void **)taosArrayGet(pItems, i)); } } -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) { - SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid); - if (!pInfo) { - return TSDB_CODE_SUCCESS; - } - - // step 1: consume submit req - int64_t qMemSize = 0; - if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock - - SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize); - - taosArrayClear(pSubmitArr); - - while (1) { - void *msg = NULL; - taosGetQitem(pInfo->qall, (void **)&msg); - if (msg) { - if (taosArrayPush(pSubmitArr, &msg) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } - } else { - break; - } - } - - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, RSMA_EXEC_TIMEOUT, i) < - 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } - } - - tdFreeRSmaSubmitItems(pSubmitArr); - } - } - - // step 2: fetch rsma result +/** + * @brief fetch rsma result(consider the efficiency and functionality) + * + * @param pSma + * @param pInfo + * @param pSubmitArr + * @return int32_t + */ +static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; for (int8_t i = 1; i <= 
TSDB_RETENTION_L2; ++i) { - if (level & i) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); + if (pItem->fetchLevel) { + pItem->fetchLevel = 0; qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); if (!taskInfo) { continue; } + + int64_t curMs = taosGetTimestampMs(); + if ((pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { + ++pItem->nSkipped; + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + continue; + } else { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + } + + pItem->nSkipped = 0; + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) { + if (tdRSmaExecAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { tdCleanupStreamInputDataBlock(taskInfo); goto _err; } tdCleanupStreamInputDataBlock(taskInfo); + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else { + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 + " maxDelay:%d, fetch not executed as fetch level is %" PRIi8, + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); } } @@ -1766,6 +1657,45 @@ _err: return TSDB_CODE_FAILED; } +static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray 
*pSubmitArr, ERsmaExecType type) { + taosArrayClear(pSubmitArr); + while (1) { + void *msg = NULL; + taosGetQitem(qall, (void **)&msg); + if (msg) { + if (taosArrayPush(pSubmitArr, &msg) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } else { + break; + } + } + + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } + tdFreeRSmaSubmitItems(pSubmitArr); + } + return TSDB_CODE_SUCCESS; +_err: + while (1) { + void *msg = NULL; + taosGetQitem(qall, (void **)&msg); + if (msg) { + taosFreeQitem(msg); + } else { + break; + } + } + return TSDB_CODE_FAILED; +} + /** * @brief * @@ -1774,10 +1704,10 @@ _err: * @return int32_t */ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { + SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; - SArray *pSubmitQArr = NULL; SArray *pSubmitArr = NULL; bool isFetchAll = false; @@ -1786,106 +1716,97 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { goto _err; } - if (type == RSMA_EXEC_OVERFLOW) { - taosRLockLatch(SMA_ENV_LOCK(pEnv)); - if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - return TSDB_CODE_SUCCESS; - } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - } - - if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) { + if (!(pSubmitArr = + taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + bool isBusy = false; + while (true) { + isBusy = false; + // step 1: 
rsma exec - consume data in buffer queue for all suids + if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) { + void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + int64_t itemSize = 0; + if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || + RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + smaDebug("vgId:%d, queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); + if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + int32_t qallItemSize = taosQallItemSize(pInfo->qall); + if (qallItemSize > 0) { + tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + } - // step 1: rsma exec - consume data in buffer queue for all suids - SRSmaExecQItem qItem = {0}; - void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock - if (type == RSMA_EXEC_OVERFLOW) { - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - if (taosQueueItemSize(pInfo->queue)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock - qItem.qall = &pInfo->qall; - qItem.pRSmaInfo = pIter; - taosArrayPush(pSubmitQArr, &qItem); - } - ASSERT(taosQueueItemSize(pInfo->queue) == 0); - pIter = taosHashIterate(infoHash, pIter); - } - } else if (type == RSMA_EXEC_COMMIT) { - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - if (taosQueueItemSize(pInfo->iQueue)) { - taosReadAllQitems(pInfo->iQueue, pInfo->iQall); - qItem.qall = &pInfo->iQall; - qItem.pRSmaInfo = pIter; - taosArrayPush(pSubmitQArr, &qItem); - } - ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); - pIter = taosHashIterate(infoHash, pIter); - } - } else { - ASSERT(0); - } - atomic_store_64(&pRSmaStat->qBufSize, 0); + if (type == RSMA_EXEC_OVERFLOW) { + tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); + } - int32_t qSize = taosArrayGetSize(pSubmitQArr); - for (int32_t i = 
0; i < qSize; ++i) { - SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i); - while (1) { - void *msg = NULL; - taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg); - if (msg) { - if (taosArrayPush(pSubmitArr, &msg) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; + if (qallItemSize > 0) { + // subtract the item size after the task finished, commit should wait for all items be consumed + atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + isBusy = true; + } + ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); + } } - } else { + pIter = taosHashIterate(infoHash, pIter); + } + if (type == RSMA_EXEC_COMMIT) { break; } } +#if 0 + else if (type == RSMA_EXEC_COMMIT) { + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->iQueue)) { + if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { + taosReadAllQitems(pInfo->iQueue, pInfo->iQall); // queue has mutex lock + int32_t qallItemSize = taosQallItemSize(pInfo->iQall); + if (qallItemSize > 0) { + atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + nIdle = 0; - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; + // batch exec + tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + } + + // tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); + ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); + } } + ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); + pIter = taosHashIterate(infoHash, pIter); } - tdFreeRSmaSubmitItems(pSubmitArr); - taosArrayClear(pSubmitArr); + break; + } +#endif + else { + ASSERT(0); } - } - // step 2: rsma fetch - consume data in buffer queue for suids triggered by timer - if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 
0) { - goto _end; - } - pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL); - if (pIter) { - tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); - while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) { - tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); + if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { + if (pVnode->inClose) { + break; + } + tsem_wait(&pRSmaStat->notEmpty); + if (pVnode->inClose && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { + smaInfo("vgId:%d, exec task end, inClose:%d, nBufItems:%" PRIi64, SMA_VID(pSma), pVnode->inClose, + atomic_load_64(&pRSmaStat->nBufItems)); + break; + } } - } + } // end of while(true) _end: taosArrayDestroy(pSubmitArr); - taosArrayDestroy(pSubmitQArr); return TSDB_CODE_SUCCESS; _err: taosArrayDestroy(pSubmitArr); - taosArrayDestroy(pSubmitQArr); return TSDB_CODE_FAILED; } @@ -1905,15 +1826,21 @@ int32_t smaProcessExec(SSma *pSma, void *pMsg) { goto _err; } smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { - goto _err; + + int8_t nOld = atomic_fetch_add_8(&pRSmaStat->nExecutor, 1); + + if (nOld < TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { + if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { + goto _err; + } + } else { + atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); } - atomic_store_8(&pRSmaStat->execStat, 0); smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); return TSDB_CODE_SUCCESS; _err: - atomic_store_8(&pRSmaStat->execStat, 0); + atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(), terrstr()); return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/sma/smaUtil.c 
b/source/dnode/vnode/src/sma/smaUtil.c index da70222485..d771797963 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -375,6 +375,9 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (!pInfo->iTaskInfo[i]) { + continue; + } if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { goto _err; } diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 64f223b974..8c73499229 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -220,13 +220,6 @@ int vnodeCommit(SVnode *pVnode) { vInfo("vgId:%d, start to commit, commit ID:%" PRId64 " version:%" PRId64, TD_VID(pVnode), pVnode->state.commitID, pVnode->state.applied); - // preCommit - // smaSyncPreCommit(pVnode->pSma); - smaAsyncPreCommit(pVnode->pSma); - - vnodeBufPoolUnRef(pVnode->inUse); - pVnode->inUse = NULL; - pVnode->state.commitTerm = pVnode->state.applyTerm; // save info @@ -241,6 +234,16 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); + // preCommit + // smaSyncPreCommit(pVnode->pSma); + if(smaAsyncPreCommit(pVnode->pSma) < 0){ + ASSERT(0); + return -1; + } + + vnodeBufPoolUnRef(pVnode->inUse); + pVnode->inUse = NULL; + // commit each sub-system if (metaCommit(pVnode->pMeta) < 0) { ASSERT(0); @@ -248,7 +251,10 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { - smaAsyncCommit(pVnode->pSma); + if (smaAsyncCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { ASSERT(0); @@ -285,7 +291,10 @@ int vnodeCommit(SVnode *pVnode) { // postCommit // smaSyncPostCommit(pVnode->pSma); - smaAsyncPostCommit(pVnode->pSma); + if (smaAsyncPostCommit(pVnode->pSma) < 0) { + ASSERT(0); + return -1; + } // apply the 
commit (TODO) walEndSnapshot(pVnode->pWal); diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index a4fd984fb7..dcfbd33b90 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -87,6 +87,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) { pVnode->msgCb = msgCb; taosThreadMutexInit(&pVnode->lock, NULL); pVnode->blocked = false; + pVnode->inClose = false; tsem_init(&pVnode->syncSem, 0, 0); tsem_init(&(pVnode->canCommit), 0, 1); @@ -181,6 +182,8 @@ _err: void vnodePreClose(SVnode *pVnode) { if (pVnode) { syncLeaderTransfer(pVnode->sync); + pVnode->inClose = true; + smaPreClose(pVnode->pSma); } } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c73d2ccfd5..7a8d168f4f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); - case TDMT_VND_FETCH_RSMA: - return smaProcessFetch(pVnode->pSma, pMsg); case TDMT_VND_EXEC_RSMA: return smaProcessExec(pVnode->pSma, pMsg); default: diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9366f014c1..f7fb6cd405 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3137,6 +3137,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { initResultRow(resultRow); pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; + // releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId)); } if (offset != length) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 50beba8a9b..eb70002680 100644 --- a/source/util/src/tqueue.c 
+++ b/source/util/src/tqueue.c @@ -298,7 +298,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { return num; } -void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } +void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } +int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; } STaosQset *taosOpenQset() { STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1); diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index f53cd45d48..faff48b61c 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -35,6 +35,7 @@ sleep 7000 print =============== select * from retention level 2 from memory sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file rows $rows > 2 return -1 @@ -51,6 +52,7 @@ endi print =============== select * from retention level 1 from memory sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file rows $rows > 2 return -1 @@ -89,6 +91,7 @@ system sh/exec.sh -n dnode1 -s start print =============== select * from retention level 2 from file sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 file rows $rows > 2 return -1 @@ -104,6 +107,7 @@ endi print =============== select * from retention level 1 from file sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file rows $rows > 2 return -1 @@ -141,6 +145,7 @@ sleep 7000 print =============== select * from retention level 2 from file and memory after rsma qtaskinfo recovery sql select * from ct1; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 2 
file/mem rows $rows > 2 return -1 @@ -163,6 +168,7 @@ endi print =============== select * from retention level 1 from file and memory after rsma qtaskinfo recovery sql select * from ct1 where ts > now-8d; print $data00 $data01 $data02 +print $data10 $data11 $data12 if $rows > 2 then print retention level 1 file/mem rows $rows > 2 return -1