diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c641fbb1a3..7f9710b03f 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -725,6 +725,7 @@ typedef struct { char tbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN]; + uint64_t dbId; int32_t numOfTags; int32_t numOfColumns; int8_t precision; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6237de36ff..db195c8c76 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -435,6 +435,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x2402) //catalog is not ready #define TSDB_CODE_CTG_MEM_ERROR TAOS_DEF_ERROR_CODE(0, 0x2403) //catalog memory error #define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) //catalog system error +#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405) //Database is dropped +#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) //catalog is out of service //scheduler #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) //scheduler status error @@ -450,4 +452,3 @@ int32_t* taosGetErrno(); #endif #endif /*_TD_COMMON_TAOS_ERROR_H_*/ - \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 4ccd4b63c4..51d5fd62d4 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -760,6 +760,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) { strcpy(pMeta->dbFName, pStb->db); strcpy(pMeta->tbName, pInfo->tbName); strcpy(pMeta->stbName, pInfo->tbName); + pMeta->dbId = htobe64(pDb->uid); pMeta->numOfTags = htonl(pStb->numOfTags); pMeta->numOfColumns = htonl(pStb->numOfColumns); pMeta->precision = pDb->cfg.precision; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 021a119d6c..8f24374387 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -85,6 +85,7 @@ typedef struct SCtgRentMgmt { typedef struct SCatalog { uint64_t clusterId; + SRWLatch dbLock; SHashObj *dbCache; //key:dbname, value:SCtgDBCache SCtgRentMgmt dbRent; SCtgRentMgmt stbRent; @@ -109,6 +110,8 @@ typedef struct SCatalogStat { } SCatalogStat; typedef struct SCatalogMgmt { + bool exit; + SRWLatch lock; SHashObj *pCluster; //key: clusterId, value: SCatalog* SCatalogStat stat; SCatalogCfg cfg; @@ -136,10 +139,6 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define ctgDebug(param, ...) qDebug("CTG:%p " param, pCatalog, __VA_ARGS__) #define ctgTrace(param, ...) qTrace("CTG:%p " param, pCatalog, __VA_ARGS__) -#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) -#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) -#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) - #define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0) #define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0) @@ -177,6 +176,13 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); } \ } while (0) + +#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) +#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) +#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0) + +#define CTG_API_ENTER() do { CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0) +#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(__code); } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index f0ea22197b..268322a86e 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -122,7 +122,7 @@ void ctgDbgShowClusterCache(struct SCatalog* pCatalog) { int32_t ctgInitDBCache(struct SCatalog* pCatalog) { if (NULL == pCatalog->dbCache) { - SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + SHashObj *cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (NULL == cache) { ctgError("taosHashInit %d failed", CTG_DEFAULT_CACHE_DB_NUMBER); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); @@ -693,6 +693,7 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn CTG_RET(code); } +#if 1 int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { return -1; @@ -712,7 +713,29 @@ int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { return 0; } } +#else +int32_t ctgSTableVersionCompare(const void* key1, const void* key2) { + if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { + return -1; + } else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { + return 1; + } else { + return 0; + } +} + +int32_t ctgDbVgVersionCompare(const void* key1, const void* key2) { + if (((SDbVgVersion*)key1)->dbId < ((SDbVgVersion*)key2)->dbId) { + return -1; + } else if (((SDbVgVersion*)key1)->dbId > ((SDbVgVersion*)key2)->dbId) { + return 1; + } else { + return 0; + } +} + +#endif int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) { mgmt->slotRIdx = 0; @@ -776,14 +799,15 @@ int32_t ctgMetaRentUpdate(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t si } if (slot->needSort) { + qDebug("meta slot before sorte, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); taosArraySort(slot->meta, compare); slot->needSort = false; - qDebug("meta slot sorted, slot idx:%d, type:%d", widx, mgmt->type); + qDebug("meta slot sorted, slot idx:%d, type:%d, size:%d", widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); } void *orig = taosArraySearch(slot->meta, &id, compare, TD_EQ); if (NULL == orig) { - qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d", id, widx, mgmt->type); + qError("meta not found in slot, id:%"PRIx64", slot idx:%d, type:%d, size:%d", id, widx, mgmt->type, (int32_t)taosArrayGetSize(slot->meta)); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } @@ -910,8 +934,15 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCache *dbCache) { int32_t code = 0; - if (taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache))) { - ctgError("taosHashPut db to cache failed, db:%s", dbFName); + + code = taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache)); + if (code) { + if (HASH_NODE_EXIST(code)) { + ctgDebug("db already in cache, dbFName:%s", dbFName); + return TSDB_CODE_SUCCESS; + } + + ctgError("taosHashPut db to cache failed, dbFName:%s", dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } @@ -919,7 +950,7 @@ int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCach strncpy(vgVersion.dbFName, dbFName, sizeof(vgVersion.dbFName)); ctgDebug("dbCache added, dbFName:%s, vgVersion:%d, dbId:%"PRIx64, dbFName, vgVersion.vgVersion, dbCache->dbId); - + CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion))); return TSDB_CODE_SUCCESS; @@ -955,8 +986,8 @@ void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) { if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) { - ctgError("taosHashRemove from dbCache failed, dbFName:%s", dbFName); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); } atomic_store_8(&dbCache->deleted, 1); @@ -965,7 +996,7 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, if (dbCache->vgInfo) { ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); - if (dbCache->vgInfo->vgHash) { + if (dbCache->vgInfo->vgHash) { taosHashCleanup(dbCache->vgInfo->vgHash); } @@ -988,6 +1019,8 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) { int32_t code = 0; SCtgDBCache *dbCache = NULL; + + CTG_LOCK(CTG_WRITE, &pCatalog->dbLock); while (true) { dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); @@ -1015,9 +1048,16 @@ int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64 return TSDB_CODE_SUCCESS; } #endif - CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName)); + code = ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName); taosHashRelease(pCatalog->dbCache, dbCache); dbCache = NULL; + if (code) { + if (TSDB_CODE_CTG_DB_DROPPED == code) { + continue; + } + + CTG_ERR_JRET(code); + } } SCtgDBCache newDBCache = {0}; @@ -1031,6 +1071,8 @@ _return: if (dbCache) { taosHashRelease(pCatalog->dbCache, dbCache); } + + CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); CTG_RET(code); } @@ -1147,7 +1189,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out _return: if (dbCache) { - taosHashRelease(pCatalog->dbCache, dbCache); + taosHashRelease(pCatalog->dbCache, dbCache); + CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); } CTG_RET(code); @@ -1459,17 +1502,19 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + if (NULL == pCatalog->dbCache) { *version = CTG_DEFAULT_INVALID_VERSION; ctgInfo("empty db cache, dbName:%s", dbName); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } SCtgDBCache *db = taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName)); if (NULL == db) { *version = CTG_DEFAULT_INVALID_VERSION; ctgInfo("db not in cache, dbName:%s", dbName); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } CTG_LOCK(CTG_READ, &db->vgLock); @@ -1479,7 +1524,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, *version = CTG_DEFAULT_INVALID_VERSION; ctgInfo("db not in cache, dbName:%s", dbName); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } *version = db->vgInfo->vgVersion; @@ -1489,7 +1534,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, ctgDebug("Got db vgVersion from cache, dbName:%s, vgVersion:%d", dbName, *version); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) { @@ -1497,6 +1542,8 @@ int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + SCtgDBCache* dbCache = NULL; SVgroupInfo *vgInfo = NULL; @@ -1540,12 +1587,14 @@ _return: vgList = NULL; } - CTG_RET(code); + CTG_API_LEAVE(code); } int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, uint64_t dbId, SDBVgroupInfo* dbInfo) { int32_t code = 0; + + CTG_API_ENTER(); if (NULL == pCatalog || NULL == dbFName || NULL == dbInfo) { CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); @@ -1584,7 +1633,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, ui } if (dbCache->vgInfo->vgHash) { - ctgInfo("cleanup db vgHash, dbFName:%s", dbFName); + ctgDebug("cleanup db vgHash, dbFName:%s", dbFName); taosHashCleanup(dbCache->vgInfo->vgHash); dbCache->vgInfo->vgHash = NULL; } @@ -1605,13 +1654,17 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, ui _return: + if (dbCache) { + CTG_UNLOCK(CTG_WRITE, &pCatalog->dbLock); + } + if (dbInfo) { taosHashCleanup(dbInfo->vgHash); dbInfo->vgHash = NULL; tfree(dbInfo); } - CTG_RET(code); + CTG_API_LEAVE(code); } @@ -1622,19 +1675,22 @@ int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbFName, uint64_t CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + if (NULL == pCatalog->dbCache) { - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName)); if (NULL == dbCache) { ctgInfo("db not exist in dbCache, may be removed, dbFName:%s", dbFName); - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } if (dbCache->dbId != dbId) { ctgInfo("db id already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, dbFName, dbCache->dbId, dbId); - return TSDB_CODE_SUCCESS; + taosHashRelease(pCatalog->dbCache, dbCache); + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName)); @@ -1643,7 +1699,7 @@ _return: taosHashRelease(pCatalog->dbCache, dbCache); - CTG_RET(code); + CTG_API_LEAVE(code); } int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) { @@ -1654,43 +1710,53 @@ int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, c CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + if (NULL == pCatalog->dbCache) { - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed)); if (!removed) { - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); - CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare)); + CTG_ERR_JRET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare)); ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid); + +_return: - CTG_RET(code); + CTG_API_LEAVE(code); } int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1)); } int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) { - return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, 1)); } int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg) { STableMetaOutput output = {0}; int32_t code = 0; + CTG_API_ENTER(); + strcpy(output.dbFName, rspMsg->dbFName); strcpy(output.tbName, rspMsg->tbName); SET_META_TYPE_TABLE(output.metaType); - CTG_ERR_RET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta)); + CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output.tbMeta)); CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); @@ -1698,7 +1764,7 @@ _return: tfree(output.tbMeta); - CTG_RET(code); + CTG_API_LEAVE(code); } @@ -1707,17 +1773,23 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pTransporter, con CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - return ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgRenewTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pTableName, isSTable)); } int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { - return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, true, pTableMeta, isSTable)); } int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pTableName || NULL == pVgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + + CTG_API_ENTER(); STableMeta *tbMeta = NULL; int32_t code = 0; @@ -1733,7 +1805,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S tNameGetFullDbName(pTableName, db); CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, db, false, &dbCache)); - // REMOEV THIS .... + // TODO REMOEV THIS .... if (0 == tbMeta->vgId) { SVgroupInfo vgroup = {0}; @@ -1741,7 +1813,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S tbMeta->vgId = vgroup.vgId; } - // REMOVE THIS .... + // TODO REMOVE THIS .... if (tbMeta->tableType == TSDB_SUPER_TABLE) { CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, dbCache->vgInfo, pVgroupList)); @@ -1780,7 +1852,7 @@ _return: vgList = NULL; } - CTG_RET(code); + CTG_API_LEAVE(code); } @@ -1788,10 +1860,12 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, SCtgDBCache* dbCache = NULL; int32_t code = 0; + CTG_API_ENTER(); + char db[TSDB_DB_FNAME_LEN] = {0}; tNameGetFullDbName(pTableName, db); - CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache)); + CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, db, false, &dbCache)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCatalog, dbCache->vgInfo, pTableName, pVgroup)); @@ -1802,7 +1876,7 @@ _return: taosHashRelease(pCatalog->dbCache, dbCache); } - CTG_RET(code); + CTG_API_LEAVE(code); } @@ -1811,19 +1885,22 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + int32_t code = 0; + pRsp->pTableMeta = NULL; if (pReq->pTableName) { int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName); if (tbNum <= 0) { ctgError("empty table name list, tbNum:%d", tbNum); - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); } pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); if (NULL == pRsp->pTableMeta) { ctgError("taosArrayInit %d failed", tbNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); } for (int32_t i = 0; i < tbNum; ++i) { @@ -1840,7 +1917,7 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pTransporter, const S } } - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); _return: @@ -1855,7 +1932,7 @@ _return: pRsp->pTableMeta = NULL; } - CTG_RET(code); + CTG_API_LEAVE(code); } int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { @@ -1863,9 +1940,11 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } + CTG_API_ENTER(); + //TODO - return TSDB_CODE_SUCCESS; + CTG_API_LEAVE(TSDB_CODE_SUCCESS); } int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion **stables, uint32_t *num) { @@ -1873,7 +1952,9 @@ int32_t catalogGetExpiredSTables(struct SCatalog* pCatalog, SSTableMetaVersion * CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_RET(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion))); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->stbRent, (void **)stables, num, sizeof(SSTableMetaVersion))); } int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint32_t *num) { @@ -1881,15 +1962,21 @@ int32_t catalogGetExpiredDBs(struct SCatalog* pCatalog, SDbVgVersion **dbs, uint CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_RET(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); + CTG_API_ENTER(); + + CTG_API_LEAVE(ctgMetaRentGet(&pCatalog->dbRent, (void **)dbs, num, sizeof(SDbVgVersion))); } void catalogDestroy(void) { - if (NULL == ctgMgmt.pCluster) { + if (NULL == ctgMgmt.pCluster || atomic_load_8(&ctgMgmt.exit)) { return; } + atomic_store_8(&ctgMgmt.exit, true); + + CTG_LOCK(CTG_WRITE, &ctgMgmt.lock); + SCatalog *pCatalog = NULL; void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL); while (pIter) { @@ -1905,6 +1992,8 @@ void catalogDestroy(void) { taosHashCleanup(ctgMgmt.pCluster); ctgMgmt.pCluster = NULL; + CTG_UNLOCK(CTG_WRITE, &ctgMgmt.lock); + qInfo("catalog destroyed"); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 0214078287..1284f1d7d3 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -108,6 +108,7 @@ void ctgTestInitLogFile() { const int32_t maxLogFileNum = 10; tsAsyncLog = 0; + qDebugFlag = 159; char temp[128] = {0}; sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix); @@ -631,7 +632,7 @@ void *ctgTestGetDbVgroupThread(void *param) { return NULL; } -void *ctgTestSetDbVgroupThread(void *param) { +void *ctgTestSetSameDbVgroupThread(void *param) { struct SCatalog *pCtg = (struct SCatalog *)param; int32_t code = 0; SDBVgroupInfo *dbVgroup = NULL; @@ -655,6 +656,32 @@ void *ctgTestSetDbVgroupThread(void *param) { return NULL; } + +void *ctgTestSetDiffDbVgroupThread(void *param) { + struct SCatalog *pCtg = (struct SCatalog *)param; + int32_t code = 0; + SDBVgroupInfo *dbVgroup = NULL; + int32_t n = 0; + + while (!ctgTestStop) { + ctgTestBuildDBVgroup(&dbVgroup); + code = catalogUpdateDBVgroup(pCtg, ctgTestDbname, ctgTestDbId++, dbVgroup); + if (code) { + assert(0); + } + + if (ctgTestEnableSleep) { + usleep(rand() % 5); + } + if (++n % ctgTestPrintNum == 0) { + printf("Set:%d\n", n); + } + } + + return NULL; +} + + void *ctgTestGetCtableMetaThread(void *param) { struct SCatalog *pCtg = (struct SCatalog *)param; int32_t code = 0; @@ -720,6 +747,8 @@ TEST(tableMeta, normalTable) { void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; + ctgTestInitLogFile(); + ctgTestSetPrepareDbVgroups(); initQueryModuleMsgHandle(); @@ -1285,7 +1314,7 @@ TEST(dbVgroup, getSetDbVgroupCase) { catalogDestroy(); } -TEST(multiThread, getSetDbVgroupCase) { +TEST(multiThread, getSetRmSameDbVgroup) { struct SCatalog *pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; @@ -1316,10 +1345,10 @@ TEST(multiThread, getSetDbVgroupCase) { pthread_attr_init(&thattr); pthread_t thread1, thread2; - pthread_create(&(thread1), &thattr, ctgTestSetDbVgroupThread, pCtg); + pthread_create(&(thread1), &thattr, ctgTestSetSameDbVgroupThread, pCtg); sleep(1); - pthread_create(&(thread1), &thattr, ctgTestGetDbVgroupThread, pCtg); + pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg); while (true) { if (ctgTestDeadLoop) { @@ -1336,6 +1365,58 @@ TEST(multiThread, getSetDbVgroupCase) { catalogDestroy(); } +TEST(multiThread, getSetRmDiffDbVgroup) { + struct SCatalog *pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo vgInfo = {0}; + SVgroupInfo *pvgInfo = NULL; + SDBVgroupInfo dbVgroup = {0}; + SArray *vgList = NULL; + ctgTestStop = false; + + ctgTestInitLogFile(); + + ctgTestSetPrepareDbVgroups(); + + initQueryModuleMsgHandle(); + + // sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1}; + strcpy(n.dbname, "db1"); + strcpy(n.tname, ctgTestTablename); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1, thread2; + pthread_create(&(thread1), &thattr, ctgTestSetDiffDbVgroupThread, pCtg); + + sleep(1); + pthread_create(&(thread2), &thattr, ctgTestGetDbVgroupThread, pCtg); + + while (true) { + if (ctgTestDeadLoop) { + sleep(1); + } else { + sleep(ctgTestMTRunSec); + break; + } + } + + ctgTestStop = true; + sleep(1); + + catalogDestroy(); +} + + TEST(multiThread, ctableMeta) { struct SCatalog *pCtg = NULL; diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 55099d9972..3e14bfca09 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -159,6 +159,7 @@ _return: } static int32_t queryConvertTableMetaMsg(STableMetaRsp* pMetaMsg) { + pMetaMsg->dbId = be64toh(pMetaMsg->dbId); pMetaMsg->numOfTags = ntohl(pMetaMsg->numOfTags); pMetaMsg->numOfColumns = ntohl(pMetaMsg->numOfColumns); pMetaMsg->sversion = ntohl(pMetaMsg->sversion); @@ -258,6 +259,8 @@ int32_t queryProcessTableMetaRsp(void* output, char *msg, int32_t msgSize) { } strcpy(pOut->dbFName, pMetaMsg->dbFName); + + pOut->dbId = pMetaMsg->dbId; if (pMetaMsg->tableType == TSDB_CHILD_TABLE) { SET_META_TYPE_BOTH_TABLE(pOut->metaType); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 55d40ff98c..937d5d035d 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -419,6 +419,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_NOT_READY, "catalog is not ready" TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service") //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")