fix vnode get table meta issue
This commit is contained in:
parent
475a399c31
commit
6c719eef5e
|
@ -518,6 +518,7 @@ int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2);
|
|||
int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
|
||||
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
|
||||
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
|
||||
char *ctgTaskTypeStr(CTG_TASK_TYPE type);
|
||||
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
|
|
|
@ -44,7 +44,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ int32_t ctgInitGetDbVgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -90,7 +90,7 @@ int32_t ctgInitGetDbCfgTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -113,7 +113,7 @@ int32_t ctgInitGetDbInfoTask(SCtgJob *pJob, int32_t taskIdx, char *dbFName) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, dbFName:%s", pJob->queryId, taskIdx, task.type, dbFName);
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, dbFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -143,7 +143,7 @@ int32_t ctgInitGetTbHashTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " task %d type %d initialized, tableName:%s", pJob->queryId, taskIdx, task.type, name->tname);
|
||||
qDebug("QID:0x%" PRIx64 " the %d task type %s initialized, tableName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name->tname);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -158,7 +158,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:%" PRIx64 " task %d type %d initialized", pJob->queryId, taskIdx, task.type);
|
||||
qDebug("QID:%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -181,7 +181,7 @@ int32_t ctgInitGetIndexTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:%" PRIx64 " task %d type %d initialized, indexFName:%s", pJob->queryId, taskIdx, task.type, name);
|
||||
qDebug("QID:%" PRIx64 " the %d task type %s initialized, indexFName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -204,7 +204,7 @@ int32_t ctgInitGetUdfTask(SCtgJob *pJob, int32_t taskIdx, char *name) {
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:%" PRIx64 " task %d type %d initialized, udfName:%s", pJob->queryId, taskIdx, task.type, name);
|
||||
qDebug("QID:%" PRIx64 " the %d task type %s initialized, udfName:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), name);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -227,11 +227,96 @@ int32_t ctgInitGetUserTask(SCtgJob *pJob, int32_t taskIdx, SUserAuthInfo *user)
|
|||
|
||||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:%" PRIx64 " task %d type %d initialized, user:%s", pJob->queryId, taskIdx, task.type, user->user);
|
||||
qDebug("QID:%" PRIx64 " the %d task type %s initialized, user:%s", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type), user->user);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
|
||||
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
|
||||
if (dbNum > 0) {
|
||||
if (dbNum > pJob->dbCfgNum && dbNum > pJob->dbVgNum && dbNum > pJob->dbInfoNum) {
|
||||
SHashObj* pDb = taosHashInit(dbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
if (NULL == pDb) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||
}
|
||||
|
||||
char* dbFName = taosHashIterate(pDb, NULL);
|
||||
while (dbFName) {
|
||||
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
||||
dbFName = taosHashIterate(pDb, dbFName);
|
||||
}
|
||||
|
||||
taosHashCleanup(pDb);
|
||||
} else {
|
||||
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
||||
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
||||
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum;
|
||||
if (tbNum > 0) {
|
||||
if (tbNum > pJob->tbMetaNum && tbNum > pJob->tbHashNum) {
|
||||
SHashObj* pTb = taosHashInit(tbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableHash, i);
|
||||
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||
}
|
||||
|
||||
SName* name = taosHashIterate(pTb, NULL);
|
||||
while (name) {
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
name = taosHashIterate(pTb, name);
|
||||
}
|
||||
|
||||
taosHashCleanup(pTb);
|
||||
} else {
|
||||
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableHash, i);
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq* pReq, catalogCallback fp, void* param, int32_t* taskNum) {
|
||||
int32_t code = 0;
|
||||
|
@ -283,12 +368,13 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (pReq->forceUpdate) {
|
||||
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, pJob, pReq));
|
||||
}
|
||||
|
||||
int32_t taskIdx = 0;
|
||||
for (int32_t i = 0; i < dbVgNum; ++i) {
|
||||
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
||||
if (pReq->forceUpdate) {
|
||||
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
||||
}
|
||||
CTG_ERR_JRET(ctgInitGetDbVgTask(pJob, taskIdx++, dbFName));
|
||||
}
|
||||
|
||||
|
@ -304,9 +390,6 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
|
||||
for (int32_t i = 0; i < tbMetaNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
||||
if (pReq->forceUpdate) {
|
||||
catalogRemoveTableMeta(pCtg, name);
|
||||
}
|
||||
CTG_ERR_JRET(ctgInitGetTbMetaTask(pJob, taskIdx++, name));
|
||||
}
|
||||
|
||||
|
@ -342,7 +425,7 @@ int32_t ctgInitJob(CTG_PARAMS, SCtgJob** job, uint64_t reqId, const SCatalogReq*
|
|||
|
||||
taosAcquireRef(gCtgMgmt.jobPool, pJob->refId);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d", pJob->queryId, pJob->refId, *taskNum);
|
||||
qDebug("QID:0x%" PRIx64 ", jobId: 0x%" PRIx64 " initialized, task num %d, forceUpdate %d", pJob->queryId, pJob->refId, *taskNum, pReq->forceUpdate);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
||||
|
|
|
@ -565,7 +565,7 @@ int32_t ctgGetTbMetaFromVnode(CTG_PARAMS, const SName* pTableName, SVgroupInfo *
|
|||
}
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, tbFName));
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(CTG_PARAMS_LIST(), pTask, reqType, msg, msgLen));
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pTrans, &vgroupInfo->epSet, pTask, reqType, msg, msgLen));
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
|
|
|
@ -19,6 +19,31 @@
|
|||
#include "catalogInt.h"
|
||||
#include "systable.h"
|
||||
|
||||
char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
|
||||
switch (type) {
|
||||
case CTG_TASK_GET_QNODE:
|
||||
return "[get qnode list]";
|
||||
case CTG_TASK_GET_DB_VGROUP:
|
||||
return "[get db vgroup]";
|
||||
case CTG_TASK_GET_DB_CFG:
|
||||
return "[get db cfg]";
|
||||
case CTG_TASK_GET_DB_INFO:
|
||||
return "[get db info]";
|
||||
case CTG_TASK_GET_TB_META:
|
||||
return "[get table meta]";
|
||||
case CTG_TASK_GET_TB_HASH:
|
||||
return "[get table hash]";
|
||||
case CTG_TASK_GET_INDEX:
|
||||
return "[get index]";
|
||||
case CTG_TASK_GET_UDF:
|
||||
return "[get udf]";
|
||||
case CTG_TASK_GET_USER:
|
||||
return "[get user]";
|
||||
default:
|
||||
return "unknown";
|
||||
}
|
||||
}
|
||||
|
||||
void ctgFreeSMetaData(SMetaData* pData) {
|
||||
taosArrayDestroy(pData->pTableMeta);
|
||||
pData->pTableMeta = NULL;
|
||||
|
@ -477,6 +502,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
|
|||
|
||||
*pVgroup = *vgInfo;
|
||||
|
||||
ctgDebug("Got tb %s hash vgroup, vgId %d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId, vgInfo->epSet.numOfEps,
|
||||
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
|
|||
pOut->dbVgroup->vgVersion = usedbRsp->vgVersion;
|
||||
pOut->dbVgroup->hashMethod = usedbRsp->hashMethod;
|
||||
|
||||
qDebug("Got %d vgroup for db %s", usedbRsp->vgNum, usedbRsp->db);
|
||||
|
||||
if (usedbRsp->vgNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -50,6 +52,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
|
|||
for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
|
||||
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
|
||||
pOut->dbVgroup->numOfTable += pVgInfo->numOfTable;
|
||||
qDebug("the %dth vgroup, id %d, epNum %d, current %s port %d", i, pVgInfo->vgId, pVgInfo->epSet.numOfEps,
|
||||
pVgInfo->epSet.eps[pVgInfo->epSet.inUse].fqdn, pVgInfo->epSet.eps[pVgInfo->epSet.inUse].port);
|
||||
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue