diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 70e76517c6..46ad90c14c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -726,6 +726,7 @@ typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; + uint64_t dbId; int32_t numOfTags; int32_t numOfColumns; int8_t precision; diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index c291ebd8fd..ab1298785a 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -32,6 +32,15 @@ extern "C" { struct SCatalog; +enum { + CTG_DBG_DB_NUM = 1, + CTG_DBG_META_NUM, + CTG_DBG_STB_NUM, + CTG_DBG_DB_RENT_NUM, + CTG_DBG_STB_RENT_NUM, +}; + + typedef struct SCatalogReq { SArray *pTableName; // element is SNAME SArray *pUdf; // udf name @@ -99,7 +108,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, */ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, bool forceUpdate, SArray** pVgroupList); -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, SDBVgroupInfo* dbInfo); int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId); @@ -127,6 +136,8 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons */ int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta); +int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg); + /** * Force renew a table's local cached meta data. diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 2e4093590d..549f36a898 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -81,7 +81,6 @@ typedef struct STableMeta { } STableMeta; typedef struct SDBVgroupInfo { - uint64_t dbId; int32_t vgVersion; int8_t hashMethod; SHashObj *vgHash; //key:vgId, value:SVgroupInfo @@ -89,6 +88,7 @@ typedef struct SDBVgroupInfo { typedef struct SUseDbOutput { char db[TSDB_DB_FNAME_LEN]; + uint64_t dbId; SDBVgroupInfo *dbVgroup; } SUseDbOutput; @@ -102,6 +102,7 @@ enum { typedef struct STableMetaOutput { int32_t metaType; + uint64_t dbId; char dbFName[TSDB_DB_FNAME_LEN]; char ctbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; @@ -159,6 +160,8 @@ void initQueryModuleMsgHandle(); const SSchema* tGetTbnameColumnSchema(); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); +int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta); + extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index b19215c79d..8d3a55241e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -435,6 +435,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready #define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error #define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error +#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405) //Database is dropped +#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) //catalog is out of service //scheduler #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index d265ffaa94..3e1af765b0 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -44,7 +44,6 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog code = catalogRemoveDB(pCatalog, rsp->db, rsp->uid); } else { SDBVgroupInfo vgInfo = {0}; - vgInfo.dbId = rsp->uid; vgInfo.vgVersion = rsp->vgVersion; vgInfo.hashMethod = rsp->hashMethod; vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); @@ -69,7 +68,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog } } - code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); + code = catalogUpdateDBVgroup(pCatalog, rsp->db, rsp->uid, &vgInfo); if (code) { taosHashCleanup(vgInfo.vgHash); } @@ -101,50 +100,33 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); - code = catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid); + catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid); } else { + tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); + rsp->numOfTags = ntohl(rsp->numOfTags); + rsp->sversion = ntohl(rsp->sversion); + rsp->tversion = ntohl(rsp->tversion); + rsp->tuid = be64toh(rsp->tuid); + rsp->vgId = ntohl(rsp->vgId); + + SSchema* pSchema = rsp->pSchema; schemaNum = rsp->numOfColumns + rsp->numOfTags; -/* - rsp->vgNum = ntohl(rsp->vgNum); - rsp->uid = be64toh(rsp->uid); - SDBVgroupInfo vgInfo = {0}; - vgInfo.dbId = rsp->uid; - vgInfo.vgVersion = rsp->vgVersion; - vgInfo.hashMethod = rsp->hashMethod; - vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); - if (NULL == vgInfo.vgHash) { - tscError("hash init[%d] failed", rsp->vgNum); - return TSDB_CODE_TSC_OUT_OF_MEMORY; + for (int i = 0; i < schemaNum; ++i) { + pSchema->bytes = ntohl(pSchema->bytes); + pSchema->colId = ntohl(pSchema->colId); + + pSchema++; } - for (int32_t i = 0; i < rsp->vgNum; ++i) { - rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId); - rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin); - rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd); + if (rsp->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { + tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchema[0].colId); + return TSDB_CODE_TSC_INVALID_VALUE; + } - for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) { - rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port); - } - - if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) { - tscError("hash push failed, errno:%d", errno); - taosHashCleanup(vgInfo.vgHash); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - } - - code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo); - if (code) { - taosHashCleanup(vgInfo.vgHash); - } -*/ - } - - if (code) { - return code; + catalogUpdateSTableMeta(pCatalog, rsp); } msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 006abcd8e2..33e51da28a 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1165,6 +1165,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { strcpy(pMeta->dbFName, pStb->db); strcpy(pMeta->tbName, pInfo->tbName); strcpy(pMeta->stbName, pInfo->tbName); + pMeta->dbId = htobe64(pDb->uid); pMeta->numOfTags = htonl(pStb->numOfTags); pMeta->numOfColumns = htonl(pStb->numOfColumns); pMeta->precision = pDb->cfg.precision; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 9c041d76c7..8f24374387 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -48,18 +48,22 @@ enum { }; typedef struct SCtgDebug { - int32_t lockDebug; + bool lockDebug; + bool cacheDebug; + uint32_t showCachePeriodSec; } SCtgDebug; typedef struct SCtgTbMetaCache { SRWLatch stbLock; - SHashObj *cache; //key:tbname, value:STableMeta + SRWLatch metaLock; // RC between cache destroy and all other operations + SHashObj *metaCache; //key:tbname, value:STableMeta SHashObj *stbCache; //key:suid, value:STableMeta* } SCtgTbMetaCache; typedef struct SCtgDBCache { SRWLatch vgLock; + uint64_t dbId; int8_t deleted; SDBVgroupInfo *vgInfo; SCtgTbMetaCache tbCache; @@ -81,6 +85,7 @@ typedef struct SCtgRentMgmt { typedef struct SCatalog { uint64_t clusterId; + SRWLatch dbLock; SHashObj *dbCache; //key:dbname, value:SCtgDBCache SCtgRentMgmt dbRent; SCtgRentMgmt stbRent; @@ -105,6 +110,8 @@ typedef struct SCatalogStat { } SCatalogStat; typedef struct SCatalogMgmt { + bool exit; + SRWLatch lock; SHashObj *pCluster; //key: clusterId, value: SCatalog* SCatalogStat stat; SCatalogCfg cfg; @@ -132,11 +139,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__) -#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) -#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) - #define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) +#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0) #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 @@ -172,6 +176,15 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); } \ } while (0) + +#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) + +#define CTG_API_ENTER() do { CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) +#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(__code); } while (0) + + #ifdef __cplusplus } diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 02773fe533..6ecff87a89 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -22,6 +22,164 @@ SCatalogMgmt ctgMgmt = {0}; SCtgDebug gCTGDebug = {0}; +int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) { + return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0; +} + +int32_t ctgDbgGetStbNum(SCtgDBCache *dbCache) { + return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0; +} + +int32_t ctgDbgGetRentNum(SCtgRentMgmt *rent) { + int32_t num = 0; + for (uint16_t i = 0; i < rent->slotNum; ++i) { + SCtgRentSlot *slot = &rent->slots[i]; + if (NULL == slot->meta) { + continue; + } + + num += taosArrayGetSize(slot->meta); + } + + return num; +} + +int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type) { + if (NULL == pCatalog || NULL == pCatalog->dbCache) { + return 0; + } + + switch (type) { + case CTG_DBG_DB_NUM: + return (int32_t)taosHashGetSize(pCatalog->dbCache); + case CTG_DBG_DB_RENT_NUM: + return ctgDbgGetRentNum(&pCatalog->dbRent); + case CTG_DBG_STB_RENT_NUM: + return ctgDbgGetRentNum(&pCatalog->stbRent); + default: + break; + } + + SCtgDBCache *dbCache = NULL; + int32_t num = 0; + void *pIter = taosHashIterate(pCatalog->dbCache, NULL); + while (pIter) { + dbCache = (SCtgDBCache *)pIter; + switch (type) { + case CTG_DBG_META_NUM: + num += ctgDbgGetTbMetaNum(dbCache); + break; + case CTG_DBG_STB_NUM: + num += ctgDbgGetStbNum(dbCache); + break; + default: + ctgError("invalid type:%d", type); + break; + } + pIter = taosHashIterate(pCatalog->dbCache, pIter); + } + + return num; +} + + +void ctgDbgShowDBCache(SHashObj *dbHash) { + if (NULL == dbHash) { + return; + } + + int32_t i = 0; + SCtgDBCache *dbCache = NULL; + void *pIter = taosHashIterate(dbHash, NULL); + while (pIter) { + char *dbFName = NULL; + size_t len = 0; + + dbCache = (SCtgDBCache *)pIter; + + taosHashGetKey(dbCache, (void **)&dbFName, &len); + + CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId); + + pIter = taosHashIterate(dbHash, pIter); + } +} + + + + +void ctgDbgShowClusterCache(struct SCatalog* pCatalog) { + if (NULL == pCatalog) { + return; + } + + CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCatalog->clusterId, pCatalog); + CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_META_NUM), + ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_RENT_NUM)); + + ctgDbgShowDBCache(pCatalog->dbCache); +} + +int32_t ctgInitDBCache(struct SCatalog* pCatalog) { + if (NULL == pCatalog->dbCache) { + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + if (NULL == cache) { + ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) { + taosHashCleanup(cache); + } + } + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgInitTbMetaCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) { + if (NULL == dbCache->tbCache.metaCache) { + if (dbCache->deleted) { + ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); + } + + SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == metaCache) { + ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.metaCache, NULL, metaCache)) { + taosHashCleanup(metaCache); + } + } + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgInitStbCache(struct SCatalog* pCatalog, SCtgDBCache *dbCache) { + if (NULL == dbCache->tbCache.stbCache) { + if (dbCache->deleted) { + ctgInfo("db is dropping, dbId:%"PRIx64, dbCache->dbId); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); + } + + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); + if (NULL == cache) { + ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) { + taosHashCleanup(cache); + } + } + + return TSDB_CODE_SUCCESS; +} + + void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { if (NULL == mgmt->slots) { @@ -40,18 +198,20 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) { } -void ctgFreeTableMetaCache(SCtgTbMetaCache *table) { - CTG_LOCK(CTG_WRITE, &table->stbLock); - if (table->stbCache) { - taosHashCleanup(table->stbCache); - table->stbCache = NULL; +void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) { + CTG_LOCK(CTG_WRITE, &cache->stbLock); + if (cache->stbCache) { + taosHashCleanup(cache->stbCache); + cache->stbCache = NULL; } - CTG_UNLOCK(CTG_WRITE, &table->stbLock); + CTG_UNLOCK(CTG_WRITE, &cache->stbLock); - if (table->cache) { - taosHashCleanup(table->cache); - table->cache = NULL; + CTG_LOCK(CTG_WRITE, &cache->metaLock); + if (cache->metaCache) { + taosHashCleanup(cache->metaCache); + cache->metaCache = NULL; } + CTG_UNLOCK(CTG_WRITE, &cache->metaLock); } void ctgFreeDbCache(SCtgDBCache *dbCache) { @@ -61,9 +221,8 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) { atomic_store_8(&dbCache->deleted, 1); - SDBVgroupInfo *dbInfo = NULL; + CTG_LOCK(CTG_WRITE, &dbCache->vgLock); if (dbCache->vgInfo) { - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); if (dbCache->vgInfo->vgHash) { taosHashCleanup(dbCache->vgInfo->vgHash); @@ -71,8 +230,8 @@ void ctgFreeDbCache(SCtgDBCache *dbCache) { } tfree(dbCache->vgInfo); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); } + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); ctgFreeTableMetaCache(&dbCache->tbCache); } @@ -97,22 +256,21 @@ void ctgFreeHandle(struct SCatalog* pCatalog) { free(pCatalog); } - -int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, SCtgDBCache **dbCache, bool *inCache) { +int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbFName, SCtgDBCache **dbCache, bool *inCache) { if (NULL == pCatalog->dbCache) { *inCache = false; - ctgWarn("empty db cache, dbName:%s", dbName); + ctgWarn("empty db cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } SCtgDBCache *cache = NULL; while (true) { - cache = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); + cache = taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); if (NULL == cache) { *inCache = false; - ctgWarn("not in db vgroup cache, dbName:%s", dbName); + ctgWarn("not in db vgroup cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } @@ -120,7 +278,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S if (NULL == cache->vgInfo) { CTG_UNLOCK(CTG_READ, &cache->vgLock); taosHashRelease(pCatalog->dbCache, cache); - ctgWarn("db cache vgInfo is NULL, dbName:%s", dbName); + ctgWarn("db cache vgInfo is NULL, dbFName:%s", dbFName); continue; } @@ -131,7 +289,7 @@ int32_t ctgGetDBVgroupFromCache(struct SCatalog* pCatalog, const char *dbName, S *dbCache = cache; *inCache = true; - ctgDebug("Got db vgroup from cache, dbName:%s", dbName); + ctgDebug("Got db vgroup from cache, dbFName:%s", dbFName); return TSDB_CODE_SUCCESS; } @@ -189,7 +347,10 @@ int32_t ctgIsTableMetaExistInCache(struct SCatalog* pCatalog, char *dbFName, cha size_t sz = 0; - STableMeta *tbMeta = taosHashGet(dbCache->tbCache.cache, tbName, strlen(tbName)); + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + STableMeta *tbMeta = taosHashGet(dbCache->tbCache.metaCache, tbName, strlen(tbName)); + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (NULL == tbMeta) { taosHashRelease(pCatalog->dbCache, dbCache); @@ -227,15 +388,18 @@ int32_t ctgGetTableMetaFromCache(struct SCatalog* pCatalog, const SName* pTableN return TSDB_CODE_SUCCESS; } - if (NULL == dbCache->tbCache.cache) { + if (NULL == dbCache->tbCache.metaCache) { *exist = 0; taosHashRelease(pCatalog->dbCache, dbCache); ctgWarn("empty tbmeta cache, dbFName:%s, tbName:%s", db, pTableName->tname); return TSDB_CODE_SUCCESS; } - size_t sz = 0; - STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.cache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz); + size_t sz = 0; + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + STableMeta *tbMeta = taosHashGetCloneExt(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname), NULL, (void **)pTableMeta, &sz); + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (NULL == *pTableMeta) { *exist = 0; taosHashRelease(pCatalog->dbCache, dbCache); @@ -308,7 +472,10 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN return TSDB_CODE_SUCCESS; } - STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.cache, pTableName->tname, strlen(pTableName->tname)); + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + STableMeta *pTableMeta = (STableMeta *)taosHashAcquire(dbCache->tbCache.metaCache, pTableName->tname, strlen(pTableName->tname)); + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (NULL == pTableMeta) { ctgWarn("tbmeta not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname); taosHashRelease(pCatalog->dbCache, dbCache); @@ -318,7 +485,7 @@ int32_t ctgGetTableTypeFromCache(struct SCatalog* pCatalog, const SName* pTableN *tbType = atomic_load_8(&pTableMeta->tableType); - taosHashRelease(dbCache->tbCache.cache, dbCache); + taosHashRelease(dbCache->tbCache.metaCache, dbCache); taosHashRelease(pCatalog->dbCache, dbCache); ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType); @@ -526,6 +693,7 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn CTG_RET(code); } +#if 1 int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { return -1; @@ -545,7 +713,29 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { return 0; } } +#else +int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { + if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { + return -1; + } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { + return 1; + } else { + return 0; + } +} + +int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { + if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + return -1; + } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + return 1; + } else { + return 0; + } +} + +#endif int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slotRIdx = 0; @@ -609,14 +799,15 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si } if (slot->needSort) { + qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); taosArraySort(slot->meta, compare); slot->needSort = false; - qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); + qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); } void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); if (NULL == orig) { - qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -741,6 +932,221 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si return TSDB_CODE_SUCCESS; } +int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCache *dbCache) { + int32_t code = 0; + + code = taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache)); + if (code) { + if (HASH_NODE_EXIST(code)) { + ctgDebug("db already in cache, dbFName:%s", dbFName); + return TSDB_CODE_SUCCESS; + } + + ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + SDbVgVersion vgVersion = {.dbId = dbCache->dbId, .vgVersion = dbCache->vgInfo ? dbCache->vgInfo->vgVersion : -1}; + strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); + + ctgDebug("dbCache added, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbCache->dbId); + + CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion))); + + return TSDB_CODE_SUCCESS; + +_return: + + ctgFreeDbCache(dbCache); + + CTG_RET(code); +} + + +void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache) { + CTG_LOCK(CTG_WRITE, &cache->stbLock); + if (cache->stbCache) { + void *pIter = taosHashIterate(cache->stbCache, NULL); + while (pIter) { + uint64_t *suid = NULL; + taosHashGetKey(pIter, (void **)&suid, NULL); + + if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCatalog->stbRent, *suid, ctgSTableVersionCompare)) { + ctgDebug("stb removed from rent, suid:%"PRIx64, *suid); + } + + pIter = taosHashIterate(cache->stbCache, pIter); + } + } + CTG_UNLOCK(CTG_WRITE, &cache->stbLock); + + ctgFreeTableMetaCache(cache); +} + + +int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) { + if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) { + ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); + } + + atomic_store_8(&dbCache->deleted, 1); + + CTG_LOCK(CTG_WRITE, &dbCache->vgLock); + if (dbCache->vgInfo) { + ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); + + if (dbCache->vgInfo->vgHash) { + taosHashCleanup(dbCache->vgInfo->vgHash); + } + + tfree(dbCache->vgInfo); + } + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); + + ctgRemoveAndFreeTableMeta(pCatalog, &dbCache->tbCache); + + ctgInfo("db removed from cache, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId); + + CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbCache->dbId, ctgDbVgVersionCompare)); + + ctgDebug("db removed from rent, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId); + + return TSDB_CODE_SUCCESS; +} + + +int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { + int32_t code = 0; + SCtgDBCache *dbCache = NULL; + + CTG_LOCK(CTG_WRITE, &pCatalog->dbLock); + + while (true) { + dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); + if (dbCache) { + // TODO OPEN IT +#if 0 + if (dbCache->dbId == dbId) { + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } +#else + if (0 == dbId) { + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } + + if (dbId && (dbCache->dbId == 0)) { + dbCache->dbId = dbId; + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } + + if (dbCache->dbId == dbId) { + *pCache = dbCache; + return TSDB_CODE_SUCCESS; + } +#endif + code = ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName); + taosHashRelease(pCatalog->dbCache, dbCache); + dbCache = NULL; + if (code) { + if (TSDB_CODE_CTG_DB_DROPPED == code) { + continue; + } + + CTG_ERR_JRET(code); + } + } + + SCtgDBCache newDBCache = {0}; + newDBCache.dbId = dbId; + + CTG_ERR_JRET(ctgAddDBCache(pCatalog, dbFName, &newDBCache)); + } + +_return: + + if (dbCache) { + taosHashRelease(pCatalog->dbCache, dbCache); + } + + CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); + + CTG_RET(code); +} + + + +int32_t ctgUpdateTbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) { + CTG_LOCK(CTG_READ, &tbCache->metaLock); + if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + ctgError("taosHashPut tbmeta to cache failed, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + + ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType); + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgUpdateStbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) { + bool newAdded = false; + int32_t code = 0; + SSTableMetaVersion metaRent = {.suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion}; + strcpy(metaRent.dbFName, dbFName); + strcpy(metaRent.stbName, tbName); + + CTG_LOCK(CTG_WRITE, &tbCache->stbLock); + + CTG_LOCK(CTG_READ, &tbCache->metaLock); + STableMeta *orig = taosHashAcquire(tbCache->metaCache, tbName, strlen(tbName)); + if (orig) { + if (orig->suid != meta->suid) { + if (taosHashRemove(tbCache->stbCache, &orig->suid, sizeof(orig->suid))) { + ctgError("stb not exist in stbCache, db:%s, stb:%s, suid:%"PRIx64, dbFName, tbName, orig->suid); + } + + ctgMetaRentRemove(&pCatalog->stbRent, orig->suid, ctgSTableVersionCompare); + } + + taosHashRelease(tbCache->metaCache, orig); + } + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + + CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, tbCache, dbFName, tbName, meta, metaSize)); + + CTG_LOCK(CTG_READ, &tbCache->metaLock); + STableMeta *tbMeta = taosHashGet(tbCache->metaCache, tbName, strlen(tbName)); + if (taosHashPutExt(tbCache->stbCache, &meta->suid, sizeof(meta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) { + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, meta->suid); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + CTG_UNLOCK(CTG_READ, &tbCache->metaLock); + + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + + ctgDebug("update stable to cache, suid:%"PRIx64, meta->suid); + + if (newAdded) { + CTG_ERR_RET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion))); + } else { + CTG_ERR_RET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare)); + } + + return TSDB_CODE_SUCCESS; + +_return: + + CTG_UNLOCK(CTG_WRITE, &tbCache->stbLock); + + CTG_RET(code); +} int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output) { @@ -752,63 +1158,15 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - if (NULL == pCatalog->dbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } + CTG_ERR_RET(ctgInitDBCache(pCatalog)); - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } + CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, output->dbFName, output->dbId, &dbCache)); - while (true) { - dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, output->dbFName, strlen(output->dbFName)); - if (dbCache) { - break; - } - - SCtgDBCache newDbCache = {0}; - - if (taosHashPut(pCatalog->dbCache, output->dbFName, strlen(output->dbFName), &newDbCache, sizeof(newDbCache))) { - ctgError("taosHashPut db to cache failed, db:%s", output->dbFName); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } - } - - if (NULL == dbCache->tbCache.cache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.cache, NULL, cache)) { - taosHashCleanup(cache); - } - } - - if (NULL == dbCache->tbCache.stbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&dbCache->tbCache.stbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } + CTG_ERR_JRET(ctgInitTbMetaCache(pCatalog, dbCache)); + CTG_ERR_JRET(ctgInitStbCache(pCatalog, dbCache)); if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) { - if (taosHashPut(dbCache->tbCache.cache, output->ctbName, strlen(output->ctbName), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) { - ctgError("taosHashPut ctbmeta to cache failed, ctbName:%s", output->ctbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - ctgDebug("ctbmeta updated to cache, ctbName:%s", output->ctbName); + CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta))); } if (CTG_IS_META_CTABLE(output->metaType)) { @@ -823,75 +1181,45 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags); if (TSDB_SUPER_TABLE == output->tbMeta->tableType) { - bool newAdded = false; - SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion}; - strcpy(metaRent.dbFName, output->dbFName); - strcpy(metaRent.stbName, output->tbName); - - CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) { - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - ctgError("taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - STableMeta *tbMeta = taosHashGet(dbCache->tbCache.cache, output->tbName, strlen(output->tbName)); - if (taosHashPutExt(dbCache->tbCache.stbCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &tbMeta, POINTER_BYTES, &newAdded) != 0) { - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - ctgError("taosHashPutExt stable to stable cache failed, suid:%"PRIx64, output->tbMeta->suid); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); - - ctgDebug("update stable to cache, suid:%"PRIx64, output->tbMeta->suid); - - if (newAdded) { - CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion))); - } else { - CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->stbRent, &metaRent, metaRent.suid, sizeof(SSTableMetaVersion), ctgSTableVersionCompare)); - } + CTG_ERR_JRET(ctgUpdateStbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize)); } else { - if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) { - ctgError("taosHashPut tablemeta to cache failed, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } + CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->tbName, output->tbMeta, tbSize)); } - ctgDebug("update tablemeta to cache, dbFName:%s, tbName:%s", output->dbFName, output->tbName); - _return: if (dbCache) { - taosHashRelease(pCatalog->dbCache, dbCache); + taosHashRelease(pCatalog->dbCache, dbCache); + CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); } CTG_RET(code); } -int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SCtgDBCache** dbCache) { +int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache) { bool inCache = false; if (!forceUpdate) { - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbCache, &inCache)); + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache)); if (inCache) { return TSDB_CODE_SUCCESS; } - ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbName, forceUpdate); + ctgDebug("failed to get DB vgroupInfo from cache, dbName:%s, load it from mnode, update:%d", dbFName, forceUpdate); } SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; - tstrncpy(input.db, dbName, tListLen(input.db)); + tstrncpy(input.db, dbFName, tListLen(input.db)); input.vgVersion = CTG_DEFAULT_INVALID_VERSION; while (true) { CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, DbOut.dbVgroup)); - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbCache, &inCache)); + CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbFName, DbOut.dbId, DbOut.dbVgroup)); + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache)); if (!inCache) { - ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbName); + ctgWarn("can't get db vgroup from cache, will retry, db:%s", dbFName); continue; } @@ -901,58 +1229,6 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm return TSDB_CODE_SUCCESS; } - -int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId, bool *removed) { - *removed = false; - - SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); - if (NULL == dbCache) { - ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName); - return TSDB_CODE_SUCCESS; - } - - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); - - if (NULL == dbCache->vgInfo) { - ctgInfo("db vgInfo not in dbCache, may be removed, db:%s, dbId:%"PRIx64, dbName, dbId); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - return TSDB_CODE_SUCCESS; - } - - if (dbCache->vgInfo->dbId != dbId) { - ctgInfo("db id already updated, db:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbName, dbCache->vgInfo->dbId, dbId); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - return TSDB_CODE_SUCCESS; - } - - if (dbCache->vgInfo->vgHash) { - ctgInfo("cleanup db vgInfo, db:%s, dbId:%"PRIx64, dbName, dbId); - taosHashCleanup(dbCache->vgInfo->vgHash); - tfree(dbCache->vgInfo); - } - - if (taosHashRemove(pCatalog->dbCache, dbName, strlen(dbName))) { - ctgError("taosHashRemove from dbCache failed, db:%s", dbName); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - dbCache->deleted = true; - - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - - ctgFreeTableMetaCache(&dbCache->tbCache); - - taosHashRelease(pCatalog->dbCache, dbCache); - - *removed = true; - - return TSDB_CODE_SUCCESS; -} - int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) { *removed = false; @@ -970,12 +1246,16 @@ int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbNam return TSDB_CODE_SUCCESS; } - if (taosHashRemove(dbCache->tbCache.cache, stbName, strlen(stbName))) { + CTG_LOCK(CTG_READ, &dbCache->tbCache.metaLock); + if (taosHashRemove(dbCache->tbCache.metaCache, stbName, strlen(stbName))) { + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); taosHashRelease(pCatalog->dbCache, dbCache); ctgError("stb not exist in cache, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); - } + } + CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); + CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock); taosHashRelease(pCatalog->dbCache, dbCache); @@ -1104,6 +1384,8 @@ int32_t catalogInit(SCatalogCfg *cfg) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + atomic_store_8(&ctgMgmt.exit, false); + if (cfg) { memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg)); @@ -1222,17 +1504,19 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + if (NULL == pCatalog->dbCache) { *version = CTG_DEFAULT_INVALID_VERSION; ctgInfo("empty db cache, dbName:%s", dbName); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } SCtgDBCache *db = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); if (NULL == db) { *version = CTG_DEFAULT_INVALID_VERSION; ctgInfo("db not in cache, dbName:%s", dbName); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } CTG_LOCK(CTG_READ, &db->vgLock); @@ -1242,7 +1526,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, *version = CTG_DEFAULT_INVALID_VERSION; ctgInfo("db not in cache, dbName:%s", dbName); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } *version = db->vgInfo->vgVersion; @@ -1252,20 +1536,22 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, bool forceUpdate, SArray** vgroupList) { - if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { +int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) { + if (NULL == pCatalog || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + SCtgDBCache* dbCache = NULL; SVgroupInfo *vgInfo = NULL; int32_t code = 0; SArray *vgList = NULL; - CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbName, forceUpdate, &dbCache)); + CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, dbFName, forceUpdate, &dbCache)); int32_t vgNum = (int32_t)taosHashGetSize(dbCache->vgInfo->vgHash); vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo)); @@ -1303,130 +1589,119 @@ _return: vgList = NULL; } - CTG_RET(code); + CTG_API_LEAVE(code); } -int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId, SDBVgroupInfo* dbInfo) { int32_t code = 0; + + CTG_API_ENTER(); - if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { + if (NULL == pCatalog || NULL == dbFName || NULL == dbInfo) { CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { - ctgError("invalid db vgInfo, dbName:%s, vgHash:%p, vgVersion:%d", dbName, dbInfo->vgHash, dbInfo->vgVersion); + ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } - if (NULL == pCatalog->dbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == cache) { - ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL != atomic_val_compare_exchange_ptr(&pCatalog->dbCache, NULL, cache)) { - taosHashCleanup(cache); - } - } + CTG_ERR_JRET(ctgInitDBCache(pCatalog)); bool newAdded = false; - SDbVgVersion vgVersion = {.dbId = dbInfo->dbId, .vgVersion = dbInfo->vgVersion}; - - SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); - if (dbCache) { - CTG_LOCK(CTG_WRITE, &dbCache->vgLock); - - if (NULL == dbCache->vgInfo) { - newAdded = true; - - dbCache->vgInfo = dbInfo; - } else { - if (dbCache->vgInfo->dbId != dbInfo->dbId) { - ctgMetaRentRemove(&pCatalog->dbRent, dbCache->vgInfo->dbId, ctgDbVgVersionCompare); - newAdded = true; - } else if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { - ctgInfo("db vgVersion is old, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); - CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); - taosHashRelease(pCatalog->dbCache, dbCache); - - goto _return; - } - - if (dbCache->vgInfo->vgHash) { - ctgInfo("cleanup db vgHash, db:%s", dbName); - taosHashCleanup(dbCache->vgInfo->vgHash); - dbCache->vgInfo->vgHash = NULL; - } - - tfree(dbCache->vgInfo); - dbCache->vgInfo = dbInfo; - } + SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion}; + SCtgDBCache *dbCache = NULL; + CTG_ERR_JRET(ctgAcquireDBCache(pCatalog, dbFName, dbId, &dbCache)); + + CTG_LOCK(CTG_WRITE, &dbCache->vgLock); + if (dbCache->deleted) { + ctgInfo("db is dropping, dbFName:%s, dbId:%"PRIx64, dbFName, dbId); CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); taosHashRelease(pCatalog->dbCache, dbCache); + CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED); + } + + if (NULL == dbCache->vgInfo) { + dbCache->vgInfo = dbInfo; } else { - SCtgDBCache newDBCache = {0}; - newDBCache.vgInfo = dbInfo; + if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { + ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, current:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); + taosHashRelease(pCatalog->dbCache, dbCache); + + goto _return; + } - if (taosHashPut(pCatalog->dbCache, dbName, strlen(dbName), &newDBCache, sizeof(newDBCache)) != 0) { - ctgError("taosHashPut db & db vgroup to cache failed, db:%s", dbName); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + if (dbCache->vgInfo->vgHash) { + ctgDebug("cleanup db vgHash, dbFName:%s", dbFName); + taosHashCleanup(dbCache->vgInfo->vgHash); + dbCache->vgInfo->vgHash = NULL; } - newAdded = true; + tfree(dbCache->vgInfo); + dbCache->vgInfo = dbInfo; } dbInfo = NULL; - strncpy(vgVersion.dbFName, dbName, sizeof(vgVersion.dbFName)); - - if (newAdded) { - CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion))); - } else { - CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); - } - - ctgDebug("dbName:%s vgroup updated, vgVersion:%d", dbName, vgVersion.vgVersion); + CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); + taosHashRelease(pCatalog->dbCache, dbCache); + strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); + CTG_ERR_JRET(ctgMetaRentUpdate(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion), ctgDbVgVersionCompare)); + + ctgDebug("dbCache updated, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, vgVersion.dbId); _return: + if (dbCache) { + CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); + } + if (dbInfo) { taosHashCleanup(dbInfo->vgHash); dbInfo->vgHash = NULL; tfree(dbInfo); } - CTG_RET(code); + CTG_API_LEAVE(code); } -int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId) { +int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId) { int32_t code = 0; - bool removed = false; - if (NULL == pCatalog || NULL == dbName) { + if (NULL == pCatalog || NULL == dbFName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - if (NULL == pCatalog->dbCache) { - return TSDB_CODE_SUCCESS; - } - - CTG_ERR_RET(ctgValidateAndRemoveDb(pCatalog, dbName, dbId, &removed)); - if (!removed) { - return TSDB_CODE_SUCCESS; - } - - ctgInfo("db removed from cache, db:%s, uid:%"PRIx64, dbName, dbId); + CTG_API_ENTER(); - CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbId, ctgDbVgVersionCompare)); + if (NULL == pCatalog->dbCache) { + CTG_API_LEAVE(TSDB_CODE_SUCCESS); + } + + SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); + if (NULL == dbCache) { + ctgInfo("db not exist in dbCache, may be removed, dbFName:%s", dbFName); + CTG_API_LEAVE(TSDB_CODE_SUCCESS); + } + + if (dbCache->dbId != dbId) { + ctgInfo("db id already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbFName, dbCache->dbId, dbId); + taosHashRelease(pCatalog->dbCache, dbCache); + CTG_API_LEAVE(TSDB_CODE_SUCCESS); + } - ctgDebug("db removed from rent, db:%s, uid:%"PRIx64, dbName, dbId); + CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName)); + +_return: - CTG_RET(code); + taosHashRelease(pCatalog->dbCache, dbCache); + + CTG_API_LEAVE(code); } int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) { @@ -1437,49 +1712,86 @@ int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, c CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + if (NULL == pCatalog->dbCache) { - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed)); if (!removed) { - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); - CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare)); + CTG_ERR_JRET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare)); ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); + +_return: - CTG_RET(code); + CTG_API_LEAVE(code); } int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1)); } int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1)); } +int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg) { + STableMetaOutput output = {0}; + int32_t code = 0; + + CTG_API_ENTER(); + + strcpy(output.dbFName, rspMsg->dbFName); + strcpy(output.tbName, rspMsg->tbName); + + SET_META_TYPE_TABLE(output.metaType); + + CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta)); + + CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); + +_return: + + tfree(output.tbMeta); + + CTG_API_LEAVE(code); +} + + int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) { if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable)); } int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { - return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable)); } int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + + CTG_API_ENTER(); STableMeta *tbMeta = NULL; int32_t code = 0; @@ -1495,7 +1807,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S tNameGetFullDbName(pTableName, db); CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbCache)); - // REMOEV THIS .... + // TODO REMOEV THIS .... if (0 == tbMeta->vgId) { SVgroupInfo vgroup = {0}; @@ -1503,7 +1815,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S tbMeta->vgId = vgroup.vgId; } - // REMOVE THIS .... + // TODO REMOVE THIS .... if (tbMeta->tableType == TSDB_SUPER_TABLE) { CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbCache->vgInfo, pVgroupList)); @@ -1542,7 +1854,7 @@ _return: vgList = NULL; } - CTG_RET(code); + CTG_API_LEAVE(code); } @@ -1550,10 +1862,12 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, SCtgDBCache* dbCache = NULL; int32_t code = 0; + CTG_API_ENTER(); + char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); - CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache)); + CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbCache->vgInfo, pTableName, pVgroup)); @@ -1564,7 +1878,7 @@ _return: taosHashRelease(pCatalog->dbCache, dbCache); } - CTG_RET(code); + CTG_API_LEAVE(code); } @@ -1573,19 +1887,22 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + int32_t code = 0; + pRsp->pTableMeta = NULL; if (pReq->pTableName) { int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName); if (tbNum <= 0) { ctgError("empty table name list, tbNum:%d", tbNum); - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); if (NULL == pRsp->pTableMeta) { ctgError("taosArrayInit %d failed", tbNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } for (int32_t i = 0; i < tbNum; ++i) { @@ -1602,7 +1919,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S } } - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); _return: @@ -1617,7 +1934,7 @@ _return: pRsp->pTableMeta = NULL; } - CTG_RET(code); + CTG_API_LEAVE(code); } int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { @@ -1625,9 +1942,11 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + //TODO - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) { @@ -1635,7 +1954,9 @@ int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion * CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_RET(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion))); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion))); } int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) { @@ -1643,15 +1964,21 @@ int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_RET(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); } void catalogDestroy(void) { - if (NULL == ctgMgmt.pCluster) { + if (NULL == ctgMgmt.pCluster || atomic_load_8(&ctgMgmt.exit)) { return; } + atomic_store_8(&ctgMgmt.exit, true); + + CTG_LOCK(CTG_WRITE, &ctgMgmt.lock); + SCatalog *pCatalog = NULL; void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL); while (pIter) { @@ -1667,6 +1994,8 @@ void catalogDestroy(void) { taosHashCleanup(ctgMgmt.pCluster); ctgMgmt.pCluster = NULL; + CTG_UNLOCK(CTG_WRITE, &ctgMgmt.lock); + qInfo("catalog destroyed"); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 751fa72347..d0f98e3c2a 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -39,6 +39,7 @@ namespace { extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta, int32_t *exist); extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output); +extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type); void ctgTestSetPrepareTableMeta(); void ctgTestSetPrepareCTableMeta(); @@ -49,7 +50,7 @@ bool ctgTestStop = false; bool ctgTestEnableSleep = false; bool ctgTestDeadLoop = false; int32_t ctgTestPrintNum = 200000; -int32_t ctgTestMTRunSec = 30; +int32_t ctgTestMTRunSec = 5; int32_t ctgTestCurrentVgVersion = 0; int32_t ctgTestVgVersion = 1; @@ -107,6 +108,7 @@ void ctgTestInitLogFile() { const int32_t maxLogFileNum = 10; tsAsyncLog = 0; + qDebugFlag = 159; char temp[128] = {0}; sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix); @@ -185,7 +187,6 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) { ctgTestCurrentVgVersion = dbVgroup->vgVersion; dbVgroup->hashMethod = 0; - dbVgroup->dbId = ctgTestDbId; dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion); @@ -209,6 +210,45 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) { *pdbVgroup = dbVgroup; } + +void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) { + strcpy(rspMsg->dbFName, ctgTestDbname); + sprintf(rspMsg->tbName, "%s", ctgTestSTablename); + sprintf(rspMsg->stbName, "%s", ctgTestSTablename); + rspMsg->numOfTags = ctgTestTagNum; + rspMsg->numOfColumns = ctgTestColNum; + rspMsg->precision = 1 + 1; + rspMsg->tableType = TSDB_SUPER_TABLE; + rspMsg->update = 1 + 1; + rspMsg->sversion = ctgTestSVersion + 1; + rspMsg->tversion = ctgTestTVersion + 1; + rspMsg->suid = ctgTestSuid + 1; + rspMsg->tuid = ctgTestSuid + 1; + rspMsg->vgId = 1; + + SSchema *s = NULL; + s = &rspMsg->pSchema[0]; + s->type = TSDB_DATA_TYPE_TIMESTAMP; + s->colId = 1; + s->bytes = 8; + strcpy(s->name, "ts"); + + s = &rspMsg->pSchema[1]; + s->type = TSDB_DATA_TYPE_INT; + s->colId = 2; + s->bytes = 4; + strcpy(s->name, "col1s"); + + s = &rspMsg->pSchema[2]; + s->type = TSDB_DATA_TYPE_BINARY; + s->colId = 3; + s->bytes = 12 + 1; + strcpy(s->name, "tag1s"); + + return; +} + + void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SUseDbRsp *rspMsg = NULL; // todo @@ -592,7 +632,7 @@ void *ctgTestGetDbVgroupThread(void *param) { return NULL; } -void *ctgTestSetDbVgroupThread(void *param) { +void *ctgTestSetSameDbVgroupThread(void *param) { struct SCatalog *pCtg = (struct SCatalog *)param; int32_t code = 0; SDBVgroupInfo *dbVgroup = NULL; @@ -600,7 +640,7 @@ void *ctgTestSetDbVgroupThread(void *param) { while (!ctgTestStop) { ctgTestBuildDBVgroup(&dbVgroup); - code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); if (code) { assert(0); } @@ -616,6 +656,32 @@ void *ctgTestSetDbVgroupThread(void *param) { return NULL; } + +void *ctgTestSetDiffDbVgroupThread(void *param) { + struct SCatalog *pCtg = (struct SCatalog *)param; + int32_t code = 0; + SDBVgroupInfo *dbVgroup = NULL; + int32_t n = 0; + + while (!ctgTestStop) { + ctgTestBuildDBVgroup(&dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup); + if (code) { + assert(0); + } + + if (ctgTestEnableSleep) { + usleep(rand() % 5); + } + if (++n % ctgTestPrintNum == 0) { + printf("Set:%d\n", n); + } + } + + return NULL; +} + + void *ctgTestGetCtableMetaThread(void *param) { struct SCatalog *pCtg = (struct SCatalog *)param; int32_t code = 0; @@ -681,6 +747,8 @@ TEST(tableMeta, normalTable) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; + ctgTestInitLogFile(); + ctgTestSetPrepareDbVgroups(); initQueryModuleMsgHandle(); @@ -771,6 +839,8 @@ TEST(tableMeta, childTableCase) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; + ctgTestInitLogFile(); + ctgTestSetPrepareDbVgroupsAndChildMeta(); initQueryModuleMsgHandle(); @@ -964,6 +1034,124 @@ TEST(tableMeta, superTableCase) { catalogDestroy(); } +TEST(tableMeta, rmStbMeta) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + ctgTestInitLogFile(); + + ctgTestSetPrepareDbVgroupsAndSuperMeta(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestSTablename); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid); + ASSERT_EQ(tableMeta->suid, ctgTestSuid); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + code = catalogRemoveSTableMeta(pCtg, "1.db1", ctgTestSTablename, ctgTestSuid); + ASSERT_EQ(code, 0); + + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 0); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 0); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0); + + catalogDestroy(); +} + +TEST(tableMeta, updateStbMeta) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + + ctgTestInitLogFile(); + + ctgTestSetPrepareDbVgroupsAndSuperMeta(); + + initQueryModuleMsgHandle(); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestSTablename); + + STableMeta *tableMeta = NULL; + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion); + ASSERT_EQ(tableMeta->uid, ctgTestSuid); + ASSERT_EQ(tableMeta->suid, ctgTestSuid); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tfree(tableMeta); + + STableMetaRsp rsp = {0}; + ctgTestBuildSTableMetaRsp(&rsp); + + code = catalogUpdateSTableMeta(pCtg, &rsp); + ASSERT_EQ(code, 0); + + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 1); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 1); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1); + ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 1); + + code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta); + ASSERT_EQ(code, 0); + ASSERT_EQ(tableMeta->vgId, 0); + ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE); + ASSERT_EQ(tableMeta->sversion, ctgTestSVersion + 1); + ASSERT_EQ(tableMeta->tversion, ctgTestTVersion + 1); + ASSERT_EQ(tableMeta->uid, ctgTestSuid + 1); + ASSERT_EQ(tableMeta->suid, ctgTestSuid + 1); + ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum); + ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum); + ASSERT_EQ(tableMeta->tableInfo.precision, 1 + 1); + ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); + + tfree(tableMeta); + + catalogDestroy(); +} + + + TEST(tableDistVgroup, normalTable) { struct SCatalog *pCtg = NULL; void *mockPointer = (void *)0x1; @@ -1109,7 +1297,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { taosArrayDestroy(vgList); ctgTestBuildDBVgroup(&dbVgroup); - code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup); ASSERT_EQ(code, 0); code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); @@ -1128,7 +1316,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { catalogDestroy(); } -TEST(multiThread, getSetDbVgroupCase) { +TEST(multiThread, getSetRmSameDbVgroup) { struct SCatalog *pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; @@ -1159,10 +1347,10 @@ TEST(multiThread, getSetDbVgroupCase) { pthread_attr_init(&thattr); pthread_t thread1, thread2; - pthread_create(&(thread1), &thattr, ctgTestSetDbVgroupThread, pCtg); + pthread_create(&(thread1), &thattr, ctgTestSetSameDbVgroupThread, pCtg); sleep(1); - pthread_create(&(thread1), &thattr, ctgTestGetDbVgroupThread, pCtg); + pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg); while (true) { if (ctgTestDeadLoop) { @@ -1179,6 +1367,58 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } +TEST(multiThread, getSetRmDiffDbVgroup) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SVgroupInfo *pvgInfo = NULL; + SDBVgroupInfo dbVgroup = {0}; + SArray *vgList = NULL; + ctgTestStop = false; + + ctgTestInitLogFile(); + + ctgTestSetPrepareDbVgroups(); + + initQueryModuleMsgHandle(); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1, thread2; + pthread_create(&(thread1), &thattr, ctgTestSetDiffDbVgroupThread, pCtg); + + sleep(1); + pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg); + + while (true) { + if (ctgTestDeadLoop) { + sleep(1); + } else { + sleep(ctgTestMTRunSec); + break; + } + } + + ctgTestStop = true; + sleep(1); + + catalogDestroy(); +} + + TEST(multiThread, ctableMeta) { struct SCatalog *pCtg = NULL; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index e7b3d08bc5..3e14bfca09 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -119,9 +119,9 @@ int32_t queryProcessUseDBRsp(void* output, char *msg, int32_t msgSize) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } + pOut->dbId = pRsp->uid; pOut->dbVgroup->vgVersion = pRsp->vgVersion; pOut->dbVgroup->hashMethod = pRsp->hashMethod; - pOut->dbVgroup->dbId = pRsp->uid; pOut->dbVgroup->vgHash = taosHashInit(pRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == pOut->dbVgroup->vgHash) { qError("taosHashInit %d failed", pRsp->vgNum); @@ -159,6 +159,7 @@ _return: } static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) { + pMetaMsg->dbId = be64toh(pMetaMsg->dbId); pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags); pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns); pMetaMsg->sversion = ntohl(pMetaMsg->sversion); @@ -258,6 +259,8 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { } strcpy(pOut->dbFName, pMetaMsg->dbFName); + + pOut->dbId = pMetaMsg->dbId; if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { SET_META_TYPE_BOTH_TABLE(pOut->metaType); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index ca0401113c..4df66dd015 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -418,6 +418,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_INVALID_INPUT, "invalid catalog input TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service") //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")