catalog remove db vgroup info
This commit is contained in:
parent
c51385570b
commit
4d1ba65c46
|
@ -147,7 +147,7 @@ typedef struct {
|
|||
} SBuildTableMetaInput;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_TABLE_FNAME_LEN];
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int32_t vgVersion;
|
||||
} SBuildUseDBInput;
|
||||
|
||||
|
@ -746,7 +746,7 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int64_t uid;
|
||||
uint64_t uid;
|
||||
int32_t vgVersion;
|
||||
int32_t vgNum;
|
||||
int8_t hashMethod;
|
||||
|
|
|
@ -59,6 +59,7 @@ typedef struct SSTableMetaVersion {
|
|||
} SSTableMetaVersion;
|
||||
|
||||
typedef struct SDbVgVersion {
|
||||
char dbName[TSDB_DB_FNAME_LEN];
|
||||
int64_t dbId;
|
||||
int32_t vgVersion;
|
||||
} SDbVgVersion;
|
||||
|
|
|
@ -82,7 +82,7 @@ typedef struct STableMeta {
|
|||
|
||||
typedef struct SDBVgroupInfo {
|
||||
SRWLatch lock;
|
||||
int64_t dbId;
|
||||
uint64_t dbId;
|
||||
int32_t vgVersion;
|
||||
int8_t hashMethod;
|
||||
SHashObj *vgInfo; //key:vgId, value:SVgroupInfo
|
||||
|
|
|
@ -737,7 +737,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||
int32_t ctgValidateAndFreeDbInfo(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) {
|
||||
SDBVgroupInfo *oldInfo = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, dbName, strlen(dbName));
|
||||
if (oldInfo) {
|
||||
CTG_LOCK(CTG_WRITE, &oldInfo->lock);
|
||||
|
@ -763,6 +763,48 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, SD
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgValidateAndRemoveDbInfo(struct SCatalog* pCatalog, SDbVgVersion* target) {
|
||||
SDBVgroupInfo *info = (SDBVgroupInfo *)taosHashAcquire(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName));
|
||||
if (info) {
|
||||
CTG_LOCK(CTG_WRITE, &info->lock);
|
||||
if (info->dbId != target->dbId) {
|
||||
ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, target->dbName, info->dbId, target->dbId);
|
||||
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
||||
taosHashRelease(pCatalog->dbCache.cache, info);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (info->vgVersion > target->vgVersion) {
|
||||
ctgInfo("db vgVersion already updated, db:%s, version:%d, targetVer:%d", target->dbName, info->vgVersion, target->vgVersion);
|
||||
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
||||
taosHashRelease(pCatalog->dbCache.cache, info);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (info->vgInfo) {
|
||||
ctgInfo("cleanup db vgInfo, db:%s", target->dbName);
|
||||
taosHashCleanup(info->vgInfo);
|
||||
info->vgInfo = NULL;
|
||||
}
|
||||
|
||||
if (taosHashRemove(pCatalog->dbCache.cache, target->dbName, strlen(target->dbName))) {
|
||||
ctgError("taosHashRemove from dbCache failed, db:%s", target->dbName);
|
||||
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
||||
taosHashRelease(pCatalog->dbCache.cache, info);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &info->lock);
|
||||
|
||||
taosHashRelease(pCatalog->dbCache.cache, info);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
|
||||
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
|
@ -1134,19 +1176,6 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (dbInfo->vgVersion < 0) {
|
||||
ctgWarn("db vgVersion less than 0, dbName:%s, vgVersion:%d", dbName, dbInfo->vgVersion);
|
||||
|
||||
if (pCatalog->dbCache.cache) {
|
||||
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
|
||||
|
||||
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
|
||||
}
|
||||
|
||||
ctgWarn("db removed from cache, db:%s", dbName);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (NULL == pCatalog->dbCache.cache) {
|
||||
SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == cache) {
|
||||
|
@ -1158,7 +1187,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
taosHashCleanup(cache);
|
||||
}
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbName, dbInfo));
|
||||
CTG_ERR_JRET(ctgValidateAndFreeDbInfo(pCatalog, dbName, dbInfo));
|
||||
}
|
||||
|
||||
bool newAdded = false;
|
||||
|
@ -1189,6 +1218,51 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogRemoveDBVgroup(struct SCatalog* pCatalog, SDbVgVersion* dbInfo) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (NULL == pCatalog || NULL == dbInfo) {
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (pCatalog->dbCache.cache) {
|
||||
CTG_ERR_JRET(ctgValidateAndRemoveDbInfo(pCatalog, dbInfo));
|
||||
|
||||
CTG_ERR_JRET(taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)));
|
||||
}
|
||||
|
||||
ctgWarn("db removed from cache, db:%s", dbName);
|
||||
|
||||
bool newAdded = false;
|
||||
if (taosHashPutExt(pCatalog->dbCache.cache, dbName, strlen(dbName), dbInfo, sizeof(*dbInfo), &newAdded) != 0) {
|
||||
ctgError("taosHashPutExt db vgroup to cache failed, db:%s", dbName);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
dbInfo->vgInfo = NULL;
|
||||
|
||||
SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion};
|
||||
if (newAdded) {
|
||||
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion)));
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, dbInfo->dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare));
|
||||
}
|
||||
|
||||
ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, dbInfo->vgVersion);
|
||||
|
||||
|
||||
_return:
|
||||
|
||||
if (dbInfo && dbInfo->vgInfo) {
|
||||
taosHashCleanup(dbInfo->vgInfo);
|
||||
dbInfo->vgInfo = NULL;
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
|
||||
}
|
||||
|
|
|
@ -59,7 +59,7 @@ int32_t ctgTestTagNum = 1;
|
|||
int32_t ctgTestSVersion = 1;
|
||||
int32_t ctgTestTVersion = 1;
|
||||
int32_t ctgTestSuid = 2;
|
||||
int64_t ctgTestDbId = 33;
|
||||
uint64_t ctgTestDbId = 33;
|
||||
|
||||
uint64_t ctgTestClusterId = 0x1;
|
||||
char *ctgTestDbname = "1.db1";
|
||||
|
|
Loading…
Reference in New Issue