diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 80630c1358..445b6364f7 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -755,6 +755,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RSMA_FS_REF TAOS_DEF_ERROR_CODE(0, 0x3161) #define TSDB_CODE_RSMA_FS_SYNC TAOS_DEF_ERROR_CODE(0, 0x3162) #define TSDB_CODE_RSMA_FS_UPDATE TAOS_DEF_ERROR_CODE(0, 0x3163) +#define TSDB_CODE_RSMA_RESULT TAOS_DEF_ERROR_CODE(0, 0x3164) //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index a3b09ba7fa..a680ebddb8 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -30,6 +30,8 @@ SSmaMgmt smaMgmt = { typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; +extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now); + static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static void tdUidStoreDestory(STbUidStore *pStore); static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); @@ -44,7 +46,7 @@ static int32_t tdRSmaFetchAllResult(SSma *pSma, SRSmaInfo *pInfo); static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid); static void tdRSmaFetchTrigger(void *param, void *tmrId); -static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo); +// static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo); static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables); static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int8_t type, int64_t qTaskFileVer); @@ -64,10 +66,7 @@ static void tdRSmaQTaskInfoFree(qTaskInfo_t *taskHandle, int32_t vgId, int32_t l if (otaskHandle && atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { smaDebug("vgId:%d, free qTaskInfo_t %p of level %d", vgId, otaskHandle, level); qDestroyTask(otaskHandle); - } else { - smaDebug("vgId:%d, not free qTaskInfo_t %p of level %d", vgId, otaskHandle, level); } - // TODO: clear files related to qTaskInfo? } /** @@ -95,16 +94,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { if (isDeepFree && pInfo->taskInfo[i]) { tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); - } else { - smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty taskInfo", SMA_VID(pSma), - pInfo->suid, i + 1); } #if 0 if (pInfo->iTaskInfo[i]) { tdRSmaQTaskInfoFree(&pInfo->iTaskInfo[i], SMA_VID(pSma), i + 1); - } else { - smaDebug("vgId:%d, table %" PRIi64 " no need to destroy rsma info level %d since empty iTaskInfo", - SMA_VID(pSma), pInfo->suid, i + 1); } #endif } @@ -140,11 +133,6 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { } static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { - if (ASSERTS(*pStore == NULL, "*pStore:%p != NULL", *pStore)) { - terrno = TSDB_CODE_APP_ERROR; - return TSDB_CODE_FAILED; - } - *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); if (*pStore == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -314,11 +302,6 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pItem->level = idx == 0 ? TSDB_RETENTION_L1 : TSDB_RETENTION_L2; - if (ASSERTS(pItem->level > 0, "pItem level:%" PRIi8 " should > 0", pItem->level)) { - terrno = TSDB_CODE_APP_ERROR; - return TSDB_CODE_FAILED; - } - SRSmaRef rsmaRef = {.refId = pStat->refId, .suid = pRSmaInfo->suid}; taosHashPut(smaMgmt.refHash, &pItem, POINTER_BYTES, &rsmaRef, sizeof(rsmaRef)); @@ -380,13 +363,13 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con pRSmaInfo->pTSchema = pTSchema; pRSmaInfo->suid = suid; T_REF_INIT_VAL(pRSmaInfo, 1); - if (!(pRSmaInfo->queue = taosOpenQueue())) { + + if (!(pRSmaInfo->queue = taosOpenQueue()) || !(pRSmaInfo->qall = taosAllocateQall()) || + tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0 || + tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) { goto _err; } - if (!(pRSmaInfo->qall = taosAllocateQall())) { - goto _err; - } #if 0 if (!(pRSmaInfo->iQueue = taosOpenQueue())) { goto _err; @@ -395,13 +378,6 @@ int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con goto _err; } #endif - if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 0) < 0) { - goto _err; - } - - if (tdSetRSmaInfoItemParams(pSma, param, pStat, pRSmaInfo, 1) < 0) { - goto _err; - } if (taosHashPut(RSMA_INFO_HASH(pStat), &suid, sizeof(tb_uid_t), &pRSmaInfo, sizeof(pRSmaInfo)) < 0) { goto _err; @@ -577,15 +553,12 @@ void *tdUidStoreFree(STbUidStore *pStore) { * @return int32_t */ static int32_t tdProcessSubmitReq(STsdb *pTsdb, int64_t version, void *pReq) { - if (!pReq) { - terrno = TSDB_CODE_INVALID_PTR; - return TSDB_CODE_FAILED; - } - - SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq; - // spin lock for race condition during insert data - if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { - return TSDB_CODE_FAILED; + if (pReq) { + SSubmitReq2 *pSubmitReq = (SSubmitReq2 *)pReq; + // spin lock for race condition during insert data + if (tsdbInsertData(pTsdb, version, pSubmitReq, NULL) < 0) { + return TSDB_CODE_FAILED; + } } return TSDB_CODE_SUCCESS; @@ -607,7 +580,6 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) { return 0; } -#if 0 /** * @brief retention of rsma1/rsma2 * @@ -631,48 +603,40 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) { _end: return code; } -#endif +#if 0 static void tdBlockDataDestroy(SArray *pBlockArr) { for (int32_t i = 0; i < taosArrayGetSize(pBlockArr); ++i) { blockDataDestroy(taosArrayGetP(pBlockArr, i)); } taosArrayDestroy(pBlockArr); } +#endif static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid) { + int32_t code = 0; + int32_t lino = 0; + SSDataBlock *output = NULL; + SArray *pResList = taosArrayInit(1, POINTER_BYTES); if (pResList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _exit); } while (1) { uint64_t ts; bool hasMore = false; - int32_t code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); - if (code < 0) { - if (code == TSDB_CODE_QRY_IN_EXEC) { - break; - } else { - smaError("vgId:%d, qExecTask for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), suid, - pItem->level, terrstr(code)); - goto _err; - } + code = qExecTaskOpt(taskInfo, pResList, &ts, &hasMore, NULL); + if (code == TSDB_CODE_QRY_IN_EXEC) { + code = 0; + break; } + TSDB_CHECK_CODE(code, lino, _exit); if (taosArrayGetSize(pResList) == 0) { - if (terrno == 0) { - // smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched yet", SMA_VID(pSma), pItem->level); - } else { - smaDebug("vgId:%d, no rsma level %" PRIi8 " data fetched since %s", SMA_VID(pSma), pItem->level, terrstr()); - goto _err; - } - break; - } else { - smaDebug("vgId:%d, rsma level %" PRIi8 " data fetched", SMA_VID(pSma), pItem->level); } #if 0 char flag[10] = {0}; @@ -680,28 +644,25 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma blockDebugShowDataBlocks(pResList, flag); #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { - SSDataBlock *output = taosArrayGetP(pResList, i); - smaDebug("result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%" PRId64, output->info.id.uid, - output->info.id.groupId, output->info.rows); + + output = taosArrayGetP(pResList, i); + smaDebug("vgId:%d, result block, uid:%" PRIu64 ", groupid:%" PRIu64 ", rows:%d", SMA_VID(pSma), + output->info.id.uid, output->info.id.groupId, output->info.rows); STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq2 *pReq = NULL; // TODO: the schema update should be handled later(TD-17965) if (buildSubmitReqFromDataBlock(&pReq, output, pTSchema, output->info.id.groupId, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma table suid:%" PRIu64 ", uid:%" PRIu64 ", level %" PRIi8 - " failed since %s", - SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); - goto _err; + code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; + TSDB_CHECK_CODE(code, lino, _exit); } if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { - tDestroySubmitReq(pReq, TSDB_MSG_FLG_ENCODE); + code = terrno ? terrno : TSDB_CODE_RSMA_RESULT; + tDestroySSubmitReq2(pReq, TSDB_MSG_FLG_ENCODE); taosMemoryFree(pReq); - smaError("vgId:%d, process submit req for rsma suid:%" PRIu64 ", uid:%" PRIu64 " level %" PRIi8 - " failed since %s", - SMA_VID(pSma), suid, output->info.id.groupId, pItem->level, terrstr()); - goto _err; + TSDB_CHECK_CODE(code, lino, _exit); } smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%" PRIu64 ", level %" PRIi8 " ver %" PRIi64, @@ -713,15 +674,18 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma } } } - +_exit: + if (code) { + smaError("vgId:%d, %s failed at line %d since %s, suid:%" PRIi64 ", level:%" PRIi8 ", uid:%" PRIi64 + ", ver:%" PRIi64, + SMA_VID(pSma), __func__, lino, tstrerror(code), suid, pItem->level, output ? output->info.id.uid : -1, + output ? output->info.version : -1); + } else { + smaDebug("vgId:%d, %s succeed, suid:%" PRIi64 ", level:%" PRIi8, SMA_VID(pSma), __func__, suid, pItem->level); + } taosArrayDestroy(pResList); qCleanExecTaskBlockBuf(taskInfo); - return TSDB_CODE_SUCCESS; - -_err: - taosArrayDestroy(pResList); - qCleanExecTaskBlockBuf(taskInfo); - return TSDB_CODE_FAILED; + return code; } /** @@ -910,6 +874,7 @@ _exit: * @param pInfo * @return int32_t */ +#if 0 static int32_t tdRSmaInfoClone(SSma *pSma, SRSmaInfo *pInfo) { int32_t code = 0; int32_t lino = 0; @@ -961,6 +926,7 @@ _exit: metaReaderClear(&mr); return code; } +#endif /** * @brief During async commit, the SRSmaInfo object would be COW from iRSmaInfoHash and write lock should be applied. @@ -996,12 +962,14 @@ static SRSmaInfo *tdAcquireRSmaInfoBySuid(SSma *pSma, int64_t suid) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } +#if 0 if (!pRSmaInfo->taskInfo[0]) { if ((terrno = tdRSmaInfoClone(pSma, pRSmaInfo)) < 0) { taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); return NULL; } } +#endif tdRefRSmaInfo(pSma, pRSmaInfo); taosRUnLockLatch(SMA_ENV_LOCK(pEnv)); if (ASSERTS(pRSmaInfo->suid == suid, "suid:%" PRIi64 " != %" PRIi64, pRSmaInfo->suid, suid)) { diff --git a/source/dnode/vnode/src/vnd/vnodeRetention.c b/source/dnode/vnode/src/vnd/vnodeRetention.c index 170deb4286..d108df5684 100644 --- a/source/dnode/vnode/src/vnd/vnodeRetention.c +++ b/source/dnode/vnode/src/vnd/vnodeRetention.c @@ -82,6 +82,9 @@ static int32_t vnodeRetentionTask(void *param) { code = tsdbDoRetention(pInfo->pVnode->pTsdb, pInfo->now); TSDB_CHECK_CODE(code, lino, _exit); + code = smaDoRetention(pInfo->pVnode->pSma, pInfo->now); + TSDB_CHECK_CODE(code, lino, _exit); + // commit info vnodeCommitInfo(dir); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6f726c7eff..41d8791858 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -624,6 +624,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_STREAM_STATE_COMMIT, "Rsma stream state c TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_REF, "Rsma fs ref error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_SYNC, "Rsma fs sync error") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_FS_UPDATE, "Rsma fs update error") +TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_RESULT, "Rsma result error") //index TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index 05dd4842f9..29440855a2 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -18,8 +18,7 @@ if $rows != 2 then endi print =============== create child table -sql create table ct1 using stb tags("BeiJing", "ChaoYang"); -sql create table ct_1 using stb1 tags("BeiJing", "ChaoYang"); +sql create table ct1 using stb tags("BeiJing", "ChaoYang") ct_1 using stb1 tags("BeiJing", "ChaoYang"); sql show tables if $rows != 2 then