From b0598237adcb824e13c86af493563f54a0c29336 Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 26 Dec 2022 15:23:44 +0800 Subject: [PATCH 1/5] enh: sma assert/log optimization --- source/dnode/vnode/src/sma/smaCommit.c | 94 +++-------- source/dnode/vnode/src/sma/smaEnv.c | 50 ++++-- source/dnode/vnode/src/sma/smaFS.c | 88 +--------- source/dnode/vnode/src/sma/smaOpen.c | 47 +++--- source/dnode/vnode/src/sma/smaRollup.c | 216 ++++++++++++++----------- source/libs/tfs/src/tfs.c | 4 +- 6 files changed, 216 insertions(+), 283 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 8b43de7421..9e4373d24f 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -116,68 +116,6 @@ _exit: return code; } -// SQTaskFile ====================================================== - -/** - * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be - * multiple qtaskinfo files supporting snapshot replication. - * - * @param pSma - * @param pStat - * @return int32_t - */ -static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { -#if 0 - SVnode *pVnode = pSma->pVnode; - SRSmaFS *pFS = RSMA_FS(pStat); - int64_t committed = pStat->commitAppliedVer; - int64_t fsMaxVer = -1; - char qTaskInfoFullName[TSDB_FILENAME_LEN]; - - taosWLockLatch(RSMA_FS_LOCK(pStat)); - - for (int32_t i = 0; i < taosArrayGetSize(pFS->aQTaskInf);) { - SQTaskFile *pTaskF = taosArrayGet(pFS->aQTaskInf, i); - int32_t oldVal = atomic_fetch_sub_32(&pTaskF->nRef, 1); - if ((oldVal <= 1) && (pTaskF->version < committed)) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->version, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - if (taosRemoveFile(qTaskInfoFullName) < 0) { - smaWarn("vgId:%d, cleanup qinf, committed %" PRIi64 ", failed to remove %s since %s", TD_VID(pVnode), committed, - qTaskInfoFullName, tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - smaDebug("vgId:%d, cleanup qinf, committed %" PRIi64 ", success to remove %s", TD_VID(pVnode), committed, - qTaskInfoFullName); - } - taosArrayRemove(pFS->aQTaskInf, i); - continue; - } - ++i; - } - - if (taosArrayGetSize(pFS->aQTaskInf) > 0) { - fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version; - } - - if (fsMaxVer < committed) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), committed, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); - if (taosCheckExistFile(qTaskInfoFullName)) { - SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0}; - if (!taosArrayPush(pFS->aQTaskInf, &qFile)) { - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } - } - } else { - smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode), - committed, fsMaxVer); - } - - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); -#endif - return TSDB_CODE_SUCCESS; -} - /** * @brief Rsma async commit implementation(only do some necessary light weighted task) * 1) set rsma stat TASK_TRIGGER_STAT_PAUSED @@ -187,7 +125,8 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) { * @return int32_t */ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { - int32_t code = 0; + int32_t code = 0; + int32_t lino = 0; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); if (!pEnv) { @@ -208,7 +147,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - // ASSERT(pRSmaStat->commitAppliedVer > 0); + if (pRSmaStat->commitAppliedVer < 0) { + code = TSDB_CODE_APP_ERROR; + TSDB_CHECK_CODE(code, lino, _exit); + } // step 2: wait for all triggered fetch tasks to finish nLoops = 0; @@ -242,9 +184,9 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - if ((code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat))) != 0) { - return code; - } + code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); + TSDB_CHECK_CODE(code, lino, _exit); + smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); #if 0 // consuming task of qTaskInfo clone @@ -252,8 +194,6 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // lock taosWLockLatch(SMA_ENV_LOCK(pEnv)); - ASSERT(RSMA_INFO_HASH(pRSmaStat)); - void *pIter = taosHashIterate(RSMA_INFO_HASH(pRSmaStat), NULL); while (pIter) { @@ -271,9 +211,19 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { // all rsma results are written completely STsdb *pTsdb = NULL; - if ((pTsdb = VND_RSMA1(pSma->pVnode))) tsdbPrepareCommit(pTsdb); - if ((pTsdb = VND_RSMA2(pSma->pVnode))) tsdbPrepareCommit(pTsdb); + if ((pTsdb = VND_RSMA1(pSma->pVnode))) { + code = tsdbPrepareCommit(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } + if ((pTsdb = VND_RSMA2(pSma->pVnode))) { + code = tsdbPrepareCommit(pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } return code; } @@ -360,8 +310,6 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) { taosWUnLockLatch(SMA_ENV_LOCK(pEnv)); } - tdUpdateQTaskInfoFiles(pSma, pRSmaStat); - atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index e613b5833f..7372361103 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -131,7 +131,8 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) { (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv) : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv); - if (tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma) != TSDB_CODE_SUCCESS) { + terrno = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma); + if (terrno != TSDB_CODE_SUCCESS) { tdFreeSmaEnv(pEnv); *ppEnv = NULL; (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL) @@ -193,10 +194,16 @@ static void tRSmaInfoHashFreeNode(void *data) { } static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pSma) { - ASSERT(pSmaStat != NULL); + int32_t code = 0; + int32_t lino = 0; + + if (!pSmaStat) { + terrno = TSDB_CODE_RSMA_INVALID_ENV; + TSDB_CHECK_CODE(code, lino, _exit); + } if (*pSmaStat) { // no lock - return TSDB_CODE_SUCCESS; + return code; // success, return directly } /** @@ -207,8 +214,8 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (!(*pSmaStat)) { *pSmaStat = (SSmaStat *)taosMemoryCalloc(1, sizeof(SSmaStat) + sizeof(TdThread) * tsNumOfVnodeRsmaThreads); if (!(*pSmaStat)) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } if (smaType == TSDB_SMA_TYPE_ROLLUP) { @@ -224,7 +231,8 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS if (refId < 0) { smaError("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d failed since:%s", SMA_VID(pSma), refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM, tstrerror(terrno)); - return TSDB_CODE_FAILED; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } else { smaDebug("vgId:%d, taosAddRef refId:%" PRIi64 " to rsetId rsetId:%d max:%d succeed", SMA_VID(pSma), refId, smaMgmt.rsetId, SMA_MGMT_REF_NUM); @@ -235,22 +243,29 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS RSMA_INFO_HASH(pRSmaStat) = taosHashInit( RSMA_TASK_INFO_HASH_SLOT, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); if (!RSMA_INFO_HASH(pRSmaStat)) { - return TSDB_CODE_FAILED; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } taosHashSetFreeFp(RSMA_INFO_HASH(pRSmaStat), tRSmaInfoHashFreeNode); if (tdRsmaStartExecutor(pSma) < 0) { - return TSDB_CODE_FAILED; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } taosInitRWLatch(RSMA_FS_LOCK(pRSmaStat)); } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { - ASSERT(0); + code = TSDB_CODE_APP_ERROR; + TSDB_CHECK_CODE(code, lino, _exit); } } - return TSDB_CODE_SUCCESS; +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } + return code; } static void tdDestroyTSmaStat(STSmaStat *pStat) { @@ -339,7 +354,9 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId); } } else { - ASSERT(0); + terrno = TSDB_CODE_APP_ERROR; + smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr()); + return -1; } } return 0; @@ -348,7 +365,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { int32_t tdLockSma(SSma *pSma) { int code = taosThreadMutexLock(&pSma->mutex); if (code != 0) { - smaError("vgId:%d, failed to lock td since %s", SMA_VID(pSma), strerror(errno)); + smaError("vgId:%d, failed to lock since %s", SMA_VID(pSma), strerror(errno)); terrno = TAOS_SYSTEM_ERROR(code); return -1; } @@ -357,12 +374,17 @@ int32_t tdLockSma(SSma *pSma) { } int32_t tdUnLockSma(SSma *pSma) { - ASSERT(SMA_LOCKED(pSma)); + if (!SMA_LOCKED(pSma)) { + terrno = TSDB_CODE_APP_ERROR; + smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno)); + return -1; + } + pSma->locked = false; int code = taosThreadMutexUnlock(&pSma->mutex); if (code != 0) { - smaError("vgId:%d, failed to unlock td since %s", SMA_VID(pSma), strerror(errno)); terrno = TAOS_SYSTEM_ERROR(code); + smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), strerror(errno)); return -1; } return 0; diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index b7c2538b41..db2818cd92 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -77,7 +77,7 @@ static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) { int32_t nt = tdRSmaGetQTaskF(pData + n, &qTaskF); if (nt < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_FILE_CORRUPTED; goto _exit; } @@ -88,7 +88,10 @@ static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) { } } - ASSERT(n + sizeof(TSCKSUM) == nData); + if(n + sizeof(TSCKSUM) != nData) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _exit; + } _exit: return code; @@ -545,87 +548,6 @@ int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, _exit: return code; } -#if 0 -int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version}; - SQTaskFile *pTaskF = NULL; - int32_t oldVal = 0; - - taosRLockLatch(RSMA_FS_LOCK(pStat)); - if (suid > 0 && level > 0) { - ASSERT(version > 0); - if ((pTaskF = taosArraySearch(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ))) { - oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); - ASSERT(oldVal > 0); - } - } else { - // ref all - int32_t size = taosArrayGetSize(aQTaskInf); - for (int32_t i = 0; i < size; ++i) { - pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); - oldVal = atomic_fetch_add_32(&pTaskF->nRef, 1); - ASSERT(oldVal > 0); - } - } - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - return oldVal; -} - -void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t suid, int8_t level, int64_t version) { - SVnode *pVnode = pSma->pVnode; - SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf; - char qTaskFullName[TSDB_FILENAME_LEN]; - SQTaskFile qTaskF = {.level = level, .suid = suid, .version = version}; - SQTaskFile *pTaskF = NULL; - int32_t idx = -1; - - taosWLockLatch(RSMA_FS_LOCK(pStat)); - if (suid > 0 && level > 0) { - ASSERT(version > 0); - if ((idx = taosArraySearchIdx(aQTaskInf, &qTaskF, tdQTaskInfCmprFn1, TD_EQ)) >= 0) { - ASSERT(idx < taosArrayGetSize(aQTaskInf)); - pTaskF = taosArrayGet(aQTaskInf, idx); - if (atomic_sub_fetch_32(&pTaskF->nRef, 1) <= 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, level, pTaskF->version, - tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); - if (taosRemoveFile(qTaskFullName) < 0) { - smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); - } - taosArrayRemove(aQTaskInf, idx); - } - } - } else { - for (int32_t i = 0; i < taosArrayGetSize(aQTaskInf);) { - pTaskF = TARRAY_GET_ELEM(aQTaskInf, i); - int32_t nRef = INT32_MAX; - if (pTaskF->version == version) { - nRef = atomic_sub_fetch_32(&pTaskF->nRef, 1); - } else if (pTaskF->version < version) { - nRef = atomic_load_32(&pTaskF->nRef); - } - if (nRef <= 0) { - tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, - tfsGetPrimaryPath(pVnode->pTfs), qTaskFullName); - if (taosRemoveFile(qTaskFullName) < 0) { - smaWarn("vgId:%d, failed to remove %s since %s", TD_VID(pVnode), qTaskFullName, - tstrerror(TAOS_SYSTEM_ERROR(errno))); - } else { - smaDebug("vgId:%d, success to remove %s", TD_VID(pVnode), qTaskFullName); - } - taosArrayRemove(aQTaskInf, i); - continue; - } - ++i; - } - } - - taosWUnLockLatch(RSMA_FS_LOCK(pStat)); -} -#endif int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS) { int32_t code = 0; diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index 3923a5dd5b..e82b65412f 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -34,13 +34,16 @@ static int32_t rsmaRestore(SSma *pSma); SRetention *r = (SRetention *)VND_RETENTIONS(v) + l; \ if (!RETENTION_VALID(r)) { \ if (l == 0) { \ - goto _err; \ + code = TSDB_CODE_INVALID_PARA; \ + TSDB_CHECK_CODE(code, lino, _exit); \ } \ break; \ } \ - smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ + code = smaSetKeepCfg(v, &keepCfg, pCfg, TSDB_TYPE_RSMA_L##l); \ + TSDB_CHECK_CODE(code, lino, _exit); \ if (tsdbOpen(v, &SMA_RSMA_TSDB##l(pSma), VNODE_RSMA##l##_DIR, &keepCfg, rollback) < 0) { \ - goto _err; \ + code = terrno; \ + TSDB_CHECK_CODE(code, lino, _exit); \ } \ } while (0) @@ -68,12 +71,10 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t p days = keepDuration; } - if (level == TSDB_RETENTION_L0) { - goto end; + if (level < TSDB_RETENTION_L1 || level > TSDB_RETENTION_L2) { + goto _exit; } - ASSERT(level >= TSDB_RETENTION_L1 && level <= TSDB_RETENTION_L2); - freqDuration = convertTimeFromPrecisionToUnit((r + level)->freq, precision, TIME_UNIT_MINUTE); keepDuration = convertTimeFromPrecisionToUnit((r + level)->keep, precision, TIME_UNIT_MINUTE); @@ -91,16 +92,17 @@ static int32_t smaEvalDays(SVnode *pVnode, SRetention *r, int8_t level, int8_t p if (days < freqDuration) { days = freqDuration; } -end: +_exit: smaInfo("vgId:%d, evaluated duration for level %d is %d, raw val:%d", TD_VID(pVnode), level + 1, days, duration); return days; } int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int type) { + terrno = 0; pKeepCfg->precision = pCfg->precision; switch (type) { case TSDB_TYPE_TSMA: - ASSERT(0); + terrno = TSDB_CODE_APP_ERROR; break; case TSDB_TYPE_RSMA_L0: SMA_SET_KEEP_CFG(pVnode, 0); @@ -112,19 +114,21 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty SMA_SET_KEEP_CFG(pVnode, 2); break; default: - ASSERT(0); + terrno = TSDB_CODE_APP_ERROR; break; } - return 0; + return terrno; } int32_t smaOpen(SVnode *pVnode, int8_t rollback) { + int32_t code = 0; + int32_t lino = 0; STsdbCfg *pCfg = &pVnode->config.tsdbCfg; SSma *pSma = taosMemoryCalloc(1, sizeof(SSma)); if (!pSma) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } pVnode->pSma = pSma; @@ -143,21 +147,24 @@ int32_t smaOpen(SVnode *pVnode, int8_t rollback) { } else if (i == TSDB_RETENTION_L2) { SMA_OPEN_RSMA_IMPL(pVnode, 2); } else { - terrno = TSDB_CODE_APP_ERROR; - smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), terrstr(), i); - goto _err; + code = TSDB_CODE_APP_ERROR; + smaError("vgId:%d, sma open failed since %s, level:%d", TD_VID(pVnode), tstrerror(code), i); + TSDB_CHECK_CODE(code, lino, _exit); } } // restore the rsma if (tdRSmaRestore(pSma, RSMA_RESTORE_REBOOT, pVnode->state.committed, rollback) < 0) { - goto _err; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } } - return 0; -_err: - return -1; +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); + } + return code; } int32_t smaClose(SSma *pSma) { diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 044e1d5bab..65f2116a2d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -129,11 +129,12 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { } static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { - ASSERT(*pStore == NULL); - *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); if (*pStore == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; + *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); + if (*pStore == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } } return TSDB_CODE_SUCCESS; } @@ -232,8 +233,6 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui return TSDB_CODE_SUCCESS; } - ASSERT(ppStore != NULL); - if (!(*ppStore)) { if (tdUidStoreInit(ppStore) < 0) { return TSDB_CODE_FAILED; @@ -299,8 +298,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } - pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; - ASSERT(pItem->level > 0); + pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; // make sure: pItem->level > 0 SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); @@ -832,50 +830,51 @@ static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t srcTaskInfo, SRSmaParam *param, tb_uid_t suid, int8_t idx) { + int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pSma->pVnode; char *pOutput = NULL; int32_t len = 0; if (!srcTaskInfo) { - terrno = TSDB_CODE_INVALID_PTR; + code = TSDB_CODE_INVALID_PTR; smaWarn("vgId:%d, rsma clone, table %" PRIi64 ", no need since srcTaskInfo is NULL", TD_VID(pVnode), suid); - return TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _exit); } - if ((terrno = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len)) < 0) { - smaError("vgId:%d, rsma clone, table %" PRIi64 " serialize qTaskInfo failed since %s", TD_VID(pVnode), suid, - terrstr()); - goto _err; - } + code = qSerializeTaskStatus(srcTaskInfo, &pOutput, &len); + TSDB_CHECK_CODE(code, lino, _exit); SReadHandle handle = { .meta = pVnode->pMeta, .vnode = pVnode, .initTqReader = 1, }; - ASSERT(!dstTaskInfo); - dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); - if (!dstTaskInfo) { - terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; - goto _err; + + if (dstTaskInfo) { + code = TSDB_CODE_APP_ERROR; + TSDB_CHECK_CODE(code, lino, _exit); } - if (qDeserializeTaskStatus(dstTaskInfo, pOutput, len) < 0) { - smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, - terrstr()); - goto _err; + dstTaskInfo = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle); + if (!dstTaskInfo) { + code = TSDB_CODE_RSMA_QTASKINFO_CREATE; + TSDB_CHECK_CODE(code, lino, _exit); } + code = qDeserializeTaskStatus(dstTaskInfo, pOutput, len); + TSDB_CHECK_CODE(code, lino, _exit); + smaDebug("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " succeed", TD_VID(pVnode), suid); +_exit: taosMemoryFreeClear(pOutput); - return TSDB_CODE_SUCCESS; -_err: - taosMemoryFreeClear(pOutput); - tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1); - smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, - terrstr()); - return TSDB_CODE_FAILED; + if (code) { + tdRSmaQTaskInfoFree(dstTaskInfo, TD_VID(pVnode), idx + 1); + smaError("vgId:%d, rsma clone, restore rsma task for table:%" PRIi64 " failed since %s", TD_VID(pVnode), suid, + terrstr()); + } + return code; } /** @@ -886,43 +885,51 @@ _err: * @return int32_t */ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { + int32_t code = 0; + int32_t lino = 0; SRSmaParam *param = NULL; + SMetaReader mr = {0}; + if (!pInfo) { return TSDB_CODE_SUCCESS; } - SMetaReader mr = {0}; metaReaderInit(&mr, SMA_META(pSma), 0); smaDebug("vgId:%d, rsma clone qTaskInfo for suid:%" PRIi64, SMA_VID(pSma), pInfo->suid); if (metaGetTableEntryByUidCache(&mr, pInfo->suid) < 0) { - smaError("vgId:%d, rsma clone, failed to get table meta for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, - terrstr()); - goto _err; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (mr.me.type != TSDB_SUPER_TABLE) { + code = TSDB_CODE_RSMA_INVALID_SCHEMA; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (mr.me.uid != pInfo->suid) { + code = TSDB_CODE_RSMA_INVALID_SCHEMA; + TSDB_CHECK_CODE(code, lino, _exit); } - ASSERT(mr.me.type == TSDB_SUPER_TABLE); - ASSERT(mr.me.uid == pInfo->suid); if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (!pInfo->iTaskInfo[i]) { continue; } - if (tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i) < 0) { - goto _err; - } + code = tdCloneQTaskInfo(pSma, pInfo->taskInfo[i], pInfo->iTaskInfo[i], param, pInfo->suid, i); + TSDB_CHECK_CODE(code, lino, _exit); } smaDebug("vgId:%d, rsma clone env success for %" PRIi64, SMA_VID(pSma), pInfo->suid); } else { - terrno = TSDB_CODE_RSMA_INVALID_SCHEMA; - goto _err; + code = TSDB_CODE_RSMA_INVALID_SCHEMA; + TSDB_CHECK_CODE(code, lino, _exit); } +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, SMA_VID(pSma), + __func__, lino, tstrerror(code), pInfo->suid, mr.me.type, mr.me.uid); + } metaReaderClear(&mr); - return TSDB_CODE_SUCCESS; -_err: - metaReaderClear(&mr); - smaError("vgId:%d, rsma clone env failed for %" PRIi64 " since %s", SMA_VID(pSma), pInfo->suid, terrstr()); - return TSDB_CODE_FAILED; + return code; } /** @@ -933,10 +940,14 @@ _err: * @return SRSmaInfo* */ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { + int32_t code = 0; + int32_t lino = 0; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pStat = NULL; SRSmaInfo *pRSmaInfo = NULL; + terrno = 0; + if (!pEnv) { terrno = TSDB_CODE_RSMA_INVALID_ENV; return NULL; @@ -956,14 +967,19 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { return NULL; } if (!pRSmaInfo->taskInfo[0]) { - if (tdRSmaInfoClone(pSma, pRSmaInfo) < 0) { + if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } } tdRefRSmaInfo(pSma, pRSmaInfo); taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - ASSERT(pRSmaInfo->suid == suid); + if (pRSmaInfo->suid != suid) { + terrno = TSDB_CODE_APP_ERROR; + smaError("vgId:%d, invalid rsma info in cache, suid:%" PRIi64 " != %" PRIi64, SMA_VID(pSma), pRSmaInfo->suid, + suid); + return NULL; + } return pRSmaInfo; } taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); @@ -1009,7 +1025,11 @@ static int32_t tdExecuteRSmaAsync(SSma *pSma, const void *pMsg, int32_t inputTyp } } } else { - ASSERT(0); + terrno = TSDB_CODE_APP_ERROR; + tdReleaseRSmaInfo(pSma, pRSmaInfo); + smaError("vgId:%d, execute rsma, failed for suid:%" PRIu64 " since %s, type:%d", SMA_VID(pSma), suid, + tstrerror(terrno), inputType); + return TSDB_CODE_FAILED; } tdReleaseRSmaInfo(pSma, pRSmaInfo); @@ -1060,19 +1080,22 @@ _err: * @return int32_t */ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { + int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pSma->pVnode; SArray *suidList = NULL; STbUidStore uidStore = {0}; SMetaReader mr = {0}; + tb_uid_t suid = 0; if (!(suidList = taosArrayInit(1, sizeof(tb_uid_t)))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } if (vnodeGetStbIdList(pSma->pVnode, 0, suidList) < 0) { - smaError("vgId:%d, failed to restore rsma env since get stb id list error: %s", TD_VID(pVnode), terrstr()); - goto _err; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } int64_t arrSize = taosArrayGetSize(suidList); @@ -1089,19 +1112,26 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { int64_t nRsmaTables = 0; metaReaderInit(&mr, SMA_META(pSma), 0); if (!(uidStore.tbUids = taosArrayInit(1024, sizeof(tb_uid_t)))) { - goto _err; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } + for (int64_t i = 0; i < arrSize; ++i) { - tb_uid_t suid = *(tb_uid_t *)taosArrayGet(suidList, i); + suid = *(tb_uid_t *)taosArrayGet(suidList, i); smaDebug("vgId:%d, rsma restore, suid is %" PRIi64, TD_VID(pVnode), suid); if (metaGetTableEntryByUidCache(&mr, suid) < 0) { - smaError("vgId:%d, rsma restore, failed to get table meta for %" PRIi64 " since %s", TD_VID(pVnode), suid, - terrstr()); - goto _err; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } tDecoderClear(&mr.coder); - ASSERT(mr.me.type == TSDB_SUPER_TABLE); - ASSERT(mr.me.uid == suid); + if (mr.me.type != TSDB_SUPER_TABLE) { + code = TSDB_CODE_RSMA_INVALID_SCHEMA; + TSDB_CHECK_CODE(code, lino, _exit); + } + if (mr.me.uid != suid) { + code = TSDB_CODE_RSMA_INVALID_SCHEMA; + TSDB_CHECK_CODE(code, lino, _exit); + } if (TABLE_IS_ROLLUP(mr.me.flags)) { ++nRsmaTables; SRSmaParam *param = &mr.me.stbEntry.rsmaParam; @@ -1111,22 +1141,20 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { TD_VID(pVnode), suid, i, param->maxdelay[i], param->watermark[i], param->qmsgLen[i]); } if (tdRSmaProcessCreateImpl(pSma, &mr.me.stbEntry.rsmaParam, suid, mr.me.name) < 0) { - smaError("vgId:%d, rsma restore env failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); - goto _err; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } // reload all ctbUids for suid uidStore.suid = suid; if (vnodeGetCtbIdList(pVnode, suid, uidStore.tbUids) < 0) { - smaError("vgId:%d, rsma restore, get ctb idlist failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, - terrstr()); - goto _err; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) { - smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, - terrstr()); - goto _err; + code = terrno; + TSDB_CHECK_CODE(code, lino, _exit); } taosArrayClear(uidStore.tbUids); @@ -1135,21 +1163,18 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { } } - metaReaderClear(&mr); - taosArrayDestroy(suidList); - tdUidStoreDestory(&uidStore); - if (nTables) { *nTables = nRsmaTables; } - - return TSDB_CODE_SUCCESS; -_err: +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, TD_VID(pVnode), + __func__, lino, tstrerror(code), suid, mr.me.type, mr.me.uid); + } metaReaderClear(&mr); taosArrayDestroy(suidList); tdUidStoreDestory(&uidStore); - - return TSDB_CODE_FAILED; + return code; } /** @@ -1251,7 +1276,6 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { code = TAOS_SYSTEM_ERROR(errno); TSDB_CHECK_CODE(code, lino, _exit); } - ASSERT(size > 0); int64_t offset = 0; if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) { @@ -1371,10 +1395,11 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { // async process pItem->fetchLevel = pItem->level; #if 0 + // debugging codes SRSmaInfo *qInfo = tdAcquireRSmaInfoBySuid(pSma, pRSmaInfo->suid); SRSmaInfoItem *qItem = RSMA_INFO_ITEM(qInfo, pItem->level - 1); - ASSERT(qItem->level == pItem->level); - ASSERT(qItem->fetchLevel == pItem->fetchLevel); + make sure(qItem->level == pItem->level); + make sure(qItem->fetchLevel == pItem->fetchLevel); #endif if (atomic_load_8(&pRSmaInfo->assigned) == 0) { tsem_post(&(pStat->notEmpty)); @@ -1516,6 +1541,8 @@ _err: */ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { + int32_t code = 0; + int32_t lino = 0; SVnode *pVnode = pSma->pVnode; SSmaEnv *pEnv = SMA_RSMA_ENV(pSma); SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv); @@ -1524,14 +1551,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { bool isFetchAll = false; if (!pRSmaStat || !(infoHash = RSMA_INFO_HASH(pRSmaStat))) { - terrno = TSDB_CODE_RSMA_INVALID_STAT; - goto _err; + code = TSDB_CODE_RSMA_INVALID_STAT; + TSDB_CHECK_CODE(code, lino, _exit); } if (!(pSubmitArr = taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } while (true) { @@ -1562,7 +1589,10 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (oldStat == 0 || ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); - ASSERT(oldVal >= 0); + if (oldVal < 0) { + code = TSDB_CODE_APP_ERROR; + TSDB_CHECK_CODE(code, lino, _exit); + } int8_t curStat = atomic_load_8(RSMA_COMMIT_STAT(pRSmaStat)); if (curStat == 1) { @@ -1592,7 +1622,8 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } } } else { - ASSERT(0); + code = TSDB_CODE_APP_ERROR; + TSDB_CHECK_CODE(code, lino, _exit); } if (atomic_load_64(&pRSmaStat->nBufItems) <= 0) { @@ -1611,10 +1642,11 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } // end of while(true) -_end: +_exit: taosArrayDestroy(pSubmitArr); - return TSDB_CODE_SUCCESS; -_err: - taosArrayDestroy(pSubmitArr); - return TSDB_CODE_FAILED; + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, type:%d", TD_VID(pVnode), __func__, lino, tstrerror(code), + (int32_t)type); + } + return code; } diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 943611ee27..9a71fc7f30 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -284,7 +284,9 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) { } int32_t tfsRmdir(STfs *pTfs, const char *rname) { - ASSERT(rname[0] != 0); + if (rname[0] == 0) { + return 0; + } char aname[TMPNAME_LEN] = "\0"; From 8c6120db9218b6567e2375f00e3073fbb5e19f36 Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 2 Jan 2023 18:24:47 +0800 Subject: [PATCH 2/5] enh: rsma asserts process --- source/dnode/vnode/src/sma/smaCommit.c | 3 +- source/dnode/vnode/src/sma/smaEnv.c | 9 +++--- source/dnode/vnode/src/sma/smaFS.c | 3 +- source/dnode/vnode/src/sma/smaOpen.c | 2 ++ source/dnode/vnode/src/sma/smaRollup.c | 42 ++++++++++++++++---------- 5 files changed, 37 insertions(+), 22 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 9e4373d24f..eb01470f36 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -147,7 +147,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - if (pRSmaStat->commitAppliedVer < 0) { + if (ASSERTS(pRSmaStat->commitAppliedVer > 0, "commit applied version %" PRIi64 " <= 0", + pRSmaStat->commitAppliedVer)) { code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index 7372361103..a980b653a9 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -131,8 +131,7 @@ static int32_t tdNewSmaEnv(SSma *pSma, int8_t smaType, SSmaEnv **ppEnv) { (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), *ppEnv) : atomic_store_ptr(&SMA_RSMA_ENV(pSma), *ppEnv); - terrno = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma); - if (terrno != TSDB_CODE_SUCCESS) { + if ((terrno = tdInitSmaStat(&SMA_ENV_STAT(pEnv), smaType, pSma)) != TSDB_CODE_SUCCESS) { tdFreeSmaEnv(pEnv); *ppEnv = NULL; (smaType == TSDB_SMA_TYPE_TIME_RANGE) ? atomic_store_ptr(&SMA_TSMA_ENV(pSma), NULL) @@ -197,7 +196,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS int32_t code = 0; int32_t lino = 0; - if (!pSmaStat) { + if (ASSERTS(pSmaStat != NULL, "pSmaStat is NULL")) { terrno = TSDB_CODE_RSMA_INVALID_ENV; TSDB_CHECK_CODE(code, lino, _exit); } @@ -257,6 +256,7 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS } else if (smaType == TSDB_SMA_TYPE_TIME_RANGE) { // TODO } else { + ASSERTS(0, "unknown smaType:%" PRIi8, smaType); code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit); } @@ -354,6 +354,7 @@ static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType) { smaDebug("vgId:%d, remove refId:%" PRIi64 " from rsmaRef:%" PRIi32 " succeed", vid, refId, smaMgmt.rsetId); } } else { + ASSERTS(0, "unknown smaType:%" PRIi8, smaType); terrno = TSDB_CODE_APP_ERROR; smaError("%s failed at line %d since %s", __func__, __LINE__, terrstr()); return -1; @@ -374,7 +375,7 @@ int32_t tdLockSma(SSma *pSma) { } int32_t tdUnLockSma(SSma *pSma) { - if (!SMA_LOCKED(pSma)) { + if (ASSERTS(SMA_LOCKED(pSma), "pSma %p is not locked:%d", pSma, pSma->locked)) { terrno = TSDB_CODE_APP_ERROR; smaError("vgId:%d, failed to unlock since %s", SMA_VID(pSma), tstrerror(terrno)); return -1; diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index db2818cd92..ef872d055e 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -88,7 +88,8 @@ static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, SRSmaFS *pFS) { } } - if(n + sizeof(TSCKSUM) != nData) { + if (ASSERTS(n + sizeof(TSCKSUM) == nData, "n:%d + sizeof(TSCKSUM):%d != nData:%d", n, (int32_t)sizeof(TSCKSUM), + nData)) { code = TSDB_CODE_FILE_CORRUPTED; goto _exit; } diff --git a/source/dnode/vnode/src/sma/smaOpen.c b/source/dnode/vnode/src/sma/smaOpen.c index e82b65412f..ed33e0fd7b 100644 --- a/source/dnode/vnode/src/sma/smaOpen.c +++ b/source/dnode/vnode/src/sma/smaOpen.c @@ -102,6 +102,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty pKeepCfg->precision = pCfg->precision; switch (type) { case TSDB_TYPE_TSMA: + ASSERTS(0, "undefined smaType:%d", (int32_t)type); terrno = TSDB_CODE_APP_ERROR; break; case TSDB_TYPE_RSMA_L0: @@ -114,6 +115,7 @@ int smaSetKeepCfg(SVnode *pVnode, STsdbKeepCfg *pKeepCfg, STsdbCfg *pCfg, int ty SMA_SET_KEEP_CFG(pVnode, 2); break; default: + ASSERTS(0, "unknown smaType:%d", (int32_t)type); terrno = TSDB_CODE_APP_ERROR; break; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 81f2285d03..bfa681756d 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -129,13 +129,17 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { } static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { - if (*pStore == NULL) { - *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); - if (*pStore == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return TSDB_CODE_FAILED; - } + if (ASSERTS(*pStore == NULL, "*pStore:%p != NULL", *pStore)) { + terrno = TSDB_CODE_APP_ERROR; + return TSDB_CODE_FAILED; } + + *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); + if (*pStore == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; } @@ -298,7 +302,12 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->maxDelay = TSDB_MAX_ROLLUP_MAX_DELAY; } - pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; // make sure: pItem->level > 0 + pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; + + if (ASSERTS(pItem->level > 0, "pItem level:%" PRIi8 " should > 0", pItem->level)) { + terrno = TSDB_CODE_APP_ERROR; + return TSDB_CODE_FAILED; + } SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); @@ -850,7 +859,7 @@ static int32_t tdCloneQTaskInfo(SSma *pSma, qTaskInfo_t dstTaskInfo, qTaskInfo_t .initTqReader = 1, }; - if (dstTaskInfo) { + if (ASSERTS(!dstTaskInfo, "dstTaskInfo:%p is not NULL", dstTaskInfo)) { code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit); } @@ -899,6 +908,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { code = terrno; TSDB_CHECK_CODE(code, lino, _exit); } + if (mr.me.type != TSDB_SUPER_TABLE) { code = TSDB_CODE_RSMA_INVALID_SCHEMA; TSDB_CHECK_CODE(code, lino, _exit); @@ -907,6 +917,7 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { code = TSDB_CODE_RSMA_INVALID_SCHEMA; TSDB_CHECK_CODE(code, lino, _exit); } + if (TABLE_IS_ROLLUP(mr.me.flags)) { param = &mr.me.stbEntry.rsmaParam; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { @@ -924,8 +935,8 @@ static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { _exit: if (code) { - smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", type:%" PRIi8 ", uid:%" PRIi64, SMA_VID(pSma), - __func__, lino, tstrerror(code), pInfo->suid, mr.me.type, mr.me.uid); + smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", flags:%" PRIi8 ",type:%" PRIi8 ", uid:%" PRIi64, + SMA_VID(pSma), __func__, lino, tstrerror(code), pInfo->suid, mr.me.flags, mr.me.type, mr.me.uid); } metaReaderClear(&mr); return code; @@ -973,10 +984,8 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { } tdRefRSmaInfo(pSma, pRSmaInfo); taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); - if (pRSmaInfo->suid != suid) { + if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) { terrno = TSDB_CODE_APP_ERROR; - smaError("vgId:%d, invalid rsma info in cache, suid:%" PRIi64 " != %" PRIi64, SMA_VID(pSma), pRSmaInfo->suid, - suid); return NULL; } return pRSmaInfo; @@ -1597,7 +1606,8 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { if (oldStat == 0 || ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { int32_t oldVal = atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); - if (oldVal < 0) { + + if (ASSERTS(oldVal >= 0, "oldVal of nFetchAll: %d < 0", oldVal)) { code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit); } @@ -1630,6 +1640,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } } } else { + ASSERTS(0, "unknown rsma exec type:%d", (int32_t)type); code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit); } @@ -1653,8 +1664,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { _exit: taosArrayDestroy(pSubmitArr); if (code) { - smaError("vgId:%d, %s failed at line %d since %s, type:%d", TD_VID(pVnode), __func__, lino, tstrerror(code), - (int32_t)type); + smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code)); } return code; } From 9a67de6ce8b053fc44b0d2e0c03e6be1d83e0f8c Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 2 Jan 2023 22:44:18 +0800 Subject: [PATCH 3/5] chore: rsma bug fix --- source/dnode/vnode/src/sma/smaRollup.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index bfa681756d..f6f6faf4bf 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -312,7 +312,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); - pItem->fetchLevel = pItem->level; taosTmrReset(tdRSmaFetchTrigger, RSMA_FETCH_INTERVAL, pItem, smaMgmt.tmrHandle, &pItem->tmrId); smaInfo("vgId:%d, item:%p table:%" PRIi64 " level:%" PRIi8 " maxdelay:%" PRIi64 " watermark:%" PRIi64 @@ -1573,7 +1572,7 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { } if (!(pSubmitArr = - taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), POINTER_BYTES))) { + taosArrayInit(TMIN(RSMA_SUBMIT_BATCH_SIZE, atomic_load_64(&pRSmaStat->nBufItems)), sizeof(SPackedData)))) { code = TSDB_CODE_OUT_OF_MEMORY; TSDB_CHECK_CODE(code, lino, _exit); } From fd44bf15b835015e970b43383e21f83b8cc04ccf Mon Sep 17 00:00:00 2001 From: kailixu Date: Mon, 2 Jan 2023 23:07:48 +0800 Subject: [PATCH 4/5] chore: rsma code optimization --- source/dnode/vnode/src/sma/smaRollup.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index f6f6faf4bf..809e553ab6 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -168,15 +168,14 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { - if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0)) { + if ((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, terrstr()); return TSDB_CODE_FAILED; - } else { - smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d", - SMA_VID(pSma), pRSmaInfo->taskInfo[0], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i); } + smaDebug("vgId:%d, update tbUidList succeed for qTaskInfo:%p with suid:%" PRIi64 " uid:%" PRIi64 " level %d", + SMA_VID(pSma), pRSmaInfo->taskInfo[i], *suid, *(int64_t *)taosArrayGet(tbUids, 0), i); } } From 8ab9acee4cb48b8f8b6c7b3a869fca58913d0eca Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 3 Jan 2023 23:36:17 +0800 Subject: [PATCH 5/5] chore: more code --- source/dnode/vnode/src/sma/smaCommit.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index eb01470f36..0aa20e9e1d 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -147,7 +147,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } } pRSmaStat->commitAppliedVer = pSma->pVnode->state.applied; - if (ASSERTS(pRSmaStat->commitAppliedVer > 0, "commit applied version %" PRIi64 " <= 0", + if (ASSERTS(pRSmaStat->commitAppliedVer >= 0, "commit applied version %" PRIi64 " < 0", pRSmaStat->commitAppliedVer)) { code = TSDB_CODE_APP_ERROR; TSDB_CHECK_CODE(code, lino, _exit);