|
|
|
@ -737,7 +737,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
|
|
|
|
int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
|
|
|
|
SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
|
|
|
|
if (oldInfo) {
|
|
|
|
|
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
|
|
|
|
@ -763,6 +763,48 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target) {
|
|
|
|
|
SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName));
|
|
|
|
|
if (info) {
|
|
|
|
|
CTG_LOCK(CTG_WRITE, &info->lock);
|
|
|
|
|
if (info->dbId != target->dbId) {
|
|
|
|
|
ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId);
|
|
|
|
|
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
|
|
|
|
taosHashRelease(pCatalog->dbCache.cache, info);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (info->vgVersion > target->vgVersion) {
|
|
|
|
|
ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion);
|
|
|
|
|
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
|
|
|
|
taosHashRelease(pCatalog->dbCache.cache, info);
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (info->vgInfo) {
|
|
|
|
|
ctgInfo("cleanup db vgInfo, db:%s", target->dbName);
|
|
|
|
|
taosHashCleanup(info->vgInfo);
|
|
|
|
|
info->vgInfo = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) {
|
|
|
|
|
ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName);
|
|
|
|
|
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
|
|
|
|
taosHashRelease(pCatalog->dbCache.cache, info);
|
|
|
|
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
|
|
|
|
|
|
|
|
|
taosHashRelease(pCatalog->dbCache.cache, info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
|
|
|
|
|
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
|
|
|
|
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
|
|
|
@ -1134,19 +1176,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|
|
|
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (dbInfo->vgVersion < 0) {
|
|
|
|
|
ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion);
|
|
|
|
|
|
|
|
|
|
if (pCatalog->dbCache.cache) {
|
|
|
|
|
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
|
|
|
|
|
|
|
|
|
|
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctgWarn("db removed from cache, db:%s", dbName);
|
|
|
|
|
goto _return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (NULL == pCatalog->dbCache.cache) {
|
|
|
|
|
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
|
|
|
|
if (NULL == cache) {
|
|
|
|
@ -1158,7 +1187,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|
|
|
|
taosHashCleanup(cache);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
|
|
|
|
|
CTG_ERR_JRET(ctgValidateAndFreeDbInfo(pCatalog, dbName, dbInfo));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
bool newAdded = false;
|
|
|
|
@ -1189,6 +1218,51 @@ _return:
|
|
|
|
|
CTG_RET(code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) {
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
|
|
|
|
|
if (NULL == pCatalog || NULL == dbInfo) {
|
|
|
|
|
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pCatalog->dbCache.cache) {
|
|
|
|
|
CTG_ERR_JRET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo));
|
|
|
|
|
|
|
|
|
|
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctgWarn("db removed from cache, db:%s", dbName);
|
|
|
|
|
|
|
|
|
|
bool newAdded = false;
|
|
|
|
|
if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
|
|
|
|
|
ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
|
|
|
|
|
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
dbInfo->vgInfo = NULL;
|
|
|
|
|
|
|
|
|
|
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
|
|
|
|
|
if (newAdded) {
|
|
|
|
|
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
|
|
|
|
|
} else {
|
|
|
|
|
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
_return:
|
|
|
|
|
|
|
|
|
|
if (dbInfo && dbInfo->vgInfo) {
|
|
|
|
|
taosHashCleanup(dbInfo->vgInfo);
|
|
|
|
|
dbInfo->vgInfo = NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
CTG_RET(code);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
|
|
|
|
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
|
|
|
|
|
}
|
|
|
|
|