From ac63ee40c95c95e1f2a458e0bd71ff8196eb219f Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 5 Aug 2022 17:44:11 +0800 Subject: [PATCH] fix: fix get tb meta for tb cfg case --- source/libs/catalog/inc/catalogInt.h | 6 +- source/libs/catalog/src/ctgAsync.c | 194 ++++++++++++++++++++++++--- source/libs/catalog/src/ctgUtil.c | 4 + 3 files changed, 178 insertions(+), 26 deletions(-) diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index d4b8a39197..c452f61b75 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -720,11 +720,7 @@ void ctgClearHandle(SCatalog* pCtg); void ctgFreeTbCacheImpl(SCtgTbCache *pCache); int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName); int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup); - -FORCE_INLINE SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) { - STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx); - return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx); -} +SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 37994aa97f..bbca85ed14 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -930,18 +930,10 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * SCatalog* pCtg = pTask->pJob->pCtg; SRequestConnInfo* pConn = &pTask->pJob->conn; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); -#if CTG_BATCH_FETCH - SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; - SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); - SName* pName = ctgGetFetchName(ctx->pNames, pFetch); - int32_t flag = pFetch->flag; - int32_t* vgId = &pFetch->vgId; -#else SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SName* pName = ctx->pName; int32_t flag = ctx->flag; int32_t* vgId = &ctx->vgId; -#endif CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); @@ -1080,17 +1072,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf * } */ -#if CTG_BATCH_FETCH - SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); - pRes->code = 0; - pRes->pRes = pOut->tbMeta; - pOut->tbMeta = NULL; - if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { - TSWAP(pTask->res, ctx->pResList); - } -#else TSWAP(pTask->res, pOut->tbMeta); -#endif _return: @@ -1098,7 +1080,177 @@ _return: ctgReleaseVgInfoToCache(pCtg, dbCache); } -#if CTG_BATCH_FETCH + if (pTask->res) { + ctgHandleTaskEnd(pTask, code); + } + + CTG_RET(code); +} + + +int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { + int32_t code = 0; + SCtgDBCache *dbCache = NULL; + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); + SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; + SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); + SName* pName = ctgGetFetchName(ctx->pNames, pFetch); + int32_t flag = pFetch->flag; + int32_t* vgId = &pFetch->vgId; + + CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); + + switch (reqType) { + case TDMT_MND_USE_DB: { + SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; + + SVgroupInfo vgInfo = {0}; + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo)); + + ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); + + *vgId = vgInfo.vgId; + CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); + + return TSDB_CODE_SUCCESS; + } + case TDMT_MND_TABLE_META: { + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; + + if (CTG_IS_META_NULL(pOut->metaType)) { + if (CTG_FLAG_IS_STB(flag)) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pName, dbFName); + + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); + if (NULL != dbCache) { + SVgroupInfo vgInfo = {0}; + CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo)); + + ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); + + *vgId = vgInfo.vgId; + CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); + + ctgReleaseVgInfoToCache(pCtg, dbCache); + } else { + SBuildUseDBInput input = {0}; + + tstrncpy(input.db, dbFName, tListLen(input.db)); + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + + CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); + } + + return TSDB_CODE_SUCCESS; + } + + ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName)); + ctgRemoveTbMetaFromCache(pCtg, pName, false); + + CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); + } + + if (pMsgCtx->lastOut) { + TSWAP(pMsgCtx->out, pMsgCtx->lastOut); + STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out; + TSWAP(pLastOut->tbMeta, pOut->tbMeta); + } + + break; + } + case TDMT_VND_TABLE_META: { + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; + + if (CTG_IS_META_NULL(pOut->metaType)) { + ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName)); + ctgRemoveTbMetaFromCache(pCtg, pName, false); + CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); + } + + if (CTG_FLAG_IS_STB(flag)) { + break; + } + + if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) { + ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName)); + + taosMemoryFreeClear(pOut->tbMeta); + + CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask)); + } else if (CTG_IS_META_BOTH(pOut->metaType)) { + int32_t exist = 0; + if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) { + SName stbName = *pName; + strcpy(stbName.tname, pOut->tbName); + SCtgTbMetaCtx stbCtx = {0}; + stbCtx.flag = flag; + stbCtx.pName = &stbName; + + taosMemoryFreeClear(pOut->tbMeta); + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); + if (pOut->tbMeta) { + exist = 1; + } + } + + if (0 == exist) { + TSWAP(pMsgCtx->lastOut, pMsgCtx->out); + CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, pTask)); + } + } + break; + } + default: + ctgError("invalid reqType %d", reqType); + CTG_ERR_JRET(TSDB_CODE_INVALID_MSG); + break; + } + + STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out; + + ctgUpdateTbMetaToCache(pCtg, pOut, false); + + if (CTG_IS_META_BOTH(pOut->metaType)) { + memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); + } + +/* + else if (CTG_IS_META_CTABLE(pOut->metaType)) { + SName stbName = *pName; + strcpy(stbName.tname, pOut->tbName); + SCtgTbMetaCtx stbCtx = {0}; + stbCtx.flag = flag; + stbCtx.pName = &stbName; + + CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); + if (NULL == pOut->tbMeta) { + ctgDebug("stb no longer exist, stbName:%s", stbName.tname); + CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask)); + + return TSDB_CODE_SUCCESS; + } + + memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta)); + } +*/ + + SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); + pRes->code = 0; + pRes->pRes = pOut->tbMeta; + pOut->tbMeta = NULL; + if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { + TSWAP(pTask->res, ctx->pResList); + } + +_return: + + if (dbCache) { + ctgReleaseVgInfoToCache(pCtg, dbCache); + } + if (code) { SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); pRes->code = code; @@ -1107,7 +1259,6 @@ _return: TSWAP(pTask->res, ctx->pResList); } } -#endif if (pTask->res) { ctgHandleTaskEnd(pTask, code); @@ -1116,6 +1267,7 @@ _return: CTG_RET(code); } + int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t code = 0; SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx; @@ -1896,7 +2048,7 @@ SCtgAsyncFps gCtgAsyncFps[] = { {ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL}, {ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL}, {ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL}, - {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetasRes, NULL, NULL}, + {ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL}, {ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL}, }; diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index d21eacf756..de9ea7fb5a 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -1144,5 +1144,9 @@ int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fet return TSDB_CODE_SUCCESS; } +SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) { + STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx); + return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx); +}