fix: fix msgIdx mismatch issue

This commit is contained in:
dapan1121 2022-08-06 15:43:19 +08:00
parent ac63ee40c9
commit a17f46a59c
4 changed files with 235 additions and 148 deletions

View File

@ -239,6 +239,7 @@ typedef struct SCtgBatch {
SRequestConnInfo conn; SRequestConnInfo conn;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTaskIds; SArray* pTaskIds;
SArray* pMsgIdxs;
} SCtgBatch; } SCtgBatch;
typedef struct SCtgJob { typedef struct SCtgJob {
@ -287,6 +288,7 @@ typedef struct SCtgTaskCallbackParam {
SArray* taskId; SArray* taskId;
int32_t reqType; int32_t reqType;
int32_t batchId; int32_t batchId;
SArray* msgIdx;
} SCtgTaskCallbackParam; } SCtgTaskCallbackParam;
@ -314,12 +316,16 @@ typedef struct SCtgTask {
SArray* pParents; SArray* pParents;
SCtgSubRes subRes; SCtgSubRes subRes;
SHashObj* pBatchs; SHashObj* pBatchs;
int32_t msgIdx;
} SCtgTask; } SCtgTask;
typedef struct SCtgTaskReq {
SCtgTask* pTask;
int32_t msgIdx;
} SCtgTaskReq;
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*); typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
typedef int32_t (*ctgLanchTaskFp)(SCtgTask*); typedef int32_t (*ctgLanchTaskFp)(SCtgTask*);
typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTask*, int32_t, const SDataBuf *, int32_t); typedef int32_t (*ctgHandleTaskMsgRspFp)(SCtgTaskReq*, int32_t, const SDataBuf *, int32_t);
typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*); typedef int32_t (*ctgDumpTaskResFp)(SCtgTask*);
typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**); typedef int32_t (*ctgCloneTaskResFp)(SCtgTask*, void**);
typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*); typedef int32_t (*ctgCompTaskFp)(SCtgTask*, void*, bool*);
@ -518,7 +524,7 @@ typedef struct SCtgOperation {
#define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB)) #define CTG_FLAG_MAKE_STB(_isStb) (((_isStb) == 1) ? CTG_FLAG_STB : ((_isStb) == 0 ? CTG_FLAG_NOT_STB : CTG_FLAG_UNKNOWN_STB))
#define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE)) #define CTG_FLAG_MATCH_STB(_flag, tbType) (CTG_FLAG_IS_UNKNOWN_STB(_flag) || (CTG_FLAG_IS_STB(_flag) && (tbType) == TSDB_SUPER_TABLE) || (CTG_FLAG_IS_NOT_STB(_flag) && (tbType) != TSDB_SUPER_TABLE))
#define CTG_GET_TASK_MSGCTX(_task) (((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) ? taosArrayGet((_task)->msgCtxs, (_task)->msgIdx) : &(_task)->msgCtx) #define CTG_GET_TASK_MSGCTX(_task, _id) (((CTG_TASK_GET_TB_META_BATCH == (_task)->type) || (CTG_TASK_GET_TB_HASH_BATCH == (_task)->type)) ? taosArrayGet((_task)->msgCtxs, (_id)) : &(_task)->msgCtx)
#define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema)) #define CTG_META_SIZE(pMeta) (sizeof(STableMeta) + ((pMeta)->tableInfo.numOfTags + (pMeta)->tableInfo.numOfColumns) * sizeof(SSchema))
@ -665,7 +671,7 @@ int32_t ctgGetTbHashVgroupFromCache(SCatalog *pCtg, const SName *pTableName, SVg
int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target); int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, int32_t rspCode, char* target);
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTask* pTask); int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildUseDBInput *input, SUseDbOutput *out, SCtgTaskReq* tReq);
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask); int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask); int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray **out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
@ -673,9 +679,9 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask); int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask); int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableMetaOutput* out, SCtgTaskReq* tReq);
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask); int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask); int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask); int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
@ -698,7 +704,7 @@ void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg); void ctgFreeHandleImpl(SCatalog* pCtg);
void ctgFreeVgInfo(SDBVgInfo *vgInfo); void ctgFreeVgInfo(SDBVgInfo *vgInfo);
int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup); int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pTableName, SVgroupInfo *pVgroup);
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update); int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update);
void ctgResetTbMetaTask(SCtgTask* pTask); void ctgResetTbMetaTask(SCtgTask* pTask);
void ctgFreeDbCache(SCtgDBCache *dbCache); void ctgFreeDbCache(SCtgDBCache *dbCache);
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2); int32_t ctgStbVersionSortCompare(const void* key1, const void* key2);

View File

@ -381,7 +381,10 @@ int32_t ctgInitGetTbCfgTask(SCtgJob *pJob, int32_t taskIdx, void* param) {
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) { int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) {
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pDb) { SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (NULL == pDb || NULL == pTb) {
taosHashCleanup(pDb);
taosHashCleanup(pTb);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
@ -400,18 +403,26 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
} }
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { int32_t dbNum = taosArrayGetSize(pReq->pTableMeta);
SName* name = taosArrayGet(pReq->pTableMeta, i); for (int32_t i = 0; i < dbNum; ++i) {
char dbFName[TSDB_DB_FNAME_LEN]; STablesReq* p = taosArrayGet(pReq->pTableMeta, i);
tNameGetFullDbName(name, dbFName); taosHashPut(pDb, p->dbFName, strlen(p->dbFName), p->dbFName, TSDB_DB_FNAME_LEN);
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); int32_t tbNum = taosArrayGetSize(p->pTables);
for (int32_t m = 0; m < tbNum; ++m) {
SName* name = taosArrayGet(p->pTables, m);
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
}
} }
for (int32_t i = 0; i < pJob->tbHashNum; ++i) { dbNum = taosArrayGetSize(pReq->pTableHash);
SName* name = taosArrayGet(pReq->pTableHash, i); for (int32_t i = 0; i < dbNum; ++i) {
char dbFName[TSDB_DB_FNAME_LEN]; STablesReq* p = taosArrayGet(pReq->pTableHash, i);
tNameGetFullDbName(name, dbFName); taosHashPut(pDb, p->dbFName, strlen(p->dbFName), p->dbFName, TSDB_DB_FNAME_LEN);
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); int32_t tbNum = taosArrayGetSize(p->pTables);
for (int32_t m = 0; m < tbNum; ++m) {
SName* name = taosArrayGet(p->pTables, m);
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
}
} }
for (int32_t i = 0; i < pJob->tbCfgNum; ++i) { for (int32_t i = 0; i < pJob->tbCfgNum; ++i) {
@ -430,16 +441,6 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, con
taosHashCleanup(pDb); taosHashCleanup(pDb);
// REFRESH TABLE META // REFRESH TABLE META
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
SName* name = taosArrayGet(pReq->pTableMeta, i);
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
}
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
SName* name = taosArrayGet(pReq->pTableHash, i);
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
}
for (int32_t i = 0; i < pJob->tbCfgNum; ++i) { for (int32_t i = 0; i < pJob->tbCfgNum; ++i) {
SName* name = taosArrayGet(pReq->pTableCfg, i); SName* name = taosArrayGet(pReq->pTableCfg, i);
@ -924,12 +925,13 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
SName* pName = ctx->pName; SName* pName = ctx->pName;
int32_t flag = ctx->flag; int32_t flag = ctx->flag;
@ -947,7 +949,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId; *vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -967,7 +969,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId; *vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
} else { } else {
@ -976,7 +978,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
tstrncpy(input.db, dbFName, tListLen(input.db)); tstrncpy(input.db, dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1014,7 +1016,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(pOut->tbMeta);
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask)); CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
} else if (CTG_IS_META_BOTH(pOut->metaType)) { } else if (CTG_IS_META_BOTH(pOut->metaType)) {
int32_t exist = 0; int32_t exist = 0;
if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) { if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
@ -1033,7 +1035,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
if (0 == exist) { if (0 == exist) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out); TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, pTask)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
} }
} }
break; break;
@ -1088,14 +1090,15 @@ _return:
} }
int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
SName* pName = ctgGetFetchName(ctx->pNames, pFetch); SName* pName = ctgGetFetchName(ctx->pNames, pFetch);
int32_t flag = pFetch->flag; int32_t flag = pFetch->flag;
int32_t* vgId = &pFetch->vgId; int32_t* vgId = &pFetch->vgId;
@ -1112,7 +1115,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId; *vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1132,7 +1135,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId; *vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
} else { } else {
@ -1141,7 +1144,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
tstrncpy(input.db, dbFName, tListLen(input.db)); tstrncpy(input.db, dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1179,7 +1182,7 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(pOut->tbMeta);
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask)); CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
} else if (CTG_IS_META_BOTH(pOut->metaType)) { } else if (CTG_IS_META_BOTH(pOut->metaType)) {
int32_t exist = 0; int32_t exist = 0;
if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) { if (!CTG_FLAG_IS_FORCE_UPDATE(flag)) {
@ -1192,13 +1195,14 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
taosMemoryFreeClear(pOut->tbMeta); taosMemoryFreeClear(pOut->tbMeta);
CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta)); CTG_ERR_JRET(ctgReadTbMetaFromCache(pCtg, &stbCtx, &pOut->tbMeta));
if (pOut->tbMeta) { if (pOut->tbMeta) {
ctgDebug("use cached stb meta, tbName:%s", tNameGetTableName(pName));
exist = 1; exist = 1;
} }
} }
if (0 == exist) { if (0 == exist) {
TSWAP(pMsgCtx->lastOut, pMsgCtx->out); TSWAP(pMsgCtx->lastOut, pMsgCtx->out);
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, pTask)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, pOut->dbFName, pOut->tbName, NULL, tReq));
} }
} }
break; break;
@ -1268,8 +1272,9 @@ _return:
} }
int32_t ctgHandleGetDbVgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx; SCtgDbVgCtx* ctx = (SCtgDbVgCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
@ -1301,8 +1306,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbHashRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx; SCtgTbHashCtx* ctx = (SCtgTbHashCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
@ -1338,12 +1344,13 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbHashsRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx; SCtgTbHashsCtx* ctx = (SCtgTbHashsCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask); SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); SCtgFetch* pFetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
@ -1352,7 +1359,7 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(ctx->pNames, pFetch->dbIdx);
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, pTask, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true)); CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, tReq, pOut->dbVgroup, ctx, pMsgCtx->target, pReq->pTables, true));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false)); CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, pMsgCtx->target, pOut->dbId, pOut->dbVgroup, false));
pOut->dbVgroup = NULL; pOut->dbVgroup = NULL;
@ -1393,8 +1400,9 @@ _return:
} }
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
STableIndex* pOut = (STableIndex*)pTask->msgCtx.out; STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
@ -1415,8 +1423,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetTbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
@ -1428,8 +1437,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbCfgRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
@ -1441,13 +1451,14 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDbInfoRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDbInfoRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
CTG_RET(TSDB_CODE_APP_ERROR); CTG_RET(TSDB_CODE_APP_ERROR);
} }
int32_t ctgHandleGetQnodeRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetQnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
@ -1459,8 +1470,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetDnodeRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetDnodeRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
@ -1472,8 +1484,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetIndexRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
@ -1485,8 +1498,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetUdfRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetUdfRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); TSWAP(pTask->res, pTask->msgCtx.out);
@ -1498,8 +1512,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetUserRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetUserRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx; SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
bool pass = false; bool pass = false;
@ -1542,8 +1557,9 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgHandleGetSvrVerRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetSvrVerRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
@ -1556,7 +1572,8 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask, int32_t flag, SName* pName, int32_t* vgId) { int32_t ctgAsyncRefreshTbMeta(SCtgTaskReq *tReq, int32_t flag, SName* pName, int32_t* vgId) {
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
int32_t code = 0; int32_t code = 0;
@ -1564,14 +1581,14 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask, int32_t flag, SName* pName, int32
if (CTG_FLAG_IS_SYS_DB(flag)) { if (CTG_FLAG_IS_SYS_DB(flag)) {
ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName)); ctgDebug("will refresh sys db tbmeta, tbName:%s", tNameGetTableName(pName));
CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)pName->dbname, (char *)pName->tname, NULL, pTask)); CTG_RET(ctgGetTbMetaFromMnodeImpl(pCtg, pConn, (char *)pName->dbname, (char *)pName->tname, NULL, tReq));
} }
if (CTG_FLAG_IS_STB(flag)) { if (CTG_FLAG_IS_STB(flag)) {
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pName)); ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pName));
// if get from mnode failed, will not try vnode // if get from mnode failed, will not try vnode
CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, pTask)); CTG_RET(ctgGetTbMetaFromMnode(pCtg, pConn, pName, NULL, tReq));
} }
SCtgDBCache *dbCache = NULL; SCtgDBCache *dbCache = NULL;
@ -1586,14 +1603,14 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask, int32_t flag, SName* pName, int32
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag); ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(pName), flag);
*vgId = vgInfo.vgId; *vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, pTask)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, pName, &vgInfo, NULL, tReq));
} else { } else {
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
tstrncpy(input.db, dbFName, tListLen(input.db)); tstrncpy(input.db, dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
} }
_return: _return:
@ -1616,7 +1633,10 @@ int32_t ctgLaunchGetTbMetaTask(SCtgTask *pTask) {
} }
SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* pCtx = (SCtgTbMetaCtx*)pTask->taskCtx;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pCtx->flag, pCtx->pName, &pCtx->vgId)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pCtx->flag, pCtx->pName, &pCtx->vgId));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1651,12 +1671,12 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask *pTask) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
pTask->msgIdx = pFetch->fetchIdx; SCtgTaskReq tReq;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(pTask, pFetch->flag, pName, &pFetch->vgId)); tReq.pTask = pTask;
tReq.msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgAsyncRefreshTbMeta(&tReq, pFetch->flag, pName, &pFetch->vgId));
} }
pTask->msgIdx = 0;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1681,7 +1701,10 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
tstrncpy(input.db, pCtx->dbFName, tListLen(input.db)); tstrncpy(input.db, pCtx->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, &tReq));
} }
_return: _return:
@ -1718,7 +1741,10 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
tstrncpy(input.db, pCtx->dbFName, tListLen(input.db)); tstrncpy(input.db, pCtx->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, &tReq));
} }
_return: _return:
@ -1746,7 +1772,10 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pReq->dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pReq->dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, pTask, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_ERR_JRET(ctgGetVgInfosFromHashValue(pCtg, &tReq, dbCache->vgCache.vgInfo, pCtx, pReq->dbFName, pReq->pTables, false));
ctgReleaseVgInfoToCache(pCtg, dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL; dbCache = NULL;
@ -1775,18 +1804,17 @@ int32_t ctgLaunchGetTbHashsTask(SCtgTask *pTask) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, i);
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
pTask->msgIdx = pFetch->fetchIdx;
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
strcpy(input.db, pReq->dbFName); strcpy(input.db, pReq->dbFName);
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pTask)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = pFetch->fetchIdx;
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, &tReq));
} }
pTask->msgIdx = 0;
_return: _return:
if (dbCache) { if (dbCache) {

View File

@ -22,9 +22,8 @@
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) { int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
SArray* pTaskId = cbParam->taskId;
SCatalog* pCtg = pJob->pCtg; SCatalog* pCtg = pJob->pCtg;
int32_t taskNum = taosArrayGetSize(pTaskId); int32_t taskNum = taosArrayGetSize(cbParam->taskId);
SDataBuf taskMsg = *pMsg; SDataBuf taskMsg = *pMsg;
int32_t offset = 0; int32_t offset = 0;
int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0; int32_t msgNum = (TSDB_CODE_SUCCESS == rspCode && pMsg->pData && (pMsg->len > 0)) ? ntohl(*(int32_t*)pMsg->pData) : 0;
@ -42,7 +41,8 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
} }
for (int32_t i = 0; i < taskNum; ++i) { for (int32_t i = 0; i < taskNum; ++i) {
int32_t* taskId = taosArrayGet(pTaskId, i); int32_t* taskId = taosArrayGet(cbParam->taskId, i);
int32_t* msgIdx = taosArrayGet(cbParam->msgIdx, i);
SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId); SCtgTask* pTask = taosArrayGet(pJob->pTasks, *taskId);
if (msgNum > 0) { if (msgNum > 0) {
rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset)); rsp.reqType = ntohl(*(int32_t*)((char*)pMsg->pData + offset));
@ -59,8 +59,10 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
taskMsg.msgType = rsp.reqType; taskMsg.msgType = rsp.reqType;
taskMsg.pData = rsp.msg; taskMsg.pData = rsp.msg;
taskMsg.len = rsp.msgLen; taskMsg.len = rsp.msgLen;
ASSERT(rsp.msgIdx == *msgIdx);
} else { } else {
rsp.msgIdx = pTask->msgIdx++; rsp.msgIdx = *msgIdx;
rsp.reqType = -1; rsp.reqType = -1;
taskMsg.msgType = -1; taskMsg.msgType = -1;
taskMsg.pData = NULL; taskMsg.pData = NULL;
@ -68,11 +70,14 @@ int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBu
} }
pTask->pBatchs = pBatchs; pTask->pBatchs = pBatchs;
pTask->msgIdx = rsp.msgIdx;
ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, pTask->msgIdx, TMSG_INFO(taskMsg.msgType + 1)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = rsp.msgIdx;
(*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode)); ctgDebug("QID:0x%" PRIx64 " ctg task %d idx %d start to handle rsp %s", pJob->queryId, pTask->taskId, rsp.msgIdx, TMSG_INFO(taskMsg.msgType + 1));
(*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, rsp.reqType, &taskMsg, (rsp.rspCode ? rsp.rspCode : rspCode));
} }
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
@ -341,7 +346,10 @@ int32_t ctgHandleMsgCallback(void* param, SDataBuf* pMsg, int32_t rspCode) {
pTask->pBatchs = pBatchs; pTask->pBatchs = pBatchs;
#endif #endif
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(pTask, cbParam->reqType, pMsg, rspCode)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].handleRspFp)(&tReq, cbParam->reqType, pMsg, rspCode));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs)); CTG_ERR_JRET(ctgLaunchBatchs(pJob->pCtg, pJob, pBatchs));
@ -359,7 +367,7 @@ _return:
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t msgType, int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, SArray* pMsgIdx, int32_t msgType,
SMsgSendInfo** pMsgSendInfo) { SMsgSendInfo** pMsgSendInfo) {
int32_t code = 0; int32_t code = 0;
SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); SMsgSendInfo* msgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
@ -379,6 +387,7 @@ int32_t ctgMakeMsgSendInfo(SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int3
param->refId = pJob->refId; param->refId = pJob->refId;
param->taskId = pTaskId; param->taskId = pTaskId;
param->batchId = batchId; param->batchId = batchId;
param->msgIdx = pMsgIdx;
msgSendInfo->param = param; msgSendInfo->param = param;
msgSendInfo->paramFreeFp = ctgFreeMsgSendParam; msgSendInfo->paramFreeFp = ctgFreeMsgSendParam;
@ -397,10 +406,10 @@ _return:
} }
int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId, int32_t ctgAsyncSendMsg(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob* pJob, SArray* pTaskId, int32_t batchId,
char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) { SArray* pMsgIdx, char* dbFName, int32_t vgId, int32_t msgType, void* msg, uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SMsgSendInfo* pMsgSendInfo = NULL; SMsgSendInfo* pMsgSendInfo = NULL;
CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, msgType, &pMsgSendInfo)); CTG_ERR_JRET(ctgMakeMsgSendInfo(pJob, pTaskId, batchId, pMsgIdx, msgType, &pMsgSendInfo));
ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId); ctgUpdateSendTargetInfo(pMsgSendInfo, msgType, dbFName, vgId);
@ -431,9 +440,10 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTask* pTask, int32_t msgType, void* msg, int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgTaskReq* tReq, int32_t msgType, void* msg,
uint32_t msgSize) { uint32_t msgSize) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SHashObj* pBatchs = pTask->pBatchs; SHashObj* pBatchs = pTask->pBatchs;
SCtgJob* pJob = pTask->pJob; SCtgJob* pJob = pTask->pJob;
SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId)); SCtgBatch* pBatch = taosHashGet(pBatchs, &vgId, sizeof(vgId));
@ -443,13 +453,14 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == pBatch) { if (NULL == pBatch) {
newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg)); newBatch.pMsgs = taosArrayInit(pJob->subTaskNum, sizeof(SBatchMsg));
newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t)); newBatch.pTaskIds = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds) { newBatch.pMsgIdxs = taosArrayInit(pJob->subTaskNum, sizeof(int32_t));
if (NULL == newBatch.pMsgs || NULL == newBatch.pTaskIds || NULL == newBatch.pMsgIdxs) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
newBatch.conn = *pConn; newBatch.conn = *pConn;
req.msgIdx = pTask->msgIdx; req.msgIdx = tReq->msgIdx;
req.msgType = msgType; req.msgType = msgType;
req.msgLen = msgSize; req.msgLen = msgSize;
req.msg = msg; req.msg = msg;
@ -459,6 +470,9 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
if (NULL == taosArrayPush(newBatch.pMsgIdxs, &req.msgIdx)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES; newBatch.msgSize = sizeof(SBatchReq) + sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) { if (vgId > 0) {
@ -469,7 +483,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
} else if (TDMT_VND_TABLE_META == msgType) { } else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch); pName = ctgGetFetchName(ctx->pNames, fetch);
} else { } else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
@ -496,7 +510,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
req.msgIdx = pTask->msgIdx; req.msgIdx = tReq->msgIdx;
req.msgType = msgType; req.msgType = msgType;
req.msgLen = msgSize; req.msgLen = msgSize;
req.msg = msg; req.msg = msg;
@ -506,6 +520,10 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
if (NULL == taosArrayPush(pBatch->pMsgIdxs, &req.msgIdx)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES; pBatch->msgSize += sizeof(req) + msgSize - POINTER_BYTES;
if (vgId > 0) { if (vgId > 0) {
@ -516,7 +534,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
} else if (TDMT_VND_TABLE_META == msgType) { } else if (TDMT_VND_TABLE_META == msgType) {
if (CTG_TASK_GET_TB_META_BATCH == pTask->type) { if (CTG_TASK_GET_TB_META_BATCH == pTask->type) {
SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx; SCtgTbMetasCtx* ctx = (SCtgTbMetasCtx*)pTask->taskCtx;
SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, pTask->msgIdx); SCtgFetch* fetch = taosArrayGet(ctx->pFetchs, tReq->msgIdx);
pName = ctgGetFetchName(ctx->pNames, fetch); pName = ctgGetFetchName(ctx->pNames, fetch);
} else { } else {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
@ -588,8 +606,8 @@ int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob* pJob, SHashObj* pBatchs) {
ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId); ctgDebug("QID:0x%" PRIx64 " ctg start to launch batch %d", pJob->queryId, pBatch->batchId);
CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg)); CTG_ERR_JRET(ctgBuildBatchReqMsg(pBatch, *vgId, &msg));
code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->dbFName, *vgId, code = ctgAsyncSendMsg(pCtg, &pBatch->conn, pJob, pBatch->pTaskIds, pBatch->batchId, pBatch->pMsgIdxs,
pBatch->msgType, msg, pBatch->msgSize); pBatch->dbFName, *vgId, pBatch->msgType, msg, pBatch->msgSize);
pBatch->pTaskIds = NULL; pBatch->pTaskIds = NULL;
CTG_ERR_JRET(code); CTG_ERR_JRET(code);
@ -628,10 +646,13 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, NULL)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, NULL));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -639,7 +660,7 @@ int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -674,10 +695,13 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
} }
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, NULL)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -685,7 +709,7 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -706,10 +730,11 @@ int32_t ctgGetDnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SArray
} }
int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out, int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildUseDBInput* input, SUseDbOutput* out,
SCtgTask* pTask) { SCtgTaskReq* tReq) {
char* msg = NULL; char* msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_USE_DB; int32_t reqType = TDMT_MND_USE_DB;
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont; void* (*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db); ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
@ -726,10 +751,10 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildU
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, input->db)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, input->db));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -737,7 +762,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SBuildU
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -778,10 +803,13 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)dbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)dbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -789,7 +817,7 @@ int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const char
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -830,10 +858,13 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)indexName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)indexName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -841,7 +872,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -884,10 +915,13 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -895,7 +929,7 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, SName* n
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -936,10 +970,13 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)funcName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)funcName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -947,7 +984,7 @@ int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const ch
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -988,10 +1025,13 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, (char*)user)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)user));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -999,7 +1039,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -1020,7 +1060,8 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const
} }
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName, int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char* dbFName, char* tbName,
STableMetaOutput* out, SCtgTask* pTask) { STableMetaOutput* out, SCtgTaskReq* tReq) {
SCtgTask *pTask = tReq ? tReq->pTask : NULL;
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName}; SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
char* msg = NULL; char* msg = NULL;
SEpSet* pVnodeEpSet = NULL; SEpSet* pVnodeEpSet = NULL;
@ -1044,10 +1085,10 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char*
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, 0, pConn, tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -1055,7 +1096,7 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char*
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -1076,15 +1117,16 @@ int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo* pConn, char*
} }
int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out, int32_t ctgGetTbMetaFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableMetaOutput* out,
SCtgTask* pTask) { SCtgTaskReq* tReq) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, pTask); return ctgGetTbMetaFromMnodeImpl(pCtg, pConn, dbFName, (char*)pTableName->tname, out, tReq);
} }
int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo, int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* vgroupInfo,
STableMetaOutput* out, SCtgTask* pTask) { STableMetaOutput* out, SCtgTaskReq* tReq) {
SCtgTask *pTask = tReq ? tReq->pTask : NULL;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTableName, dbFName); tNameGetFullDbName(pTableName, dbFName);
int32_t reqType = TDMT_VND_TABLE_META; int32_t reqType = TDMT_VND_TABLE_META;
@ -1118,10 +1160,10 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
.requestObjRefId = pConn->requestObjRefId, .requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet}; .mgmtEps = vgroupInfo->epSet};
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, pOut, tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx), reqType, pOut, tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
#else #else
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
@ -1132,7 +1174,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->vgId, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->vgId, reqType, msg, msgLen));
#endif #endif
} }
@ -1175,14 +1217,17 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
} }
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
SRequestConnInfo vConn = {.pTrans = pConn->pTrans, SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
.requestId = pConn->requestId, .requestId = pConn->requestId,
.requestObjRefId = pConn->requestObjRefId, .requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet}; .mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, &tReq, reqType, msg, msgLen));
#else #else
SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx; SCtgTbCfgCtx* ctx = (SCtgTbCfgCtx*)pTask->taskCtx;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
@ -1193,7 +1238,7 @@ int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, ctx->pVgInfo->vgId, reqType, msg, msgLen));
#endif #endif
} }
@ -1234,10 +1279,13 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
} }
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, (char*)tbFName));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -1245,7 +1293,7 @@ int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const S
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }
@ -1280,10 +1328,13 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
} }
if (pTask) { if (pTask) {
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask), reqType, NULL, NULL)); CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, NULL, NULL));
#if CTG_BATCH_FETCH #if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, 0, pConn, pTask, reqType, msg, msgLen)); SCtgTaskReq tReq;
tReq.pTask = pTask;
tReq.msgIdx = -1;
CTG_RET(ctgAddBatch(pCtg, 0, pConn, &tReq, reqType, msg, msgLen));
#else #else
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) { if (NULL == pTaskId) {
@ -1291,7 +1342,7 @@ int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, char** ou
} }
taosArrayPush(pTaskId, &pTask->taskId); taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, 0, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask->pJob, pTaskId, -1, NULL, NULL, 0, reqType, msg, msgLen));
#endif #endif
} }

View File

@ -26,6 +26,7 @@ void ctgFreeMsgSendParam(void* param) {
SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param; SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param;
taosArrayDestroy(pParam->taskId); taosArrayDestroy(pParam->taskId);
taosArrayDestroy(pParam->msgIdx);
taosMemoryFree(param); taosMemoryFree(param);
} }
@ -874,8 +875,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) { int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) {
int32_t code = 0; int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SMetaRes res = {0}; SMetaRes res = {0};
int32_t vgNum = taosHashGetSize(dbInfo->vgHash); int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
if (vgNum <= 0) { if (vgNum <= 0) {
@ -904,7 +906,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *d
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port); vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) { if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, pTask->msgIdx); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i); SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = vgInfo; pRes->pRes = vgInfo;
} else { } else {
@ -958,7 +960,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTask* pTask, SDBVgInfo *d
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port); vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
if (update) { if (update) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, pTask->msgIdx); SCtgFetch* pFetch = taosArrayGet(pCtx->pFetchs, tReq->msgIdx);
SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i); SMetaRes *pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx + i);
pRes->pRes = pNewVg; pRes->pRes = pNewVg;
} else { } else {