diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f870bd161f..8f199c72f7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2667,31 +2667,6 @@ typedef struct { int32_t padding; } SRSmaExecMsg; -typedef struct { - int64_t suid; - int8_t level; -} SRSmaFetchMsg; - -static FORCE_INLINE int32_t tEncodeSRSmaFetchMsg(SEncoder* pCoder, const SRSmaFetchMsg* pReq) { - if (tStartEncode(pCoder) < 0) return -1; - - if (tEncodeI64(pCoder, pReq->suid) < 0) return -1; - if (tEncodeI8(pCoder, pReq->level) < 0) return -1; - - tEndEncode(pCoder); - return 0; -} - -static FORCE_INLINE int32_t tDecodeSRSmaFetchMsg(SDecoder* pCoder, SRSmaFetchMsg* pReq) { - if (tStartDecode(pCoder) < 0) return -1; - - if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1; - if (tDecodeI8(pCoder, &pReq->level) < 0) return -1; - - tEndDecode(pCoder); - return 0; -} - typedef struct { int8_t version; // for compatibility(default 0) int8_t intervalUnit; // MACRO: TIME_UNIT_XXX diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 16d5965759..e2bb3e2ae1 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -201,7 +201,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_SMA, "vnode-drop-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBMIT_RSMA, "vnode-submit-rsma", SSubmitReq, SSubmitRsp) - TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", SRSmaFetchMsg, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_FETCH_RSMA, "vnode-fetch-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_EXEC_RSMA, "vnode-exec-rsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "delete-data", SVDeleteReq, SVDeleteRsp) TD_DEF_MSG_TYPE(TDMT_VND_BATCH_DEL, "batch-delete", SBatchDeleteReq, NULL) diff --git a/include/util/tqueue.h b/include/util/tqueue.h index 0f4f1db9ee..da409a90bb 100644 --- a/include/util/tqueue.h +++ b/include/util/tqueue.h @@ -76,6 +76,7 @@ void taosFreeQall(STaosQall *qall); int32_t taosReadAllQitems(STaosQueue *queue, STaosQall *qall); int32_t taosGetQitem(STaosQall *qall, void **ppItem); void taosResetQitems(STaosQall *qall); +int32_t taosQallItemSize(STaosQall *qall); STaosQset *taosOpenQset(); void taosCloseQset(STaosQset *qset); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index e0f2b83160..ebec3d5ea6 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -442,6 +442,8 @@ static void *mndBuildVCreateStbReq(SMnode *pMnode, SVgObj *pVgroup, SStbObj *pSt if (req.rollup) { req.rsmaParam.maxdelay[0] = pStb->maxdelay[0]; req.rsmaParam.maxdelay[1] = pStb->maxdelay[1]; + req.rsmaParam.watermark[0] = pStb->watermark[0]; + req.rsmaParam.watermark[1] = pStb->watermark[1]; if (pStb->ast1Len > 0) { if (mndConvertRsmaTask(&req.rsmaParam.qmsg[0], &req.rsmaParam.qmsgLen[0], pStb->pAst1, pStb->uid, STREAM_TRIGGER_WINDOW_CLOSE, req.rsmaParam.watermark[0]) < 0) { diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c43772062e..989d24295e 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -90,14 +90,14 @@ struct SRSmaStat { SSma *pSma; int64_t commitAppliedVer; // vnode applied version for async commit int64_t refId; // shared by fetch tasks - volatile int64_t qBufSize; // queue buffer size + volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) + volatile int8_t nExecutor; // [1, max(half of query threads, 4)] int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing - int8_t execStat; // 0 not in exec , 1 in exec SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SHashObj *infoHash; // key: suid, value: SRSmaInfo - SHashObj *fetchHash; // key: suid, value: L1 or L2 or L1|L2 + tsem_t notEmpty; // has items in queue buffer }; struct SSmaStat { @@ -111,26 +111,29 @@ struct SSmaStat { #define SMA_STAT_TSMA(s) (&(s)->tsmaStat) #define SMA_STAT_RSMA(s) (&(s)->rsmaStat) #define RSMA_INFO_HASH(r) ((r)->infoHash) -#define RSMA_FETCH_HASH(r) ((r)->fetchHash) #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_REF_ID(r) ((r)->refId) #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - int8_t level; - int8_t triggerStat; - uint16_t interval; // second - int32_t maxDelay; - tmr_h tmrId; + int8_t level; + int8_t triggerStat; + uint8_t nSkipped; // number of skipped to fetch data from all active window + int8_t fetchLevel; + int32_t maxDelay; // ms + int64_t lastFetch; // ms + tmr_h tmrId; }; struct SRSmaInfo { STSchema *pTSchema; int64_t suid; - int64_t refId; // refId of SRSmaStat - uint64_t delFlag : 1; - uint64_t lastReceived : 63; // second + int64_t refId; // refId of SRSmaStat + int64_t lastRecv; // ms + int8_t delFlag; + int8_t assigned; // 0 idle, 1 assgined for exec + int16_t padding; T_REF_DECLARE() SRSmaInfoItem items[TSDB_RETENTION_L2]; void *taskInfo[TSDB_RETENTION_L2]; // qTaskInfo_t diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7ac1cc4f0e..63d228ec8b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -198,7 +198,6 @@ int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); -int32_t smaProcessFetch(SSma* pSma, void* pMsg); int32_t smaProcessExec(SSma* pSma, void* pMsg); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 8b92475035..7b63056897 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -321,10 +321,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET(pStat) == 0) { - smaDebug("vgId:%d, rsma fetch tasks all finished", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks all finished", SMA_VID(pSma)); break; } else { - smaDebug("vgId:%d, rsma fetch tasks not all finished yet", SMA_VID(pSma)); + smaDebug("vgId:%d, rsma commit, fetch tasks not all finished yet", SMA_VID(pSma)); } ++nLoops; if (nLoops > 1000) { @@ -338,30 +338,25 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { * 1) This is high cost task and should not put in asyncPreCommit originally. * 2) But, if put in asyncCommit, would trigger taskInfo cloning frequently. */ - nLoops = 0; - smaInfo("vgId:%d, start to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1); - if (old == 0) break; - if (++nLoops > 1000) { - sched_yield(); - nLoops = 0; - smaDebug("vgId:%d, wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - } - } - - smaInfo("vgId:%d, end to wait for rsma qtask free, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_COMMIT) < 0) { - atomic_store_8(&pRSmaStat->execStat, 0); return TSDB_CODE_FAILED; } + smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + nLoops = 0; + while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { + ++nLoops; + if (nLoops > 1000) { + sched_yield(); + nLoops = 0; + } + } + smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + +#if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(RSMA_INFO_HASH(pRSmaStat)); @@ -376,11 +371,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter); } - atomic_store_64(&pRSmaStat->qBufSize, 0); - atomic_store_8(&pRSmaStat->execStat, 0); // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); - + // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); +#endif // step 5: others pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; @@ -426,7 +419,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // step 1: merge qTaskInfo and iQTaskInfo // lock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWLockLatch(SMA_ENV_LOCK(pEnv)); void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); while (pIter) { @@ -480,10 +473,9 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t)); } taosArrayDestroy(rsmaDeleted); - // TODO: remove suid in files? // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); // step 2: cleanup outdated qtaskinfo files tdCleanupQTaskInfoFiles(pSma, pRSmaStat); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index f51aad22bd..d39efc748e 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -209,6 +209,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS SRSmaStat *pRSmaStat = (SRSmaStat *)(*pSmaStat); pRSmaStat->pSma = (SSma *)pSma; atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_INIT); + tsem_init(&pRSmaStat->notEmpty, 0, 0); // init smaMgmt smaInit(); @@ -230,12 +231,6 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (!RSMA_INFO_HASH(pRSmaStat)) { return TSDB_CODE_FAILED; } - - RSMA_FETCH_HASH(pRSmaStat) = taosHashInit( - RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); - if (!RSMA_FETCH_HASH(pRSmaStat)) { - return TSDB_CODE_FAILED; - } } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { @@ -267,6 +262,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { smaDebug("vgId:%d, destroy rsma stat %p", SMA_VID(pSma), pRSmaStat); // step 1: set rsma trigger stat cancelled atomic_store_8(RSMA_TRIGGER_STAT(pStat), TASK_TRIGGER_STAT_CANCELLED); + tsem_destroy(&(pStat->notEmpty)); // step 2: destroy the rsma info and associated fetch tasks if (taosHashGetSize(RSMA_INFO_HASH(pStat)) > 0) { @@ -279,10 +275,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } taosHashCleanup(RSMA_INFO_HASH(pStat)); - // step 3: destroy the rsma fetch hash - taosHashCleanup(RSMA_FETCH_HASH(pStat)); - - // step 4: wait all triggered fetch tasks finished + // step 3: wait all triggered fetch tasks finished int32_t nLoops = 0; while (1) { if (T_REF_VAL_GET((SSmaStat *)pStat) == 0) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 9b3b0cb63d..e8018c0f33 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -15,10 +15,14 @@ #include "sma.h" -#define RSMA_QTASKINFO_BUFSIZE (32768) -#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid -#define RSMA_QTASKEXEC_BUFSIZE (1048576) -#define RSMA_SUBMIT_BATCH_SIZE (1024) +#define RSMA_QTASKINFO_BUFSIZE (32768) // size +#define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid +#define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt +#define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt +#define RSMA_EXECUTOR_MAX (4) // cnt +#define RSMA_FETCH_DELAY_MAX (1800000) // ms +#define RSMA_FETCH_SKIP_MAX (1000) // cnt +#define RSMA_FETCH_ACTIVE_MAX (1800) // ms SSmaMgmt smaMgmt = { .inited = 0, @@ -40,11 +44,10 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSiz static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); static void tdFreeRSmaSubmitItems(SArray *pItems); -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr); +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr); static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); -static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, int8_t type, SRSmaQTaskInfoIter *pIter); @@ -668,7 +671,7 @@ static int32_t tdRSmaFetchAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSm } else { smaDebug("vgId:%d, rsma %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } -#if 0 +#if 1 char flag[10] = {0}; snprintf(flag, 10, "level %" PRIi8, pItem->level); blockDebugShowDataBlocks(pResList, flag); @@ -729,22 +732,24 @@ static int32_t tdExecuteRSmaImplAsync(SSma *pSma, const void *pMsg, int32_t inpu taosWriteQitem(pInfo->queue, qItem); + pInfo->lastRecv = taosGetTimestampMs(); + SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - int64_t bufSize = atomic_add_fetch_64(&pRSmaStat->qBufSize, pReq->header.contLen); + + tsem_post(&(pRSmaStat->notEmpty)); + + int64_t nItems = atomic_fetch_add_64(&pRSmaStat->nBufItems, 1); // smoothing consume - int32_t n = bufSize / RSMA_QTASKEXEC_BUFSIZE; + int32_t n = nItems / RSMA_QTASKEXEC_SMOOTH_SIZE; if (n > 1) { if (n > 10) { n = 10; } - taosMsleep(n << 4); - if (n > 2) { + taosMsleep(n << 3); + if (n > 5) { smaWarn("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), - taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); - } else { - smaDebug("vgId:%d, pInfo->queue itemSize:%d, memSize:%" PRIi64 ", sleep %d ms", SMA_VID(pSma), - taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 4); + taosQueueItemSize(pInfo->queue), taosQueueMemorySize(pInfo->queue), n << 3); } } @@ -840,25 +845,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - taosRLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRLockLatch(SMA_ENV_LOCK(pEnv)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } if (!pRSmaInfo->taskInfo[0]) { if (tdCloneRSmaInfo(pSma, pRSmaInfo) < 0) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } } tdRefRSmaInfo(pSma, pRSmaInfo); - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(pRSmaInfo->suid == suid); return pRSmaInfo; } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } @@ -910,22 +915,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp static int32_t tdRSmaExecCheck(SSma *pSma) { SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - int64_t bufSize = atomic_load_64(&pRSmaStat->qBufSize); - if (bufSize < RSMA_QTASKEXEC_BUFSIZE) { - smaDebug("vgId:%d, bufSize is %d but has no chance to exec as less than %d", SMA_VID(pSma), bufSize, - RSMA_QTASKEXEC_BUFSIZE); + if (atomic_load_8(&pRSmaStat->nExecutor) >= TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { return TSDB_CODE_SUCCESS; } - if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 1) { - smaDebug("vgId:%d, bufSize is %d but has no chance to exec as qTaskInfo occupied by another task", SMA_VID(pSma), - bufSize); - return TSDB_CODE_SUCCESS; - } - - smaDebug("vgId:%d, bufSize is %d and has chance to exec as qTaskInfo is free now", SMA_VID(pSma), bufSize); - SRSmaExecMsg fetchMsg; int32_t contLen = sizeof(SMsgHead); void *pBuf = rpcMallocCont(0 + contLen); @@ -949,7 +943,6 @@ static int32_t tdRSmaExecCheck(SSma *pSma) { return TSDB_CODE_SUCCESS; _err: - atomic_store_8(&pRSmaStat->execStat, 0); return TSDB_CODE_FAILED; } @@ -959,7 +952,7 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { // only applicable when rsma env exists return TSDB_CODE_SUCCESS; } - + STbUidStore uidStore = {0}; SRetention *pRetention = SMA_RETENTION(pSma); if (!RETENTION_VALID(pRetention + 1)) { // return directly if retention level 1 is invalid @@ -967,25 +960,35 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) { } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - STbUidStore uidStore = {0}; - tdFetchSubmitReqSuids(pMsg, &uidStore); + if (tdFetchSubmitReqSuids(pMsg, &uidStore) < 0) { + goto _err; + } if (uidStore.suid != 0) { - tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid); + if (tdExecuteRSmaAsync(pSma, pMsg, inputType, uidStore.suid) < 0) { + goto _err; + } void *pIter = taosHashIterate(uidStore.uidHash, NULL); while (pIter) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid); + if (tdExecuteRSmaAsync(pSma, pMsg, inputType, *pTbSuid) < 0) { + goto _err; + } pIter = taosHashIterate(uidStore.uidHash, pIter); } - tdUidStoreDestory(&uidStore); - - tdRSmaExecCheck(pSma); + if (tdRSmaExecCheck(pSma) < 0) { + goto _err; + } } } + tdUidStoreDestory(&uidStore); return TSDB_CODE_SUCCESS; +_err: + tdUidStoreDestory(&uidStore); + smaError("vgId:%d, failed to process rsma submit since: %s", SMA_VID(pSma), terrstr()); + return TSDB_CODE_FAILED; } /** @@ -1521,8 +1524,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { - smaDebug("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", - smaMgmt.rsetId, pRSmaInfo->refId); + smaWarn("rsma fetch task not start since rsma stat already destroyed, rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, + pRSmaInfo->refId); return; } @@ -1553,7 +1556,14 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { smaDebug("vgId:%d, rsma fetch task started for level:%" PRIi8 " suid:%" PRIi64 " since stat is active", SMA_VID(pSma), pItem->level, pRSmaInfo->suid); // async process - tdRSmaFetchSend(pSma, pRSmaInfo, pItem->level); + pItem->fetchLevel = pItem->level; + SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid); + SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1); + ASSERT(qItem->level == pItem->level); + ASSERT(qItem->fetchLevel == pItem->fetchLevel); + tsem_post(&(pStat->notEmpty)); + smaInfo("vgId:%d, rsma fetch task planned for level:%" PRIi8 " suid:%" PRIi64, SMA_VID(pSma), pItem->level, + pRSmaInfo->suid); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is paused", @@ -1568,8 +1578,8 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; default: { - smaWarn("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); + smaDebug("vgId:%d, rsma fetch task not start for level:%" PRIi8 " suid:%" PRIi64 " since stat is unknown", + SMA_VID(pSma), pItem->level, pRSmaInfo->suid); } break; } @@ -1578,135 +1588,21 @@ _end: tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } -/** - * @brief put rsma fetch msg to fetch queue - * - * @param pSma - * @param pInfo - * @param level - * @return int32_t - */ -static int32_t tdRSmaFetchSend(SSma *pSma, SRSmaInfo *pInfo, int8_t level) { - SRSmaFetchMsg fetchMsg = {.suid = pInfo->suid, .level = level}; - int32_t ret = 0; - int32_t contLen = 0; - SEncoder encoder = {0}; - tEncodeSize(tEncodeSRSmaFetchMsg, &fetchMsg, contLen, ret); - if (ret < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tEncoderClear(&encoder); - goto _err; - } - - void *pBuf = rpcMallocCont(contLen + sizeof(SMsgHead)); - tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), contLen); - if (tEncodeSRSmaFetchMsg(&encoder, &fetchMsg) < 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tEncoderClear(&encoder); - } - tEncoderClear(&encoder); - - ((SMsgHead *)pBuf)->vgId = SMA_VID(pSma); - ((SMsgHead *)pBuf)->contLen = contLen + sizeof(SMsgHead); - - SRpcMsg rpcMsg = { - .code = 0, - .msgType = TDMT_VND_FETCH_RSMA, - .pCont = pBuf, - .contLen = contLen + sizeof(SMsgHead), - }; - - if ((terrno = tmsgPutToQueue(&pSma->pVnode->msgCb, QUERY_QUEUE, &rpcMsg)) != 0) { - smaError("vgId:%d, failed to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8 " since %s", - SMA_VID(pSma), pInfo->suid, level, terrstr()); - goto _err; - } - - smaDebug("vgId:%d, success to put rsma fetch msg into fetch-queue for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), - pInfo->suid, level); - - return TSDB_CODE_SUCCESS; -_err: - return TSDB_CODE_FAILED; -} - -/** - * @brief fetch rsma data of level 2/3 and submit - * - * @param pSma - * @param pMsg - * @return int32_t - */ -int32_t smaProcessFetch(SSma *pSma, void *pMsg) { - SRpcMsg *pRpcMsg = (SRpcMsg *)pMsg; - SRSmaFetchMsg req = {0}; - SDecoder decoder = {0}; - void *pBuf = NULL; - SRSmaStat *pRSmaStat = NULL; - if (!pRpcMsg || pRpcMsg->contLen < sizeof(SMsgHead)) { - terrno = TSDB_CODE_RSMA_FETCH_MSG_MSSED_UP; - goto _err; - } - - pBuf = POINTER_SHIFT(pRpcMsg->pCont, sizeof(SMsgHead)); - - tDecoderInit(&decoder, pBuf, pRpcMsg->contLen); - if (tDecodeSRSmaFetchMsg(&decoder, &req) < 0) { - terrno = TSDB_CODE_INVALID_MSG; - goto _err; - } - - pRSmaStat = SMA_RSMA_STAT(pSma); - - if (atomic_val_compare_exchange_8(&pRSmaStat->execStat, 0, 1) == 0) { - SArray *pSubmitArr = NULL; - if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pRSmaStat->execStat, 0); - goto _err; - } - tdRSmaConsumeAndFetch(pSma, req.suid, req.level, pSubmitArr); - atomic_store_8(&pRSmaStat->execStat, 0); - taosArrayDestroy(pSubmitArr); - } else { - int8_t level = req.level; - int8_t *val = taosHashGet(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid)); - if (val) { - level |= (*val); - } - ASSERT(level >= 1 && level <= 3); - taosHashPut(RSMA_FETCH_HASH(pRSmaStat), &req.suid, sizeof(req.suid), &level, sizeof(level)); - } - - tDecoderClear(&decoder); - smaDebug("vgId:%d, success to process rsma fetch msg for suid:%" PRIi64 " level:%" PRIi8, SMA_VID(pSma), req.suid, - req.level); - return TSDB_CODE_SUCCESS; -_err: - tDecoderClear(&decoder); - smaError("vgId:%d, failed to process rsma fetch msg since %s", SMA_VID(pSma), terrstr()); - return TSDB_CODE_FAILED; -} - static void tdFreeRSmaSubmitItems(SArray *pItems) { for (int32_t i = 0; i < taosArrayGetSize(pItems); ++i) { taosFreeQitem(*(void **)taosArrayGet(pItems, i)); } } -static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SArray *pSubmitArr) { - SRSmaInfo *pInfo = tdAcquireRSmaInfoBySuid(pSma, suid); - if (!pInfo) { - return TSDB_CODE_SUCCESS; - } - +static int32_t tdRSmaConsumeAndFetch(SSma *pSma, SRSmaInfo *pInfo, SArray *pSubmitArr) { // step 1: consume submit req +#if 0 int64_t qMemSize = 0; if ((qMemSize = taosQueueMemorySize(pInfo->queue) > 0)) { taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pSma); - atomic_fetch_sub_64(&pRSmaStat->qBufSize, qMemSize); + atomic_fetch_sub_64(&pRSmaStat->nBufItems, taosQallItemSize(pInfo->qall)); taosArrayClear(pSubmitArr); @@ -1736,25 +1632,52 @@ static int32_t tdRSmaConsumeAndFetch(SSma *pSma, int64_t suid, int8_t level, SAr tdFreeRSmaSubmitItems(pSubmitArr); } } - - // step 2: fetch rsma result +#endif + // step 2: fetch rsma result(consider the efficiency and functionality) SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; for (int8_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (level & i) { + SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); + if (pItem->fetchLevel) { + pItem->fetchLevel = 0; qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pInfo, i - 1); if (!taskInfo) { continue; } + + int64_t curMs = taosGetTimestampMs(); + if ((pItem->nSkipped > RSMA_FETCH_SKIP_MAX) || (pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + pItem->nSkipped = 0; + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else { + if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { + ++pItem->nSkipped; + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + continue; + } else { + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + } + } + + pItem->lastFetch = curMs; + if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; } - SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pInfo, i - 1); - if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, suid) < 0) { + if (tdRSmaFetchAndSubmitResult(pSma, taskInfo, pItem, pInfo->pTSchema, pInfo->suid) < 0) { tdCleanupStreamInputDataBlock(taskInfo); goto _err; } tdCleanupStreamInputDataBlock(taskInfo); + smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + } else { + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 + " maxDelay:%d, fetch not executed as fetchLevel is %" PRIi8, + SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); } } @@ -1766,6 +1689,45 @@ _err: return TSDB_CODE_FAILED; } +static int32_t tdRSmaBatchExec(SSma *pSma, SRSmaInfo *pInfo, STaosQall *qall, SArray *pSubmitArr, ERsmaExecType type) { + taosArrayClear(pSubmitArr); + while (1) { + void *msg = NULL; + taosGetQitem(qall, (void **)&msg); + if (msg) { + if (taosArrayPush(pSubmitArr, &msg) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } else { + break; + } + } + + int32_t size = taosArrayGetSize(pSubmitArr); + if (size > 0) { + for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { + if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { + tdFreeRSmaSubmitItems(pSubmitArr); + goto _err; + } + } + tdFreeRSmaSubmitItems(pSubmitArr); + } + return TSDB_CODE_SUCCESS; +_err: + while (1) { + void *msg = NULL; + taosGetQitem(qall, (void **)&msg); + if (msg) { + taosFreeQitem(msg); + } else { + break; + } + } + return TSDB_CODE_FAILED; +} + /** * @brief * @@ -1777,7 +1739,6 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); SHashObj *infoHash = NULL; - SArray *pSubmitQArr = NULL; SArray *pSubmitArr = NULL; bool isFetchAll = false; @@ -1786,106 +1747,95 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { goto _err; } - if (type == RSMA_EXEC_OVERFLOW) { - taosRLockLatch(SMA_ENV_LOCK(pEnv)); - if (atomic_load_64(&pRSmaStat->qBufSize) < RSMA_QTASKEXEC_BUFSIZE) { - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - return TSDB_CODE_SUCCESS; - } - taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - } - - if (!(pSubmitQArr = taosArrayInit(taosHashGetSize(infoHash), sizeof(SRSmaExecQItem)))) { + if (!(pSubmitArr = + taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - if (!(pSubmitArr = taosArrayInit(RSMA_SUBMIT_BATCH_SIZE, POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - // step 1: rsma exec - consume data in buffer queue for all suids - SRSmaExecQItem qItem = {0}; - void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock - if (type == RSMA_EXEC_OVERFLOW) { - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - if (taosQueueItemSize(pInfo->queue)) { - taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock - qItem.qall = &pInfo->qall; - qItem.pRSmaInfo = pIter; - taosArrayPush(pSubmitQArr, &qItem); - } - ASSERT(taosQueueItemSize(pInfo->queue) == 0); - pIter = taosHashIterate(infoHash, pIter); - } - } else if (type == RSMA_EXEC_COMMIT) { - while (pIter) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; - if (taosQueueItemSize(pInfo->iQueue)) { - taosReadAllQitems(pInfo->iQueue, pInfo->iQall); - qItem.qall = &pInfo->iQall; - qItem.pRSmaInfo = pIter; - taosArrayPush(pSubmitQArr, &qItem); - } - ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); - pIter = taosHashIterate(infoHash, pIter); - } - } else { - ASSERT(0); - } - atomic_store_64(&pRSmaStat->qBufSize, 0); - - int32_t qSize = taosArrayGetSize(pSubmitQArr); - for (int32_t i = 0; i < qSize; ++i) { - SRSmaExecQItem *pItem = taosArrayGet(pSubmitQArr, i); - while (1) { - void *msg = NULL; - taosGetQitem(*(STaosQall **)pItem->qall, (void **)&msg); - if (msg) { - if (taosArrayPush(pSubmitArr, &msg) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; - } + int32_t nIdle = 0; + while (true) { + if (++nIdle > 100) { + if (atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1) > 1) { + // free the exec thread if without SubmitReq + break; } else { + // keep at least 1 exec thread only if without SubmitReq in case of no query thread to use when busy again + atomic_add_fetch_8(&pRSmaStat->nExecutor, 1); + nIdle = 0; + } + } + // step 1: rsma exec - consume data in buffer queue for all suids + if (type == RSMA_EXEC_OVERFLOW || type == RSMA_EXEC_COMMIT) { + void *pIter = taosHashIterate(infoHash, NULL); // infoHash has r/w lock + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + int64_t itemSize = 0; + if ((itemSize = taosQueueItemSize(pInfo->queue)) || RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || + RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + smaDebug("vgId:%d queueItemSize is %" PRIi64 " execType:%" PRIi8, SMA_VID(pSma), itemSize, type); + if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { + taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock + int32_t qallItemSize = taosQallItemSize(pInfo->qall); + if (qallItemSize > 0) { + tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + } + + tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + if (qallItemSize > 0) { + // subtract the item size after the task finished, commit should wait for all items be consumed + atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + nIdle = 0; + } + ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); + } + } + pIter = taosHashIterate(infoHash, pIter); + } + if (type == RSMA_EXEC_COMMIT) { break; } } +#if 0 + else if (type == RSMA_EXEC_COMMIT) { + while (pIter) { + SRSmaInfo *pInfo = *(SRSmaInfo **)pIter; + if (taosQueueItemSize(pInfo->iQueue)) { + if (atomic_val_compare_exchange_8(&pInfo->assigned, 0, 1) == 0) { + taosReadAllQitems(pInfo->iQueue, pInfo->iQall); // queue has mutex lock + int32_t qallItemSize = taosQallItemSize(pInfo->iQall); + if (qallItemSize > 0) { + atomic_fetch_sub_64(&pRSmaStat->nBufItems, qallItemSize); + nIdle = 0; - int32_t size = taosArrayGetSize(pSubmitArr); - if (size > 0) { - SRSmaInfo *pInfo = *(SRSmaInfo **)pItem->pRSmaInfo; - for (int32_t i = 1; i <= TSDB_RETENTION_L2; ++i) { - if (tdExecuteRSmaImpl(pSma, pSubmitArr->pData, size, STREAM_INPUT__MERGED_SUBMIT, pInfo, type, i) < 0) { - tdFreeRSmaSubmitItems(pSubmitArr); - goto _err; + // batch exec + tdRSmaBatchExec(pSma, pInfo, pInfo->qall, pSubmitArr, type); + } + + // tdRSmaConsumeAndFetch(pSma, pInfo, pSubmitArr); + ASSERT(1 == atomic_val_compare_exchange_8(&pInfo->assigned, 1, 0)); + } } + ASSERT(taosQueueItemSize(pInfo->iQueue) == 0); + pIter = taosHashIterate(infoHash, pIter); } - tdFreeRSmaSubmitItems(pSubmitArr); - taosArrayClear(pSubmitArr); + break; + } +#endif + else { + ASSERT(0); } - } - // step 2: rsma fetch - consume data in buffer queue for suids triggered by timer - if (taosHashGetSize(RSMA_FETCH_HASH(pRSmaStat)) <= 0) { - goto _end; - } - pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), NULL); - if (pIter) { - tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); - while ((pIter = taosHashIterate(RSMA_FETCH_HASH(pRSmaStat), pIter))) { - tdRSmaConsumeAndFetch(pSma, *(int64_t *)taosHashGetKey(pIter, NULL), *(int8_t *)pIter, pSubmitArr); + if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { + tsem_wait(&pRSmaStat->notEmpty); } - } + } // end of while(true) _end: taosArrayDestroy(pSubmitArr); - taosArrayDestroy(pSubmitQArr); return TSDB_CODE_SUCCESS; _err: taosArrayDestroy(pSubmitArr); - taosArrayDestroy(pSubmitQArr); return TSDB_CODE_FAILED; } @@ -1905,15 +1855,21 @@ int32_t smaProcessExec(SSma *pSma, void *pMsg) { goto _err; } smaDebug("vgId:%d, begin to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { - goto _err; + + int8_t nOld = atomic_fetch_add_8(&pRSmaStat->nExecutor, 1); + + if (nOld < TMIN(RSMA_EXECUTOR_MAX, tsNumOfVnodeQueryThreads / 2)) { + if (tdRSmaProcessExecImpl(pSma, RSMA_EXEC_OVERFLOW) < 0) { + goto _err; + } + } else { + atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); } - atomic_store_8(&pRSmaStat->execStat, 0); smaDebug("vgId:%d, success to process rsma exec msg by TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); return TSDB_CODE_SUCCESS; _err: - atomic_store_8(&pRSmaStat->execStat, 0); + atomic_fetch_sub_8(&pRSmaStat->nExecutor, 1); smaError("vgId:%d, failed to process rsma exec msg by TID:%p since %s", SMA_VID(pSma), (void *)taosGetSelfPthreadId(), terrstr()); return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index c73d2ccfd5..7a8d168f4f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -301,8 +301,6 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg, 0); case TDMT_SCH_QUERY_CONTINUE: return qWorkerProcessCQueryMsg(&handle, pVnode->pQuery, pMsg, 0); - case TDMT_VND_FETCH_RSMA: - return smaProcessFetch(pVnode->pSma, pMsg); case TDMT_VND_EXEC_RSMA: return smaProcessExec(pVnode->pSma, pMsg); default: diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d15dc99122..a432196b01 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3131,6 +3131,7 @@ int32_t aggDecodeResultRow(SOperatorInfo* pOperator, char* result) { initResultRow(resultRow); pInfo->resultRowInfo.cur = (SResultRowPosition){.pageId = resultRow->pageId, .offset = resultRow->offset}; + // releaseBufPage(pSup->pResultBuf, getBufPage(pSup->pResultBuf, pageId)); } if (offset != length) { diff --git a/source/util/src/tqueue.c b/source/util/src/tqueue.c index 50beba8a9b..eb70002680 100644 --- a/source/util/src/tqueue.c +++ b/source/util/src/tqueue.c @@ -298,7 +298,8 @@ int32_t taosGetQitem(STaosQall *qall, void **ppItem) { return num; } -void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } +void taosResetQitems(STaosQall *qall) { qall->current = qall->start; } +int32_t taosQallItemSize(STaosQall *qall) { return qall->numOfItems; } STaosQset *taosOpenQset() { STaosQset *qset = taosMemoryCalloc(sizeof(STaosQset), 1);