fix: fix get tb meta for tb cfg case
This commit is contained in:
parent
c1c3513b08
commit
ac63ee40c9
|
@ -720,11 +720,7 @@ void ctgClearHandle(SCatalog* pCtg);
|
|||
void ctgFreeTbCacheImpl(SCtgTbCache *pCache);
|
||||
int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
|
||||
int32_t ctgGetTbHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const SName *pTableName, SVgroupInfo *pVgroup);
|
||||
|
||||
FORCE_INLINE SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
|
||||
STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx);
|
||||
return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx);
|
||||
}
|
||||
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
|
||||
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
|
|
|
@ -930,18 +930,10 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask);
|
||||
#if CTG_BATCH_FETCH
|
||||
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
|
||||
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
|
||||
SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
|
||||
int32_t flag = pFetch->flag;
|
||||
int32_t* vgId = &pFetch->vgId;
|
||||
#else
|
||||
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||
SName* pName = ctx->pName;
|
||||
int32_t flag = ctx->flag;
|
||||
int32_t* vgId = &ctx->vgId;
|
||||
#endif
|
||||
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
||||
|
||||
|
@ -1080,17 +1072,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|||
}
|
||||
*/
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||
pRes->code = 0;
|
||||
pRes->pRes = pOut->tbMeta;
|
||||
pOut->tbMeta = NULL;
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
TSWAP(pTask->res, ctx->pResList);
|
||||
}
|
||||
#else
|
||||
TSWAP(pTask->res, pOut->tbMeta);
|
||||
#endif
|
||||
|
||||
_return:
|
||||
|
||||
|
@ -1098,7 +1080,177 @@ _return:
|
|||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
#if CTG_BATCH_FETCH
|
||||
if (pTask->res) {
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask);
|
||||
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
|
||||
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx);
|
||||
SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
|
||||
int32_t flag = pFetch->flag;
|
||||
int32_t* vgId = &pFetch->vgId;
|
||||
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
||||
|
||||
switch (reqType) {
|
||||
case TDMT_MND_USE_DB: {
|
||||
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
|
||||
|
||||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, pOut->dbVgroup, pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
case TDMT_MND_TABLE_META: {
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
if (CTG_FLAG_IS_STB(flag)) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pName, dbFName);
|
||||
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
|
||||
if (NULL != dbCache) {
|
||||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
|
||||
|
||||
*vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask));
|
||||
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
} else {
|
||||
SBuildUseDBInput input = {0};
|
||||
|
||||
tstrncpy(input.db, dbFName, tListLen(input.db));
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask));
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(pName));
|
||||
ctgRemoveTbMetaFromCache(pCtg, pName, false);
|
||||
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (pMsgCtx->lastOut) {
|
||||
TSWAP(pMsgCtx->out, pMsgCtx->lastOut);
|
||||
STableMetaOutput* pLastOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
TSWAP(pLastOut->tbMeta, pOut->tbMeta);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
case TDMT_VND_TABLE_META: {
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
if (CTG_IS_META_NULL(pOut->metaType)) {
|
||||
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pName));
|
||||
ctgRemoveTbMetaFromCache(pCtg, pName, false);
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (CTG_FLAG_IS_STB(flag)) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (CTG_IS_META_TABLE(pOut->metaType) && TSDB_SUPER_TABLE == pOut->tbMeta->tableType) {
|
||||
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s", tNameGetTableName(pName));
|
||||
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
|
||||
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask));
|
||||
} else if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||
int32_t exist = 0;
|
||||
if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
|
||||
SName stbName = *pName;
|
||||
strcpy(stbName.tname, pOut->tbName);
|
||||
SCtgTbMetaCtx stbCtx = {0};
|
||||
stbCtx.flag = flag;
|
||||
stbCtx.pName = &stbName;
|
||||
|
||||
taosMemoryFreeClear(pOut->tbMeta);
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
|
||||
if (pOut->tbMeta) {
|
||||
exist = 1;
|
||||
}
|
||||
}
|
||||
|
||||
if (0 == exist) {
|
||||
TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
|
||||
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, pTask));
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
||||
ctgUpdateTbMetaToCache(pCtg, pOut, false);
|
||||
|
||||
if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
||||
}
|
||||
|
||||
/*
|
||||
else if (CTG_IS_META_CTABLE(pOut->metaType)) {
|
||||
SName stbName = *pName;
|
||||
strcpy(stbName.tname, pOut->tbName);
|
||||
SCtgTbMetaCtx stbCtx = {0};
|
||||
stbCtx.flag = flag;
|
||||
stbCtx.pName = &stbName;
|
||||
|
||||
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
|
||||
if (NULL == pOut->tbMeta) {
|
||||
ctgDebug("stb no longer exist, stbName:%s", stbName.tname);
|
||||
CTG_ERR_JRET(ctgRelaunchGetTbMetaTask(pTask));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
||||
}
|
||||
*/
|
||||
|
||||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||
pRes->code = 0;
|
||||
pRes->pRes = pOut->tbMeta;
|
||||
pOut->tbMeta = NULL;
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
TSWAP(pTask->res, ctx->pResList);
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
if (code) {
|
||||
SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx);
|
||||
pRes->code = code;
|
||||
|
@ -1107,7 +1259,6 @@ _return:
|
|||
TSWAP(pTask->res, ctx->pResList);
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
if (pTask->res) {
|
||||
ctgHandleTaskEnd(pTask, code);
|
||||
|
@ -1116,6 +1267,7 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;
|
||||
|
@ -1896,7 +2048,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
|
|||
{ctgInitGetUdfTask, ctgLaunchGetUdfTask, ctgHandleGetUdfRsp, ctgDumpUdfRes, NULL, NULL},
|
||||
{ctgInitGetUserTask, ctgLaunchGetUserTask, ctgHandleGetUserRsp, ctgDumpUserRes, NULL, NULL},
|
||||
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
|
||||
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetaRsp, ctgDumpTbMetasRes, NULL, NULL},
|
||||
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL},
|
||||
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
|
||||
};
|
||||
|
||||
|
|
|
@ -1144,5 +1144,9 @@ int32_t ctgAddFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t *fet
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch) {
|
||||
STablesReq* pReq = (STablesReq*)taosArrayGet(pNames, pFetch->dbIdx);
|
||||
return (SName*)taosArrayGet(pReq->pTables, pFetch->tbIdx);
|
||||
}
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue