diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9219a382e4..1eec92f47e 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -518,6 +518,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2); int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2); void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); +char *ctgTaskTypeStr(CTG_TASK_TYPE type); extern SCatalogMgmt gCtgMgmt; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 5eaccab004..f43e559244 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } @@ -67,7 +67,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } @@ -90,7 +90,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } @@ -113,7 +113,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName); return TSDB_CODE_SUCCESS; } @@ -143,7 +143,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname); + qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname); return TSDB_CODE_SUCCESS; } @@ -158,7 +158,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, task.type); + qDebug("QID:%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -181,7 +181,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, task.type, name); + qDebug("QID:%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } @@ -204,7 +204,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, task.type, name); + qDebug("QID:%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name); return TSDB_CODE_SUCCESS; } @@ -227,11 +227,96 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user) taosArrayPush(pJob->pTasks, &task); - qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, task.type, user->user); + qDebug("QID:%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user); return TSDB_CODE_SUCCESS; } +int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) { + int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum; + if (dbNum > 0) { + if (dbNum > pJob->dbCfgNum && dbNum > pJob->dbVgNum && dbNum > pJob->dbInfoNum) { + SHashObj* pDb = taosHashInit(dbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + if (NULL == pDb) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + for (int32_t i = 0; i < pJob->dbVgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbVgroup, i); + taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); + } + + for (int32_t i = 0; i < pJob->dbCfgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbCfg, i); + taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); + } + + for (int32_t i = 0; i < pJob->dbInfoNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbInfo, i); + taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN); + } + + char* dbFName = taosHashIterate(pDb, NULL); + while (dbFName) { + ctgDropDbVgroupEnqueue(pCtg, dbFName, true); + dbFName = taosHashIterate(pDb, dbFName); + } + + taosHashCleanup(pDb); + } else { + for (int32_t i = 0; i < pJob->dbVgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbVgroup, i); + CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true)); + } + + for (int32_t i = 0; i < pJob->dbCfgNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbCfg, i); + CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true)); + } + + for (int32_t i = 0; i < pJob->dbInfoNum; ++i) { + char* dbFName = taosArrayGet(pReq->pDbInfo, i); + CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true)); + } + } + } + + int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum; + if (tbNum > 0) { + if (tbNum > pJob->tbMetaNum && tbNum > pJob->tbHashNum) { + SHashObj* pTb = taosHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); + for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { + SName* name = taosArrayGet(pReq->pTableMeta, i); + taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); + } + + for (int32_t i = 0; i < pJob->tbHashNum; ++i) { + SName* name = taosArrayGet(pReq->pTableHash, i); + taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName)); + } + + SName* name = taosHashIterate(pTb, NULL); + while (name) { + catalogRemoveTableMeta(pCtg, name); + name = taosHashIterate(pTb, name); + } + + taosHashCleanup(pTb); + } else { + for (int32_t i = 0; i < pJob->tbMetaNum; ++i) { + SName* name = taosArrayGet(pReq->pTableMeta, i); + catalogRemoveTableMeta(pCtg, name); + } + + for (int32_t i = 0; i < pJob->tbHashNum; ++i) { + SName* name = taosArrayGet(pReq->pTableHash, i); + catalogRemoveTableMeta(pCtg, name); + } + } + } + + return TSDB_CODE_SUCCESS; +} int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) { int32_t code = 0; @@ -283,12 +368,13 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + if (pReq->forceUpdate) { + CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, pJob, pReq)); + } + int32_t taskIdx = 0; for (int32_t i = 0; i < dbVgNum; ++i) { char* dbFName = taosArrayGet(pReq->pDbVgroup, i); - if (pReq->forceUpdate) { - ctgDropDbVgroupEnqueue(pCtg, dbFName, true); - } CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName)); } @@ -304,9 +390,6 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* for (int32_t i = 0; i < tbMetaNum; ++i) { SName* name = taosArrayGet(pReq->pTableMeta, i); - if (pReq->forceUpdate) { - catalogRemoveTableMeta(pCtg, name); - } CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name)); } @@ -342,7 +425,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* taosAcquireRef(gCtgMgmt.jobPool, pJob->refId); - qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum); + qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate); return TSDB_CODE_SUCCESS; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 8f3d498440..188f5b44c0 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -565,7 +565,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo * } CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName)); - CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen)); + CTG_RET(ctgAsyncSendMsg(pCtg, pTrans, &vgroupInfo->epSet, pTask, reqType, msg, msgLen)); } SRpcMsg rpcMsg = { diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 4625203dd8..962edf9af1 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -19,6 +19,31 @@ #include "catalogInt.h" #include "systable.h" +char *ctgTaskTypeStr(CTG_TASK_TYPE type) { + switch (type) { + case CTG_TASK_GET_QNODE: + return "[get qnode list]"; + case CTG_TASK_GET_DB_VGROUP: + return "[get db vgroup]"; + case CTG_TASK_GET_DB_CFG: + return "[get db cfg]"; + case CTG_TASK_GET_DB_INFO: + return "[get db info]"; + case CTG_TASK_GET_TB_META: + return "[get table meta]"; + case CTG_TASK_GET_TB_HASH: + return "[get table hash]"; + case CTG_TASK_GET_INDEX: + return "[get index]"; + case CTG_TASK_GET_UDF: + return "[get udf]"; + case CTG_TASK_GET_USER: + return "[get user]"; + default: + return "unknown"; + } +} + void ctgFreeSMetaData(SMetaData* pData) { taosArrayDestroy(pData->pTableMeta); pData->pTableMeta = NULL; @@ -477,6 +502,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName *pVgroup = *vgInfo; + ctgDebug("Got tb %s hash vgroup, vgId %d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps, + vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port); + CTG_RET(code); } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index e77f2b0ca4..949a58c2fa 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -37,6 +37,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { pOut->dbVgroup->vgVersion = usedbRsp->vgVersion; pOut->dbVgroup->hashMethod = usedbRsp->hashMethod; + qDebug("Got %d vgroup for db %s", usedbRsp->vgNum, usedbRsp->db); + if (usedbRsp->vgNum <= 0) { return TSDB_CODE_SUCCESS; } @@ -50,6 +52,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { for (int32_t i = 0; i < usedbRsp->vgNum; ++i) { SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i); pOut->dbVgroup->numOfTable += pVgInfo->numOfTable; + qDebug("the %dth vgroup, id %d, epNum %d, current %s port %d", i, pVgInfo->vgId, pVgInfo->epSet.numOfEps, + pVgInfo->epSet.eps[pVgInfo->epSet.inUse].fqdn, pVgInfo->epSet.eps[pVgInfo->epSet.inUse].port); if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { return TSDB_CODE_TSC_OUT_OF_MEMORY; }