diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 165f5da4b6..9931462e5f 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -113,8 +113,9 @@ struct SRSmaStat { volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) volatile int32_t nFetchAll; // active number of fetch all - int8_t triggerStat; // shared by fetch tasks - int8_t commitStat; // 0 not in committing, 1 in committing + volatile int8_t triggerStat; // shared by fetch tasks + volatile int8_t commitStat; // 0 not in committing, 1 in committing + volatile int8_t delFlag; // 0 no deleted SRSmaInfo, 1 has deleted SRSmaInfo SRSmaFS fs; // for recovery/snapshot r/w SHashObj *infoHash; // key: suid, value: SRSmaInfo tsem_t notEmpty; // has items in queue buffer @@ -142,7 +143,7 @@ struct SRSmaInfoItem { int8_t level : 4; int8_t fetchLevel : 4; int8_t triggerStat; - uint16_t nSkipped; + uint16_t nScanned; int32_t maxDelay; // ms tmr_h tmrId; }; @@ -196,16 +197,21 @@ typedef enum { int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); void tdDestroySmaEnv(SSmaEnv *pSmaEnv); void *tdFreeSmaEnv(SSmaEnv *pSmaEnv); -int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat); -int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdLockSma(SSma *pSma); int32_t tdUnLockSma(SSma *pSma); void *tdAcquireSmaRef(int32_t rsetId, int64_t refId); int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId); +static FORCE_INLINE void tdRefSmaStat(SSma *pSma, SSmaStat *pStat) { + int32_t ref = T_REF_INC(pStat); + smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); +} +static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { + int32_t ref = T_REF_DEC(pStat); + smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); +} + // rsma -int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); -int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaFSOpen(SSma *pSma, int64_t version); void tdRSmaFSClose(SRSmaFS *fs); @@ -218,9 +224,17 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer); +void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); -void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); -void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); +static FORCE_INLINE void tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { + int32_t ref = T_REF_INC(pRSmaInfo); + smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); +} +static FORCE_INLINE void tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { + int32_t ref = T_REF_DEC(pRSmaInfo); + smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); +} // smaFileUtil ================ diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index f08cefdb04..07ec7d0694 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -210,15 +210,18 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { } if (fsMaxVer < committed) { - SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; - if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) { - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), committed, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); + if (taosCheckExistFile(qTaskInfoFullName)) { + SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; + if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) { + taosWUnLockLatch(RSMA_FS_LOCK(pStat)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } } } else { smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode), - committed, fsMaxVer); + committed, fsMaxVer); } taosWUnLockLatch(RSMA_FS_LOCK(pStat)); @@ -314,12 +317,12 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { if (tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)) < 0) { return TSDB_CODE_FAILED; } - smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); #if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall // lock - // taosWLockLatch(SMA_ENV_LOCK(pEnv)); + taosWLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(RSMA_INFO_HASH(pRSmaStat)); @@ -335,7 +338,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } // unlock - // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); #endif return TSDB_CODE_SUCCESS; @@ -380,25 +383,26 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // step 1: merge qTaskInfo and iQTaskInfo // lock - // taosWLockLatch(SMA_ENV_LOCK(pEnv)); + if (1 == atomic_val_compare_exchange_8(&pRSmaStat->delFlag, 1, 0)) { + taosWLockLatch(SMA_ENV_LOCK(pEnv)); - void *pIter = NULL; - while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) { - tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - int32_t refVal = T_REF_VAL_GET(pRSmaInfo); - if (refVal == 0) { - taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid)); - } else { - smaDebug( - "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " - "table:%" PRIi64, - SMA_VID(pSma), refVal, *pSuid); + void *pIter = NULL; + while ((pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), pIter))) { + tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + int32_t refVal = T_REF_VAL_GET(pRSmaInfo); + if (refVal == 0) { + taosHashRemove(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(*pSuid)); + } else { + smaDebug( + "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " + "table:%" PRIi64, + SMA_VID(pSma), refVal, *pSuid); + } + + continue; } - - continue; - } #if 0 if (pRSmaInfo->taskInfo[0]) { if (pRSmaInfo->iTaskInfo[0]) { @@ -413,10 +417,11 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), *pSuid); #endif - } + } - // unlock - // taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + // unlock + taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); + } tdUpdateQTaskInfoFiles(pSma, pRSmaStat); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 2595a629ba..b870ea1b62 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -177,39 +177,6 @@ void *tdFreeSmaEnv(SSmaEnv *pSmaEnv) { return NULL; } -int32_t tdRefSmaStat(SSma *pSma, SSmaStat *pStat) { - if (!pStat) return 0; - - int ref = T_REF_INC(pStat); - smaDebug("vgId:%d, ref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); - return 0; -} - -int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { - if (!pStat) return 0; - - int ref = T_REF_DEC(pStat); - smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); - return 0; -} - -int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { - if (!pRSmaInfo) return 0; - - int ref = T_REF_INC(pRSmaInfo); - smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); - return 0; -} - -int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { - if (!pRSmaInfo) return 0; - - int ref = T_REF_DEC(pRSmaInfo); - smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); - - return 0; -} - static void tRSmaInfoHashFreeNode(void *data) { SRSmaInfo *pRSmaInfo = NULL; SRSmaInfoItem *pItem = NULL; @@ -492,6 +459,8 @@ static int32_t tdRsmaStopExecutor(const SSma *pSma) { taosThreadJoin(pthread[i], NULL); } } + + smaInfo("vgId:%d, rsma executor stopped, number:%d", SMA_VID(pSma), tsNumOfVnodeRsmaThreads); } return 0; } \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 920d04bf6c..ec8fcb2932 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -19,7 +19,7 @@ #define RSMA_QTASKINFO_HEAD_LEN (sizeof(int32_t) + sizeof(int8_t) + sizeof(int64_t)) // len + type + suid #define RSMA_QTASKEXEC_SMOOTH_SIZE (100) // cnt #define RSMA_SUBMIT_BATCH_SIZE (1024) // cnt -#define RSMA_FETCH_DELAY_MAX (900000) // ms +#define RSMA_FETCH_DELAY_MAX (120000) // ms #define RSMA_FETCH_ACTIVE_MAX (1000) // ms #define RSMA_FETCH_INTERVAL (5000) // ms @@ -302,7 +302,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat return TSDB_CODE_FAILED; } SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); - pItem->triggerStat = TASK_TRIGGER_STAT_INACTIVE; + pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); @@ -320,7 +320,9 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + pItem->fetchLevel = pItem->level; + taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); + smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 ", finally maxdelay:%" PRIi32, @@ -470,6 +472,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { } // set del flag for data in mem + atomic_store_8(&pRSmaStat->delFlag, 1); RSMA_INFO_SET_DEL(pRSmaInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo); @@ -939,25 +942,25 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } - // taosRLockLatch(SMA_ENV_LOCK(pEnv)); + taosRLockLatch(SMA_ENV_LOCK(pEnv)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } if (!pRSmaInfo->taskInfo[0]) { if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) { - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } } tdRefRSmaInfo(pSma, pRSmaInfo); - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); ASSERT(pRSmaInfo->suid == suid); return pRSmaInfo; } - // taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } @@ -1709,21 +1712,23 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { continue; } - int64_t curMs = taosGetTimestampMs(); - if ((pItem->nSkipped * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch executed", - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); - } else if (((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX)) { - ++pItem->nSkipped; - smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); - continue; + if ((++pItem->nScanned * pItem->maxDelay) > RSMA_FETCH_DELAY_MAX) { + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch executed", + SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); } else { - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", - SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + int64_t curMs = taosGetTimestampMs(); + if ((curMs - pInfo->lastRecv) < RSMA_FETCH_ACTIVE_MAX) { + smaTrace("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch skipped ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); // restore the active stat + continue; + } else { + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " curMs:%" PRIi64 " lastRecv:%" PRIi64 ", fetch executed ", + SMA_VID(pSma), pInfo->suid, i, curMs, pInfo->lastRecv); + } } - pItem->nSkipped = 0; + pItem->nScanned = 0; if ((terrno = qSetMultiStreamInput(taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK)) < 0) { goto _err; @@ -1734,12 +1739,12 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo) { } tdCleanupStreamInputDataBlock(taskInfo); - smaInfo("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 " maxDelay:%d, fetch finished", - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay); + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch finished", + SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay); } else { - smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nSkipped:%" PRIi8 + smaDebug("vgId:%d, suid:%" PRIi64 " level:%" PRIi8 " nScanned:%" PRIi8 " maxDelay:%d, fetch not executed as fetch level is %" PRIi8, - SMA_VID(pSma), pInfo->suid, i, pItem->nSkipped, pItem->maxDelay, pItem->fetchLevel); + SMA_VID(pSma), pInfo->suid, i, pItem->nScanned, pItem->maxDelay, pItem->fetchLevel); } } @@ -1829,6 +1834,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { bool occupied = (batchMax <= 1); if (batchMax > 1) { batchMax = 100 / batchMax; + batchMax = TMAX(batchMax, 4); } while (occupied || (++batchCnt < batchMax)) { // greedy mode taosReadAllQitems(pInfo->queue, pInfo->qall); // queue has mutex lock @@ -1838,13 +1844,15 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); - if (oldStat == 0 || - ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { - atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); - tdRSmaFetchAllResult(pSma, pInfo); - if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { - atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + if (RSMA_INFO_ITEM(pInfo, 0)->fetchLevel || RSMA_INFO_ITEM(pInfo, 1)->fetchLevel) { + int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); + if (oldStat == 0 || + ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { + atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); + tdRSmaFetchAllResult(pSma, pInfo); + if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { + atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + } } } @@ -1917,7 +1925,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { tsem_wait(&pRSmaStat->notEmpty); if ((pEnv->flag & SMA_ENV_FLG_CLOSE) && (atomic_load_64(&pRSmaStat->nBufItems) <= 0)) { - smaInfo("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, + smaDebug("vgId:%d, exec task end, flag:%" PRIi8 ", nBufItems:%" PRIi64, SMA_VID(pSma), pEnv->flag, atomic_load_64(&pRSmaStat->nBufItems)); break; } diff --git a/source/dnode/vnode/src/sma/smaTimeRange.c b/source/dnode/vnode/src/sma/smaTimeRange.c index 173391b769..e2cb51f586 100644 --- a/source/dnode/vnode/src/sma/smaTimeRange.c +++ b/source/dnode/vnode/src/sma/smaTimeRange.c @@ -178,7 +178,6 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char return TSDB_CODE_FAILED; } - tdRefSmaStat(pSma, pStat); pTsmaStat = SMA_STAT_TSMA(pStat); if (!pTsmaStat->pTSma) { @@ -230,9 +229,7 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char goto _err; } - tdUnRefSmaStat(pSma, pStat); return TSDB_CODE_SUCCESS; _err: - tdUnRefSmaStat(pSma, pStat); return TSDB_CODE_FAILED; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 41c06bc500..60d967681b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -2815,6 +2815,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret if (VND_IS_RSMA(pVnode)) { int8_t level = 0; int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); + int64_t offset = TSDB_TICK_PER_SECOND(pVnode->config.tsdbCfg.precision); for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { SRetention* pRetention = retentions + level; @@ -2824,7 +2825,7 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret } break; } - if ((now - pRetention->keep) <= winSKey) { + if ((now - pRetention->keep) <= (winSKey + offset)) { break; } ++level;