From c2c7ccf776118d55255d3463a8ccb5016179d770 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 7 Aug 2022 20:06:31 +0800 Subject: [PATCH] enh: rsma info lifecyle mgmt --- source/dnode/vnode/src/inc/sma.h | 9 +- source/dnode/vnode/src/sma/smaCommit.c | 54 +++++-- source/dnode/vnode/src/sma/smaEnv.c | 5 +- source/dnode/vnode/src/sma/smaRollup.c | 207 +++++++------------------ source/dnode/vnode/src/sma/smaUtil.c | 30 ++-- 5 files changed, 117 insertions(+), 188 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 51711e6e97..944d7759b2 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -115,10 +115,10 @@ struct SSmaStat { #define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { - tmr_h tmrId; - int32_t maxDelay; int8_t level; int8_t triggerStat; + int32_t maxDelay; + tmr_h tmrId; }; struct SRSmaInfo { @@ -173,8 +173,8 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat); int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo); -void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); -int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln); +void *tdAcquireSmaRef(int32_t rsetId, int64_t refId); +int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId); int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); @@ -233,7 +233,6 @@ void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t le static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeSmaState(SSmaStat *pSmaStat, int8_t smaType); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); -void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid); int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 8074c76806..373cfdfb47 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -402,7 +402,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { // step 1: merge rsmaInfoHash and iRsmaInfoHash // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); - +#if 0 if (taosHashGetSize(RSMA_INFO_HASH(pRSmaStat)) <= 0) { // just switch the hash pointer if rsmaInfoHash is empty if (taosHashGetSize(RSMA_IMU_INFO_HASH(pRSmaStat)) > 0) { @@ -411,25 +411,47 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { RSMA_IMU_INFO_HASH(pRSmaStat) = infoHash; } } else { - void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); - while (pIter) { - tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); +#endif +#if 1 + void *pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), NULL); + while (pIter) { + tb_uid_t *pSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); - if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { - taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); - smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), - *pSuid); - } else { - // free the resources - SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; - tdFreeRSmaInfo(pSma, pRSmaInfo, false); - smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), - *pSuid); + if (!taosHashGet(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t))) { + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + int32_t refVal = T_REF_VAL_GET(pRSmaInfo); + if (refVal == 0) { + tdFreeRSmaInfo(pSma, pRSmaInfo, true); + smaDebug( + "vgId:%d, rsma async post commit, free rsma info since already deleted and ref is 0 for " + "table:%" PRIi64, + SMA_VID(pSma), *pSuid); + } else { + smaDebug( + "vgId:%d, rsma async post commit, not free rsma info since ref is %d although already deleted for " + "table:%" PRIi64, + SMA_VID(pSma), refVal, *pSuid); + } + + pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); + continue; } - - pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); + taosHashPut(RSMA_INFO_HASH(pRSmaStat), pSuid, sizeof(tb_uid_t), pIter, sizeof(pIter)); + smaDebug("vgId:%d, rsma async post commit, migrated from iRsmaInfoHash for table:%" PRIi64, SMA_VID(pSma), + *pSuid); + } else { + // free the resources + SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)pIter; + tdFreeRSmaInfo(pSma, pRSmaInfo, false); + smaDebug("vgId:%d, rsma async post commit, free rsma info since already COW for table:%" PRIi64, SMA_VID(pSma), + *pSuid); } + + pIter = taosHashIterate(RSMA_IMU_INFO_HASH(pRSmaStat), pIter); } +#endif + // } taosHashCleanup(RSMA_IMU_INFO_HASH(pRSmaStat)); RSMA_IMU_INFO_HASH(pRSmaStat) = NULL; diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 5345d68c3b..ccb6ad3a72 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -171,7 +171,7 @@ int32_t tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { int32_t tdRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { if (!pRSmaInfo) return 0; - + int ref = T_REF_INC(pRSmaInfo); smaDebug("vgId:%d, ref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); return 0; @@ -183,9 +183,6 @@ int32_t tdUnRefRSmaInfo(SSma *pSma, SRSmaInfo *pRSmaInfo) { int ref = T_REF_DEC(pRSmaInfo); smaDebug("vgId:%d, unref rsma info:%p, val:%d", SMA_VID(pSma), pRSmaInfo, ref); - if (ref == 0) { - tdRemoveRSmaInfoBySuid(pSma, pRSmaInfo->suid); - } return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index ba14b657bf..fd2222c5e4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -32,11 +32,14 @@ static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *ui static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); -static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, int8_t level); -static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); -static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, - int8_t blkType); -static void tdRSmaFetchTrigger(void *param, void *tmrId); +static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType, SRSmaInfo *pInfo, tb_uid_t suid, + int8_t level); +static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid); +static void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo); + +static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, + SRSmaStat *pStat, int8_t blkType); +static void tdRSmaFetchTrigger(void *param, void *tmrId); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); @@ -114,7 +117,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (pInfo) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = &pInfo->items[i]; - + if (isDeepFree && pItem->tmrId) { smaDebug("vgId:%d, stop fetch timer %p for table %" PRIi64 " level %d", SMA_VID(pSma), pInfo->suid, pItem->tmrId, i + 1); @@ -168,7 +171,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) return TSDB_CODE_SUCCESS; } - pRSmaInfo = tdGetRSmaInfoBySuid(pSma, *suid); + pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, *suid); if (!pRSmaInfo) { smaError("vgId:%d, failed to get rsma info for uid:%" PRIi64, SMA_VID(pSma), *suid); @@ -179,6 +182,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { if ((qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true) < 0)) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, terrstr()); return TSDB_CODE_FAILED; @@ -189,6 +193,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) } } + tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } @@ -417,7 +422,7 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { SSmaStat *pStat = SMA_ENV_STAT(pSmaEnv); SRSmaStat *pRSmaStat = SMA_RSMA_STAT(pStat); - SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pReq->suid); + SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pReq->suid); if (!pRSmaInfo) { smaWarn("vgId:%d, drop rsma for stable %s %" PRIi64 " failed no rsma in hash", TD_VID(pVnode), pReq->name, @@ -429,8 +434,10 @@ int32_t tdProcessRSmaDrop(SSma *pSma, SVDropStbReq *pReq) { RSMA_INFO_SET_DEL(pRSmaInfo); tdUnRefRSmaInfo(pSma, pRSmaInfo); - // save to file + tdReleaseRSmaInfo(pSma, pRSmaInfo); + // save to file + // TODO smaDebug("vgId:%d, drop rsma for table %" PRIi64 " succeed", TD_VID(pVnode), pReq->suid); return TSDB_CODE_SUCCESS; } @@ -575,10 +582,10 @@ static void tdDestroySDataBlockArray(SArray *pArray) { /** * @brief retention of rsma1/rsma2 - * - * @param pSma - * @param now - * @return int32_t + * + * @param pSma + * @param now + * @return int32_t */ int32_t smaDoRetention(SSma *pSma, int64_t now) { int32_t code = TSDB_CODE_SUCCESS; @@ -597,8 +604,8 @@ _end: return code; } -static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, - int8_t blkType) { +static int32_t tdRSmaFetchAndSubmitResult(qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, + SRSmaStat *pStat, int8_t blkType) { SArray *pResult = NULL; SSma *pSma = pStat->pSma; @@ -711,7 +718,7 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t inputType * @param suid * @return SRSmaInfo* */ -static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { +static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = NULL; SRSmaInfo *pRSmaInfo = NULL; @@ -725,14 +732,23 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } + taosRLockLatch(SMA_ENV_LOCK(pEnv)); pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (pRSmaInfo && (pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); + return NULL; + } + tdRefRSmaInfo(pSma, pRSmaInfo); + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return pRSmaInfo; } if (RSMA_COMMIT_STAT(pStat) == 0) { // return NULL if not in committing stat + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } + taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); // clone the SRSmaInfo from iRsmaInfoHash to rsmaInfoHash if in committing stat SRSmaInfo *pCowRSmaInfo = NULL; @@ -742,7 +758,7 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { void *iRSmaInfo = taosHashGet(RSMA_IMU_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); if (iRSmaInfo) { SRSmaInfo *pIRSmaInfo = *(SRSmaInfo **)iRSmaInfo; - if (pIRSmaInfo) { + if (pIRSmaInfo && !RSMA_INFO_IS_DEL(pIRSmaInfo)) { if (tdCloneRSmaInfo(pSma, &pCowRSmaInfo, pIRSmaInfo) < 0) { // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); @@ -762,68 +778,40 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid) { pCowRSmaInfo = *(SRSmaInfo **)pCowRSmaInfo; ASSERT(!pCowRSmaInfo); } + + if(pCowRSmaInfo) { + tdRefRSmaInfo(pSma, pCowRSmaInfo); + } // unlock taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); return pCowRSmaInfo; } -/** - * @brief During the drop procedure, only need to delete the object in rsmaInfoHash. - * - * @param pSma - * @param suid - * @return SRSmaInfo* - */ -void tdRemoveRSmaInfoBySuid(SSma *pSma, int64_t suid) { - SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); - SRSmaStat *pStat = NULL; - SRSmaInfo *pRSmaInfo = NULL; - - if (!pEnv) { - return; +static FORCE_INLINE void tdReleaseRSmaInfo(SSma *pSma, SRSmaInfo *pInfo) { + if (pInfo) { + tdUnRefRSmaInfo(pSma, pInfo); } - - pStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); - if (!pStat || !RSMA_INFO_HASH(pStat)) { - return; - } - - // unlock - taosWLockLatch(SMA_ENV_LOCK(pEnv)); - - pRSmaInfo = taosHashGet(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); - if (pRSmaInfo) { - if ((pRSmaInfo = *(SRSmaInfo **)pRSmaInfo)) { - tdFreeRSmaInfo(pSma, pRSmaInfo, true); - } - taosHashRemove(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t)); - smaDebug("vgId:%d, remove from infoHash for table:%" PRIu64 " succeed", SMA_VID(pSma), suid); - } - // unlock - taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); } static int32_t tdExecuteRSma(SSma *pSma, const void *pMsg, int32_t inputType, tb_uid_t suid) { - SRSmaInfo *pRSmaInfo = tdGetRSmaInfoBySuid(pSma, suid); + SRSmaInfo *pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, suid); if (!pRSmaInfo) { smaDebug("vgId:%d, execute rsma, no rsma info for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } - if (!RSMA_INFO_QTASK(pRSmaInfo,0)) { + if (!RSMA_INFO_QTASK(pRSmaInfo, 0)) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); smaDebug("vgId:%d, execute rsma, no rsma qTaskInfo for suid:%" PRIu64, SMA_VID(pSma), suid); return TSDB_CODE_SUCCESS; } if (inputType == STREAM_INPUT__DATA_SUBMIT) { - tdRefRSmaInfo(pSma, pRSmaInfo); - tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L1); tdExecuteRSmaImpl(pSma, pMsg, inputType, pRSmaInfo, suid, TSDB_RETENTION_L2); - - tdUnRefRSmaInfo(pSma, pRSmaInfo); } + tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } @@ -1034,7 +1022,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem * SRSmaInfo *pRSmaInfo = NULL; void *qTaskInfo = NULL; - pRSmaInfo = tdGetRSmaInfoBySuid(pSma, pItem->suid); + pRSmaInfo = tdAcquireRSmaInfoBySuid(pSma, pItem->suid); if (!pRSmaInfo) { smaDebug("vgId:%d, no restore as no rsma info for table:%" PRIu64, SMA_VID(pSma), pItem->suid); return TSDB_CODE_SUCCESS; @@ -1049,11 +1037,13 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem * } if (!qTaskInfo) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); smaDebug("vgId:%d, no restore as NULL rsma qTaskInfo for table:%" PRIu64, SMA_VID(pSma), pItem->suid); return TSDB_CODE_SUCCESS; } if (qDeserializeTaskStatus(qTaskInfo, pItem->qTaskInfo, pItem->len) < 0) { + tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, restore rsma task failed for table:%" PRIi64 " level %d since %s", SMA_VID(pSma), pItem->suid, pItem->type, terrstr()); return TSDB_CODE_FAILED; @@ -1061,6 +1051,7 @@ static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem * smaDebug("vgId:%d, restore rsma task success for table:%" PRIi64 " level %d", SMA_VID(pSma), pItem->suid, pItem->type); + tdReleaseRSmaInfo(pSma, pRSmaInfo); return TSDB_CODE_SUCCESS; } @@ -1239,6 +1230,12 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { while (infoHash) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; + + if (RSMA_INFO_IS_DEL(pRSmaInfo)) { + infoHash = taosHashIterate(pInfoHash, infoHash); + continue; + } + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { qTaskInfo_t taskInfo = RSMA_INFO_QTASK(pRSmaInfo, i); if (!taskInfo) { @@ -1325,92 +1322,6 @@ _err: return TSDB_CODE_FAILED; } -#if 0 -/** - * @brief trigger to get rsma result - * - * @param param - * @param tmrId - */ -static void tdRSmaFetchTrigger(void *param, void *tmrId) { - SRSmaInfoItem *pItem = param; - SSma *pSma = NULL; - SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); - - if (!pStat) { - smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, - pItem->refId); - return; - } - - pSma = pStat->pSma; - - // if rsma trigger stat in paused, cancelled or finished, not start fetch task - int8_t rsmaTriggerStat = atomic_load_8(RSMA_TRIGGER_STAT(pStat)); - switch (rsmaTriggerStat) { - case TASK_TRIGGER_STAT_PAUSED: - case TASK_TRIGGER_STAT_CANCELLED: { - tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 - " refId:%d", - SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pItem->refId); - if (rsmaTriggerStat == TASK_TRIGGER_STAT_PAUSED) { - taosTmrReset(tdRSmaFetchTrigger, pItem->maxDelay > 5000 ? 5000 : pItem->maxDelay, pItem, smaMgmt.tmrHandle, - &pItem->tmrId); - } - return; - } - default: - break; - } - - SRSmaInfo *pRSmaInfo = tdGetRSmaInfoByItem(pItem); - if (RSMA_INFO_IS_DEL(pRSmaInfo)) { - goto _end; - } - - int8_t fetchTriggerStat = - atomic_val_compare_exchange_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE, TASK_TRIGGER_STAT_INACTIVE); - switch (fetchTriggerStat) { - case TASK_TRIGGER_STAT_ACTIVE: { - smaDebug("vgId:%d, fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is active", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); - - // sync procedure => async process - tdRefRSmaInfo(pSma, pRSmaInfo); - - SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; - qSetMultiStreamInput(pItem->taskInfo, &dataBlock, 1, STREAM_INPUT__DATA_BLOCK); - tdRSmaFetchAndSubmitResult(pItem, pRSmaInfo->pTSchema, pRSmaInfo->suid, pStat, STREAM_INPUT__DATA_BLOCK); - tdCleanupStreamInputDataBlock(pItem->taskInfo); - - tdUnRefRSmaInfo(pSma, pRSmaInfo); - // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); - // taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); - } break; - case TASK_TRIGGER_STAT_PAUSED: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); - } break; - case TASK_TRIGGER_STAT_INACTIVE: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is inactive", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); - } break; - case TASK_TRIGGER_STAT_INIT: { - smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is init", SMA_VID(pSma), - pItem->level, pRSmaInfo->suid); - } break; - default: { - smaWarn("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is unknown", - SMA_VID(pSma), pItem->level, pRSmaInfo->suid); - } break; - } - -_end: - tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); -} -#endif - /** * @brief trigger to get rsma result * @@ -1426,7 +1337,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { return; } - SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__); + SRSmaStat *pStat = (SRSmaStat *)tdAcquireSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); if (!pStat) { smaDebug("rsma fetch task not start since already destroyed, rsetId rsetId:%" PRIi64 " refId:%d)", smaMgmt.rsetId, @@ -1441,7 +1352,7 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { switch (rsmaTriggerStat) { case TASK_TRIGGER_STAT_PAUSED: case TASK_TRIGGER_STAT_CANCELLED: { - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__); + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data since stat is %" PRIi8 ", rsetId rsetId:%" PRIi64 " refId:%d", SMA_VID(pSma), pItem->level, rsmaTriggerStat, smaMgmt.rsetId, pRSmaInfo->refId); @@ -1463,7 +1374,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { pItem->level, pRSmaInfo->suid); // sync procedure => async process - tdRefRSmaInfo(pSma, pRSmaInfo); SSDataBlock dataBlock = {.info.type = STREAM_GET_ALL}; qTaskInfo_t taskInfo = pRSmaInfo->taskInfo[pItem->level - 1]; @@ -1472,9 +1382,6 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { STREAM_INPUT__DATA_BLOCK); tdCleanupStreamInputDataBlock(taskInfo); - tdUnRefRSmaInfo(pSma, pRSmaInfo); - // atomic_store_8(&pItem->triggerStat, TASK_TRIGGER_STAT_ACTIVE); - // taosTmrReset(tdRSmaFetchTrigger, 5000, pItem, smaMgmt.tmrHandle, &pItem->tmrId); } break; case TASK_TRIGGER_STAT_PAUSED: { smaDebug("vgId:%d, not fetch rsma level %" PRIi8 " data for table:%" PRIi64 " since stat is paused", @@ -1495,5 +1402,5 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { } _end: - tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId, __func__, __LINE__); + tdReleaseSmaRef(smaMgmt.rsetId, pRSmaInfo->refId); } diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 93d102dc5b..d9f38ffd09 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -287,22 +287,22 @@ int32_t tdRemoveTFile(STFile *pTFile) { } // smaXXXUtil ================ -void *tdAcquireSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { +void *tdAcquireSmaRef(int32_t rsetId, int64_t refId) { void *pResult = taosAcquireRef(rsetId, refId); if (!pResult) { - smaWarn("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); + smaWarn("rsma acquire ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr()); } else { - smaDebug("%s:%d taosAcquireRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); + smaDebug("rsma acquire ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId); } return pResult; } -int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId, const char *tags, int32_t ln) { +int32_t tdReleaseSmaRef(int32_t rsetId, int64_t refId) { if (taosReleaseRef(rsetId, refId) < 0) { - smaWarn("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d failed since %s", tags, ln, rsetId, refId, terrstr()); + smaWarn("rsma release ref for rsetId:%" PRIi64 " refId:%d failed since %s", rsetId, refId, terrstr()); return TSDB_CODE_FAILED; } - smaDebug("%s:%d taosReleaseRef for rsetId:%" PRIi64 " refId:%d success", tags, ln, rsetId, refId); + smaDebug("rsma release ref for rsetId:%" PRIi64 " refId:%d success", rsetId, refId); return TSDB_CODE_SUCCESS; } @@ -313,7 +313,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t char *pOutput = NULL; int32_t len = 0; - if (qSerializeTaskStatus(srcTaskInfo, &pOutput, &len) < 0) { + if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) { smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, terrstr()); goto _err; @@ -337,13 +337,15 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t goto _err; } - smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); + smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); taosMemoryFreeClear(pOutput); return TSDB_CODE_SUCCESS; _err: taosMemoryFreeClear(pOutput); tdFreeQTaskInfo(dstTaskInfo, TD_VID(pVnode), idx + 1); + smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, + terrstr()); return TSDB_CODE_FAILED; } @@ -376,19 +378,21 @@ int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo **pDest, SRSmaInfo *pSrc) { if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { - tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i); + if (tdCloneQTaskInfo(pSma, pSrc->iTaskInfo[i], pSrc->taskInfo[i], param, pSrc->suid, i) < 0) { + goto _err; + } } smaDebug("vgId:%d, rsma clone env success for %" PRIi64, TD_VID(pVnode), pSrc->suid); } metaReaderClear(&mr); - - *pDest = pSrc; // pointer copy - + + *pDest = pSrc; // pointer copy + return TSDB_CODE_SUCCESS; _err: *pDest = NULL; metaReaderClear(&mr); - // tdFreeRSmaInfo(pSma, pDest, false); + smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", TD_VID(pVnode), pSrc->suid, terrstr()); return TSDB_CODE_FAILED; } \ No newline at end of file