From 1b98943dd20dc0d8a98018a27eb56f8c6584ec81 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 7 Feb 2022 19:01:26 +0800 Subject: [PATCH] feature/qnode --- include/libs/catalog/catalog.h | 2 +- include/libs/qcom/query.h | 1 + include/util/taoserror.h | 1 + source/client/src/clientHb.c | 3 +- source/libs/catalog/inc/catalogInt.h | 13 +- source/libs/catalog/src/catalog.c | 610 ++++++++++++++-------- source/libs/catalog/test/catalogTests.cpp | 5 +- source/libs/qcom/src/querymsg.c | 2 +- source/util/src/terror.c | 1 + 9 files changed, 399 insertions(+), 239 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index c291ebd8fd..fef951b010 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -99,7 +99,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, */ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgroupInfo* dbInfo); int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 2e4093590d..808437cb7e 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -89,6 +89,7 @@ typedef struct SDBVgroupInfo { typedef struct SUseDbOutput { char db[TSDB_DB_FNAME_LEN]; + uint64_t dbId; SDBVgroupInfo *dbVgroup; } SUseDbOutput; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b5740b0118..6237de36ff 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -450,3 +450,4 @@ int32_t* taosGetErrno(); #endif #endif /*_TD_COMMON_TAOS_ERROR_H_*/ + \ No newline at end of file diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index d265ffaa94..e80e87f03b 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -44,7 +44,6 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { SDBVgroupInfo vgInfo = {0}; - vgInfo.dbId = rsp->uid; vgInfo.vgVersion = rsp->vgVersion; vgInfo.hashMethod = rsp->hashMethod; vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); @@ -69,7 +68,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog } } - code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); + code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo); if (code) { taosHashCleanup(vgInfo.vgHash); } diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9c041d76c7..1c4a19530d 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -48,18 +48,22 @@ enum { }; typedef struct SCtgDebug { - int32_t lockDebug; + bool lockDebug; + bool cacheDebug; + uint32_t showCachePeriodSec; } SCtgDebug; typedef struct SCtgTbMetaCache { SRWLatch stbLock; - SHashObj *cache; //key:tbname, value:STableMeta + SRWLatch metaLock; // RC between cache destroy and all other operations + SHashObj *metaCache; //key:tbname, value:STableMeta SHashObj *stbCache; //key:suid, value:STableMeta* } SCtgTbMetaCache; typedef struct SCtgDBCache { SRWLatch vgLock; + uint64_t dbId; int8_t deleted; SDBVgroupInfo *vgInfo; SCtgTbMetaCache tbCache; @@ -136,7 +140,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) -#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) +#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { ctgDebug(__VA_ARGS__); } } while (0) +#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { ctgDebug(__VA_ARGS__); } } while (0) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 @@ -173,6 +178,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); } while (0) + + #ifdef __cplusplus } #endif diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 02773fe533..3776969d90 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -22,6 +22,99 @@ SCatalogMgmt ctgMgmt = {0}; SCtgDebug gCTGDebug = {0}; +void ctgShowDBCache(SHashObj *dbHash) { + if (NULL == dbHash) { + return; + } + + int32_t i = 0; + SCtgDBCache *dbCache = NULL; + void *pIter = taosHashIterate(dbHash, NULL); + while (pIter) { + char *dbFName = NULL; + size_t len = 0; + + dbCache = (SCtgDBCache *)pIter; + + taosHashGetKey(dbCache, &dbFName, &len); + + CTG_CACHE_DEBUG("** %dth db [%.*s] **", i, len, dbFName); + + pIter = taosHashIterate(dbHash, pIter); + } +} + +void ctgShowClusterCache(struct SCatalog* pCatalog) { + if (NULL == pCatalog) { + return; + } + + CTG_CACHE_DEBUG("## cluster %"PRIx64" cache Info ##", pCatalog->clusterId); + CTG_CACHE_DEBUG("db cache number:%d", pCatalog->dbCache ? taosHashGetSize(pCatalog->dbCache) : 0); + ctgShowDBCache(pCatalog->dbCache); + +} + +int32_t ctgInitDBCache(struct SCatalog* pCatalog) { + if (NULL == pCatalog->dbCache) { + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == cache) { + ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) { + taosHashCleanup(cache); + } + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgInitTbMetaCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) { + if (NULL == dbCache->tbCache.metaCache) { + if (dbCache->deleted) { + ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); + } + + SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == metaCache) { + ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.metaCache, NULL, metaCache)) { + taosHashCleanup(metaCache); + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgInitStbCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) { + if (NULL == dbCache->tbCache.stbCache) { + if (dbCache->deleted) { + ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); + } + + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + if (NULL == cache) { + ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) { + taosHashCleanup(cache); + } + } + + return TSDB_CODE_SUCCESS; +} + + void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { if (NULL == mgmt->slots) { @@ -40,18 +133,20 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { } -void ctgFreeTableMetaCache(SCtgTbMetaCache *table) { - CTG_LOCK(CTG_WRITE, &table->stbLock); - if (table->stbCache) { - taosHashCleanup(table->stbCache); - table->stbCache = NULL; +void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { + CTG_LOCK(CTG_WRITE, &cache->stbLock); + if (cache->stbCache) { + taosHashCleanup(cache->stbCache); + cache->stbCache = NULL; } - CTG_UNLOCK(CTG_WRITE, &table->stbLock); + CTG_UNLOCK(CTG_WRITE, &cache->stbLock); - if (table->cache) { - taosHashCleanup(table->cache); - table->cache = NULL; + CTG_LOCK(CTG_WRITE, &cache->metaLock); + if (cache->metaCache) { + taosHashCleanup(cache->metaCache); + cache->metaCache = NULL; } + CTG_UNLOCK(CTG_WRITE, &cache->metaLock); } void ctgFreeDbCache(SCtgDBCache *dbCache) { @@ -61,9 +156,8 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) { atomic_store_8(&dbCache->deleted, 1); - SDBVgroupInfo *dbInfo = NULL; + CTG_LOCK(CTG_WRITE, &dbCache->vgLock); if (dbCache->vgInfo) { - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); if (dbCache->vgInfo->vgHash) { taosHashCleanup(dbCache->vgInfo->vgHash); @@ -71,8 +165,8 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) { } tfree(dbCache->vgInfo); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); } + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); ctgFreeTableMetaCache(&dbCache->tbCache); } @@ -97,22 +191,21 @@ void ctgFreeHandle(struct SCatalog* pCatalog) { free(pCatalog); } - -int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SCtgDBCache **dbCache, bool *inCache) { +int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbFName, SCtgDBCache **dbCache, bool *inCache) { if (NULL == pCatalog->dbCache) { *inCache = false; - ctgWarn("empty db cache, dbName:%s", dbName); + ctgWarn("empty db cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } SCtgDBCache *cache = NULL; while (true) { - cache = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); + cache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); if (NULL == cache) { *inCache = false; - ctgWarn("not in db vgroup cache, dbName:%s", dbName); + ctgWarn("not in db vgroup cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } @@ -120,7 +213,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S if (NULL == cache->vgInfo) { CTG_UNLOCK(CTG_READ, &cache->vgLock); taosHashRelease(pCatalog->dbCache, cache); - ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName); + ctgWarn("db cache vgInfo is NULL, dbFName:%s", dbFName); continue; } @@ -131,7 +224,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S *dbCache = cache; *inCache = true; - ctgDebug("Got db vgroup from cache, dbName:%s", dbName); + ctgDebug("Got db vgroup from cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } @@ -189,7 +282,10 @@ int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, cha size_t sz = 0; - STableMeta *tbMeta = taosHashGet(dbCache->tbCache.cache, tbName, strlen(tbName)); + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName)); + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (NULL == tbMeta) { taosHashRelease(pCatalog->dbCache, dbCache); @@ -227,15 +323,18 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN return TSDB_CODE_SUCCESS; } - if (NULL == dbCache->tbCache.cache) { + if (NULL == dbCache->tbCache.metaCache) { *exist = 0; taosHashRelease(pCatalog->dbCache, dbCache); ctgWarn("empty tbmeta cache, dbFName:%s, tbName:%s", db, pTableName->tname); return TSDB_CODE_SUCCESS; } - size_t sz = 0; - STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.cache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz); + size_t sz = 0; + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz); + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (NULL == *pTableMeta) { *exist = 0; taosHashRelease(pCatalog->dbCache, dbCache); @@ -308,7 +407,10 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN return TSDB_CODE_SUCCESS; } - STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.cache, pTableName->tname, strlen(pTableName->tname)); + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (NULL == pTableMeta) { ctgWarn("tbmeta not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname); taosHashRelease(pCatalog->dbCache, dbCache); @@ -318,7 +420,7 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN *tbType = atomic_load_8(&pTableMeta->tableType); - taosHashRelease(dbCache->tbCache.cache, dbCache); + taosHashRelease(dbCache->tbCache.metaCache, dbCache); taosHashRelease(pCatalog->dbCache, dbCache); ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType); @@ -741,6 +843,98 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si return TSDB_CODE_SUCCESS; } +int32_t ctgAddDBCache(struct SCatalog *pCatalog, char *dbFName, SCtgDBCache *dbCache) { + int32_t code = 0; + if (taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache))) { + ctgError("taosHashPut db to cache failed, db:%s", dbFName); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + SDbVgVersion vgVersion = {.dbId = dbCache->dbId, .vgVersion = dbCache->vgInfo ? dbCache->vgInfo->vgVersion : -1}; + strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); + + ctgDebug("dbCache added, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbCache->dbId); + + CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion))); + + return TSDB_CODE_SUCCESS; + +_return: + + ctgFreeDbCache(dbCache); + + CTG_RET(code); +} + + +int32_t ctgUpdateTbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) { + CTG_LOCK(CTG_READ, &tbCache->metaLock); + if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + + ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgUpdateStbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) { + bool newAdded = false; + int32_t code = 0; + SSTableMetaVersion metaRent = {.suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion}; + strcpy(metaRent.dbFName, dbFName); + strcpy(metaRent.stbName, tbName); + + CTG_LOCK(CTG_WRITE, &tbCache->stbLock); + + CTG_LOCK(CTG_READ, &tbCache->metaLock); + STableMeta *orig = taosHashAcquire(tbCache->metaCache, tbName, strlen(tbName)); + if (orig) { + if (orig->suid != meta->suid) { + if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) { + ctgError("stb not exist in stbCache, db:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); + } + + ctgMetaRentRemove(&pCatalog->stbRent, orig->suid, ctgSTableVersionCompare); + } + + taosHashRelease(tbCache->metaCache, orig); + } + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + + CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, tbCache, dbFName, tbName, meta, metaSize)); + + CTG_LOCK(CTG_READ, &tbCache->metaLock); + STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName)); + if (taosHashPutExt(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + + ctgDebug("update stable to cache, suid:%"PRIx64, meta->suid); + + if (newAdded) { + CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion))); + } else { + CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare)); + } + + return TSDB_CODE_SUCCESS; + +_return: + + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + + CTG_RET(code); +} int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { @@ -752,63 +946,15 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - if (NULL == pCatalog->dbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } + CTG_ERR_RET(ctgInitDBCache(pCatalog)); - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } + CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, output->dbFName, output->dbId, &dbCache)); - while (true) { - dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, output->dbFName, strlen(output->dbFName)); - if (dbCache) { - break; - } - - SCtgDBCache newDbCache = {0}; - - if (taosHashPut(pCatalog->dbCache, output->dbFName, strlen(output->dbFName), &newDbCache, sizeof(newDbCache))) { - ctgError("taosHashPut db to cache failed, db:%s", output->dbFName); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - } - - if (NULL == dbCache->tbCache.cache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.cache, NULL, cache)) { - taosHashCleanup(cache); - } - } - - if (NULL == dbCache->tbCache.stbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } + CTG_ERR_JRET(ctgInitTbMetaCache(pCatalog, dbCache)); + CTG_ERR_JRET(ctgInitStbCache(pCatalog, dbCache)); if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) { - if (taosHashPut(dbCache->tbCache.cache, output->ctbName, strlen(output->ctbName), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { - ctgError("taosHashPut ctbmeta to cache failed, ctbName:%s", output->ctbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - ctgDebug("ctbmeta updated to cache, ctbName:%s", output->ctbName); + CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta))); } if (CTG_IS_META_CTABLE(output->metaType)) { @@ -823,42 +969,11 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { - bool newAdded = false; - SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion}; - strcpy(metaRent.dbFName, output->dbFName); - strcpy(metaRent.stbName, output->tbName); - - CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) { - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - ctgError("taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - STableMeta *tbMeta = taosHashGet(dbCache->tbCache.cache, output->tbName, strlen(output->tbName)); - if (taosHashPutExt(dbCache->tbCache.stbCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) { - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, output->tbMeta->suid); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - - ctgDebug("update stable to cache, suid:%"PRIx64, output->tbMeta->suid); - - if (newAdded) { - CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion))); - } else { - CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare)); - } + CTG_ERR_JRET(ctgUpdateStbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize)); } else { - if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) { - ctgError("taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } + CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize)); } - ctgDebug("update tablemeta to cache, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - _return: if (dbCache) { @@ -868,30 +983,30 @@ _return: CTG_RET(code); } -int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SCtgDBCache** dbCache) { +int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache) { bool inCache = false; if (!forceUpdate) { - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbCache, &inCache)); + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache)); if (inCache) { return TSDB_CODE_SUCCESS; } - ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbName, forceUpdate); + ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbFName, forceUpdate); } SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; - tstrncpy(input.db, dbName, tListLen(input.db)); + tstrncpy(input.db, dbFName, tListLen(input.db)); input.vgVersion = CTG_DEFAULT_INVALID_VERSION; while (true) { CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup)); - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbCache, &inCache)); + CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbFName, DbOut.dbVgroup)); + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache)); if (!inCache) { - ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbName); + ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbFName); continue; } @@ -901,58 +1016,90 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm return TSDB_CODE_SUCCESS; } +void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache) { + CTG_LOCK(CTG_WRITE, &cache->stbLock); + if (cache->stbCache) { + void *pIter = taosHashIterate(cache->stbCache, NULL); + while (pIter) { + uint64_t suid = 0; + taosHashGetKey(pIter, &suid, NULL); -int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, bool *removed) { - *removed = false; + CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare)); + ctgDebug("stb removed from rent, suid:%"PRIx64, suid); + + pIter = taosHashIterate(cache->stbCache, pIter); + } + } + CTG_UNLOCK(CTG_WRITE, &cache->stbLock); - SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); - if (NULL == dbCache) { - ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName); - return TSDB_CODE_SUCCESS; - } - - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); - - if (NULL == dbCache->vgInfo) { - ctgInfo("db vgInfo not in dbCache, may be removed, db:%s, dbId:%"PRIx64, dbName, dbId); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - return TSDB_CODE_SUCCESS; - } - - if (dbCache->vgInfo->dbId != dbId) { - ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbName, dbCache->vgInfo->dbId, dbId); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - return TSDB_CODE_SUCCESS; - } - - if (dbCache->vgInfo->vgHash) { - ctgInfo("cleanup db vgInfo, db:%s, dbId:%"PRIx64, dbName, dbId); - taosHashCleanup(dbCache->vgInfo->vgHash); - tfree(dbCache->vgInfo); - } + ctgFreeTableMetaCache(cache); +} - if (taosHashRemove(pCatalog->dbCache, dbName, strlen(dbName))) { - ctgError("taosHashRemove from dbCache failed, db:%s", dbName); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); +int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) { + if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) { + ctgError("taosHashRemove from dbCache failed, dbFName:%s", dbFName); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - dbCache->deleted = true; + atomic_store_8(&dbCache->deleted, 1); + CTG_LOCK(CTG_WRITE, &dbCache->vgLock); + if (dbCache->vgInfo) { + ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); + + if (dbCache->vgInfo->vgHash) { + taosHashCleanup(dbCache->vgInfo->vgHash); + } + + tfree(dbCache->vgInfo); + } CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - ctgFreeTableMetaCache(&dbCache->tbCache); + ctgRemoveAndFreeTableMeta(pCatalog, &dbCache->tbCache); - taosHashRelease(pCatalog->dbCache, dbCache); + ctgInfo("db removed from cache, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId); - *removed = true; + CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbCache->dbId, ctgDbVgVersionCompare)); + + ctgDebug("db removed from rent, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId); return TSDB_CODE_SUCCESS; } + +int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { + int32_t code = 0; + SCtgDBCache *dbCache = NULL; + + while (true) { + dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); + if (dbCache) { + if (dbCache->dbId == dbId) { + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } + + CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName)); + taosHashRelease(pCatalog->dbCache, dbCache); + dbCache = NULL; + } + + SCtgDBCache newDBCache = {0}; + newDBCache.dbId = dbId; + + CTG_ERR_JRET(ctgAddDBCache(pCatalog, dbFName, &newDBCache)); + } + +_return: + + if (dbCache) { + taosHashRelease(pCatalog->dbCache, dbCache); + } + + CTG_RET(code); +} + + int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) { *removed = false; @@ -970,12 +1117,16 @@ int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbNam return TSDB_CODE_SUCCESS; } - if (taosHashRemove(dbCache->tbCache.cache, stbName, strlen(stbName))) { + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (taosHashRemove(dbCache->tbCache.metaCache, stbName, strlen(stbName))) { + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); taosHashRelease(pCatalog->dbCache, dbCache); ctgError("stb not exist in cache, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } + } + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); taosHashRelease(pCatalog->dbCache, dbCache); @@ -1255,7 +1406,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SArray** vgroupList) { +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) { if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1265,7 +1416,7 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* int32_t code = 0; SArray *vgList = NULL; - CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &dbCache)); + CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache)); int32_t vgNum = (int32_t)taosHashGetSize(dbCache->vgInfo->vgHash); vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo)); @@ -1307,89 +1458,64 @@ _return: } -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId, SDBVgroupInfo* dbInfo) { int32_t code = 0; - if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { + if (NULL == pCatalog || NULL == dbFName || NULL == dbInfo) { CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { - ctgError("invalid db vgInfo, dbName:%s, vgHash:%p, vgVersion:%d", dbName, dbInfo->vgHash, dbInfo->vgVersion); + ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (NULL == pCatalog->dbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } + CTG_ERR_JRET(ctgInitDBCache(pCatalog)); bool newAdded = false; - SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; - - SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); - if (dbCache) { - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); - - if (NULL == dbCache->vgInfo) { - newAdded = true; - - dbCache->vgInfo = dbInfo; - } else { - if (dbCache->vgInfo->dbId != dbInfo->dbId) { - ctgMetaRentRemove(&pCatalog->dbRent, dbCache->vgInfo->dbId, ctgDbVgVersionCompare); - newAdded = true; - } else if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { - ctgInfo("db vgVersion is old, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - - goto _return; - } - - if (dbCache->vgInfo->vgHash) { - ctgInfo("cleanup db vgHash, db:%s", dbName); - taosHashCleanup(dbCache->vgInfo->vgHash); - dbCache->vgInfo->vgHash = NULL; - } - - tfree(dbCache->vgInfo); - dbCache->vgInfo = dbInfo; - } + SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion}; + SCtgDBCache *dbCache = NULL; + CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, dbFName, dbId, &dbCache)); + + CTG_LOCK(CTG_WRITE, &dbCache->vgLock); + if (dbCache->deleted) { + ctgInfo("db is dropping, dbFName:%s, dbId:%"PRIx64, dbFName, dbInfo->dbId); CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); taosHashRelease(pCatalog->dbCache, dbCache); + CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED); + } + + if (NULL == dbCache->vgInfo) { + dbCache->vgInfo = dbInfo; } else { - SCtgDBCache newDBCache = {0}; - newDBCache.vgInfo = dbInfo; + if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { + ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, current:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); + taosHashRelease(pCatalog->dbCache, dbCache); + + goto _return; + } - if (taosHashPut(pCatalog->dbCache, dbName, strlen(dbName), &newDBCache, sizeof(newDBCache)) != 0) { - ctgError("taosHashPut db & db vgroup to cache failed, db:%s", dbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + if (dbCache->vgInfo->vgHash) { + ctgInfo("cleanup db vgHash, dbFName:%s", dbFName); + taosHashCleanup(dbCache->vgInfo->vgHash); + dbCache->vgInfo->vgHash = NULL; } - newAdded = true; + tfree(dbCache->vgInfo); + dbCache->vgInfo = dbInfo; } dbInfo = NULL; - strncpy(vgVersion.dbFName, dbName, sizeof(vgVersion.dbFName)); - - if (newAdded) { - CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion))); - } else { - CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); - } - - ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, vgVersion.vgVersion); + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); + taosHashRelease(pCatalog->dbCache, dbCache); + strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); + CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + + ctgDebug("dbCache updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId); _return: @@ -1403,29 +1529,34 @@ _return: } -int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId) { +int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId) { int32_t code = 0; - bool removed = false; - if (NULL == pCatalog || NULL == dbName) { + if (NULL == pCatalog || NULL == dbFName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (NULL == pCatalog->dbCache) { return TSDB_CODE_SUCCESS; } - - CTG_ERR_RET(ctgValidateAndRemoveDb(pCatalog, dbName, dbId, &removed)); - if (!removed) { + + SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); + if (NULL == dbCache) { + ctgInfo("db not exist in dbCache, may be removed, dbFName:%s", dbFName); + return TSDB_CODE_SUCCESS; + } + + if (dbCache->dbId != dbId) { + ctgInfo("db id already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbFName, dbCache->dbId, dbId); return TSDB_CODE_SUCCESS; } - ctgInfo("db removed from cache, db:%s, uid:%"PRIx64, dbName, dbId); + CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName)); - CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbId, ctgDbVgVersionCompare)); - - ctgDebug("db removed from rent, db:%s, uid:%"PRIx64, dbName, dbId); +_return: + taosHashRelease(pCatalog->dbCache, dbCache); + CTG_RET(code); } @@ -1464,6 +1595,27 @@ int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, con return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1); } +int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg) { + STableMetaOutput output = {0}; + int32_t code = 0; + + strcpy(output.dbFName, rspMsg->dbFName); + strcpy(output.tbName, rspMsg->tbName); + + SET_META_TYPE_TABLE(output.metaType); + + CTG_ERR_RET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta)); + + CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); + +_return: + + tfree(output.tbMeta); + + CTG_RET(code); +} + + int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 751fa72347..5b58794651 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -185,7 +185,6 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) { ctgTestCurrentVgVersion = dbVgroup->vgVersion; dbVgroup->hashMethod = 0; - dbVgroup->dbId = ctgTestDbId; dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion); @@ -600,7 +599,7 @@ void *ctgTestSetDbVgroupThread(void *param) { while (!ctgTestStop) { ctgTestBuildDBVgroup(&dbVgroup); - code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); if (code) { assert(0); } @@ -1109,7 +1108,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { taosArrayDestroy(vgList); ctgTestBuildDBVgroup(&dbVgroup); - code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); ASSERT_EQ(code, 0); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index e7b3d08bc5..55099d9972 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -119,9 +119,9 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pOut->dbId = pRsp->uid; pOut->dbVgroup->vgVersion = pRsp->vgVersion; pOut->dbVgroup->hashMethod = pRsp->hashMethod; - pOut->dbVgroup->dbId = pRsp->uid; pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pOut->dbVgroup->vgHash) { qError("taosHashInit %d failed", pRsp->vgNum); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ee5bea0ab7..55d40ff98c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -418,6 +418,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped") //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")