|
|
|
@ -261,54 +261,48 @@ int32_t ctgInitGetTbIndexTask(SCtgJob *pJob, int32_t taskIdx, SName *name) {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* pReq) {
|
|
|
|
|
int32_t dbNum = pJob->dbCfgNum + pJob->dbVgNum + pJob->dbInfoNum;
|
|
|
|
|
if (dbNum > 0) {
|
|
|
|
|
if (dbNum > pJob->dbCfgNum && dbNum > pJob->dbVgNum && dbNum > pJob->dbInfoNum) {
|
|
|
|
|
SHashObj* pDb = taosHashInit(dbNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
|
|
|
if (NULL == pDb) {
|
|
|
|
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char* dbFName = taosHashIterate(pDb, NULL);
|
|
|
|
|
while (dbFName) {
|
|
|
|
|
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
|
|
|
|
dbFName = taosHashIterate(pDb, dbFName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosHashCleanup(pDb);
|
|
|
|
|
} else {
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
|
|
|
|
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
|
|
|
|
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
|
|
|
|
CTG_ERR_RET(ctgDropDbVgroupEnqueue(pCtg, dbFName, true));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob *pJob, const SCatalogReq* pReq) {
|
|
|
|
|
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
|
|
|
|
if (NULL == pDb) {
|
|
|
|
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbVgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbVgroup, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbCfgNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbCfg, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->dbInfoNum; ++i) {
|
|
|
|
|
char* dbFName = taosArrayGet(pReq->pDbInfo, i);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->tbMetaNum; ++i) {
|
|
|
|
|
SName* name = taosArrayGet(pReq->pTableMeta, i);
|
|
|
|
|
char dbFName[TSDB_DB_FNAME_LEN];
|
|
|
|
|
tNameGetFullDbName(name, dbFName);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < pJob->tbHashNum; ++i) {
|
|
|
|
|
SName* name = taosArrayGet(pReq->pTableHash, i);
|
|
|
|
|
char dbFName[TSDB_DB_FNAME_LEN];
|
|
|
|
|
tNameGetFullDbName(name, dbFName);
|
|
|
|
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
char* dbFName = taosHashIterate(pDb, NULL);
|
|
|
|
|
while (dbFName) {
|
|
|
|
|
ctgDropDbVgroupEnqueue(pCtg, dbFName, true);
|
|
|
|
|
dbFName = taosHashIterate(pDb, dbFName);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
taosHashCleanup(pDb);
|
|
|
|
|
|
|
|
|
|
int32_t tbNum = pJob->tbMetaNum + pJob->tbHashNum;
|
|
|
|
|
if (tbNum > 0) {
|
|
|
|
@ -404,7 +398,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, uint6
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pReq->forceUpdate) {
|
|
|
|
|
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, pJob, pReq));
|
|
|
|
|
CTG_ERR_JRET(ctgHandleForceUpdate(pCtg, *taskNum, pJob, pReq));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t taskIdx = 0;
|
|
|
|
@ -790,8 +784,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|
|
|
|
_return:
|
|
|
|
|
|
|
|
|
|
if (dbCache) {
|
|
|
|
|
ctgRUnlockVgInfo(dbCache);
|
|
|
|
|
ctgReleaseDBCache(pCtg, dbCache);
|
|
|
|
|
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctgHandleTaskEnd(pTask, code);
|
|
|
|
@ -870,7 +863,7 @@ _return:
|
|
|
|
|
|
|
|
|
|
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
|
|
|
|
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
|
|
|
|
|
|
|
|
|
STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
|
|
|
|
|
SArray* pInfo = NULL;
|
|
|
|
@ -949,7 +942,6 @@ _return:
|
|
|
|
|
|
|
|
|
|
int32_t ctgHandleGetUserRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
SCtgDBCache *dbCache = NULL;
|
|
|
|
|
SCtgUserCtx* ctx = (SCtgUserCtx*)pTask->taskCtx;
|
|
|
|
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
|
|
|
|
bool pass = false;
|
|
|
|
@ -1018,7 +1010,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
|
|
|
|
|
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
|
|
|
|
|
if (dbCache) {
|
|
|
|
|
SVgroupInfo vgInfo = {0};
|
|
|
|
|
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
|
|
|
|
|
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
|
|
|
|
|
|
|
|
|
|
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
|
|
|
|
|
|
|
|
|
@ -1067,6 +1059,9 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
|
|
|
|
|
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
|
|
|
|
if (NULL != dbCache) {
|
|
|
|
|
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
|
|
|
|
|
|
|
|
|
|
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
|
|
|
|
dbCache = NULL;
|
|
|
|
|
|
|
|
|
|
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
|
|
|
|
|
} else {
|
|
|
|
@ -1101,6 +1096,9 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
|
|
|
|
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
|
|
|
|
|
|
|
|
|
|
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
|
|
|
|
dbCache = NULL;
|
|
|
|
|
|
|
|
|
|
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
|
|
|
|
|
} else {
|
|
|
|
@ -1176,6 +1174,9 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
|
|
|
|
|
pInfo->vgVer = dbCache->vgCache.vgInfo->vgVersion;
|
|
|
|
|
pInfo->dbId = dbCache->dbId;
|
|
|
|
|
pInfo->tbNum = dbCache->vgCache.vgInfo->numOfTable;
|
|
|
|
|
|
|
|
|
|
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
|
|
|
|
dbCache = NULL;
|
|
|
|
|
} else {
|
|
|
|
|
pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION;
|
|
|
|
|
}
|
|
|
|
|