feature/qnode
This commit is contained in:
parent
1b98943dd2
commit
33b3a978a5
|
@ -32,6 +32,15 @@ extern "C" {
|
||||||
|
|
||||||
struct SCatalog;
|
struct SCatalog;
|
||||||
|
|
||||||
|
enum {
|
||||||
|
CTG_DBG_DB_NUM = 1,
|
||||||
|
CTG_DBG_META_NUM,
|
||||||
|
CTG_DBG_STB_NUM,
|
||||||
|
CTG_DBG_DB_RENT_NUM,
|
||||||
|
CTG_DBG_STB_RENT_NUM,
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
typedef struct SCatalogReq {
|
typedef struct SCatalogReq {
|
||||||
SArray *pTableName; // element is SNAME
|
SArray *pTableName; // element is SNAME
|
||||||
SArray *pUdf; // udf name
|
SArray *pUdf; // udf name
|
||||||
|
@ -127,6 +136,8 @@ int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void * pTransporter, cons
|
||||||
*/
|
*/
|
||||||
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
int32_t catalogGetSTableMeta(struct SCatalog* pCatalog, void * pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta);
|
||||||
|
|
||||||
|
int32_t catalogUpdateSTableMeta(struct SCatalog* pCatalog, STableMetaRsp *rspMsg);
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Force renew a table's local cached meta data.
|
* Force renew a table's local cached meta data.
|
||||||
|
|
|
@ -81,7 +81,6 @@ typedef struct STableMeta {
|
||||||
} STableMeta;
|
} STableMeta;
|
||||||
|
|
||||||
typedef struct SDBVgroupInfo {
|
typedef struct SDBVgroupInfo {
|
||||||
uint64_t dbId;
|
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
int8_t hashMethod;
|
int8_t hashMethod;
|
||||||
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
|
SHashObj *vgHash; //key:vgId, value:SVgroupInfo
|
||||||
|
@ -103,6 +102,7 @@ enum {
|
||||||
|
|
||||||
typedef struct STableMetaOutput {
|
typedef struct STableMetaOutput {
|
||||||
int32_t metaType;
|
int32_t metaType;
|
||||||
|
uint64_t dbId;
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
char ctbName[TSDB_TABLE_NAME_LEN];
|
char ctbName[TSDB_TABLE_NAME_LEN];
|
||||||
char tbName[TSDB_TABLE_NAME_LEN];
|
char tbName[TSDB_TABLE_NAME_LEN];
|
||||||
|
@ -160,6 +160,8 @@ void initQueryModuleMsgHandle();
|
||||||
const SSchema* tGetTbnameColumnSchema();
|
const SSchema* tGetTbnameColumnSchema();
|
||||||
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags);
|
||||||
|
|
||||||
|
int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta);
|
||||||
|
|
||||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen);
|
||||||
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
|
extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize);
|
||||||
|
|
||||||
|
|
|
@ -100,50 +100,33 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
||||||
|
|
||||||
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||||
|
|
||||||
code = catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
|
catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
|
||||||
} else {
|
} else {
|
||||||
|
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||||
|
|
||||||
rsp->numOfTags = ntohl(rsp->numOfTags);
|
rsp->numOfTags = ntohl(rsp->numOfTags);
|
||||||
|
rsp->sversion = ntohl(rsp->sversion);
|
||||||
|
rsp->tversion = ntohl(rsp->tversion);
|
||||||
|
rsp->tuid = be64toh(rsp->tuid);
|
||||||
|
rsp->vgId = ntohl(rsp->vgId);
|
||||||
|
|
||||||
|
SSchema* pSchema = rsp->pSchema;
|
||||||
|
|
||||||
schemaNum = rsp->numOfColumns + rsp->numOfTags;
|
schemaNum = rsp->numOfColumns + rsp->numOfTags;
|
||||||
/*
|
|
||||||
rsp->vgNum = ntohl(rsp->vgNum);
|
|
||||||
rsp->uid = be64toh(rsp->uid);
|
|
||||||
|
|
||||||
SDBVgroupInfo vgInfo = {0};
|
for (int i = 0; i < schemaNum; ++i) {
|
||||||
vgInfo.dbId = rsp->uid;
|
pSchema->bytes = ntohl(pSchema->bytes);
|
||||||
vgInfo.vgVersion = rsp->vgVersion;
|
pSchema->colId = ntohl(pSchema->colId);
|
||||||
vgInfo.hashMethod = rsp->hashMethod;
|
|
||||||
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
pSchema++;
|
||||||
if (NULL == vgInfo.vgHash) {
|
|
||||||
tscError("hash init[%d] failed", rsp->vgNum);
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < rsp->vgNum; ++i) {
|
if (rsp->pSchema[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
|
tscError("invalid colId[%d] for the first column in table meta rsp msg", rsp->pSchema[0].colId);
|
||||||
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
|
}
|
||||||
|
|
||||||
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
|
catalogUpdateSTableMeta(pCatalog, rsp);
|
||||||
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
|
|
||||||
tscError("hash push failed, errno:%d", errno);
|
|
||||||
taosHashCleanup(vgInfo.vgHash);
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
|
|
||||||
if (code) {
|
|
||||||
taosHashCleanup(vgInfo.vgHash);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code) {
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
|
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
|
||||||
|
|
|
@ -140,8 +140,8 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
||||||
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||||
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||||
|
|
||||||
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { ctgDebug(__VA_ARGS__); } } while (0)
|
#define CTG_LOCK_DEBUG(...) do { if (gCTGDebug.lockDebug) { qDebug(__VA_ARGS__); } } while (0)
|
||||||
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { ctgDebug(__VA_ARGS__); } } while (0)
|
#define CTG_CACHE_DEBUG(...) do { if (gCTGDebug.cacheDebug) { qDebug(__VA_ARGS__); } } while (0)
|
||||||
|
|
||||||
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,68 @@ SCatalogMgmt ctgMgmt = {0};
|
||||||
|
|
||||||
SCtgDebug gCTGDebug = {0};
|
SCtgDebug gCTGDebug = {0};
|
||||||
|
|
||||||
void ctgShowDBCache(SHashObj *dbHash) {
|
int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) {
|
||||||
|
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ctgDbgGetStbNum(SCtgDBCache *dbCache) {
|
||||||
|
return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ctgDbgGetRentNum(SCtgRentMgmt *rent) {
|
||||||
|
int32_t num = 0;
|
||||||
|
for (uint16_t i = 0; i < rent->slotNum; ++i) {
|
||||||
|
SCtgRentSlot *slot = &rent->slots[i];
|
||||||
|
if (NULL == slot->meta) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
num += taosArrayGetSize(slot->meta);
|
||||||
|
}
|
||||||
|
|
||||||
|
return num;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type) {
|
||||||
|
if (NULL == pCatalog || NULL == pCatalog->dbCache) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (type) {
|
||||||
|
case CTG_DBG_DB_NUM:
|
||||||
|
return (int32_t)taosHashGetSize(pCatalog->dbCache);
|
||||||
|
case CTG_DBG_DB_RENT_NUM:
|
||||||
|
return ctgDbgGetRentNum(&pCatalog->dbRent);
|
||||||
|
case CTG_DBG_STB_RENT_NUM:
|
||||||
|
return ctgDbgGetRentNum(&pCatalog->stbRent);
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
int32_t num = 0;
|
||||||
|
void *pIter = taosHashIterate(pCatalog->dbCache, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
dbCache = (SCtgDBCache *)pIter;
|
||||||
|
switch (type) {
|
||||||
|
case CTG_DBG_META_NUM:
|
||||||
|
num += ctgDbgGetTbMetaNum(dbCache);
|
||||||
|
break;
|
||||||
|
case CTG_DBG_STB_NUM:
|
||||||
|
num += ctgDbgGetStbNum(dbCache);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
ctgError("invalid type:%d", type);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pIter = taosHashIterate(pCatalog->dbCache, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
return num;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ctgDbgShowDBCache(SHashObj *dbHash) {
|
||||||
if (NULL == dbHash) {
|
if (NULL == dbHash) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -36,23 +97,27 @@ void ctgShowDBCache(SHashObj *dbHash) {
|
||||||
|
|
||||||
dbCache = (SCtgDBCache *)pIter;
|
dbCache = (SCtgDBCache *)pIter;
|
||||||
|
|
||||||
taosHashGetKey(dbCache, &dbFName, &len);
|
taosHashGetKey(dbCache, (void **)&dbFName, &len);
|
||||||
|
|
||||||
CTG_CACHE_DEBUG("** %dth db [%.*s] **", i, len, dbFName);
|
CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
|
||||||
|
|
||||||
pIter = taosHashIterate(dbHash, pIter);
|
pIter = taosHashIterate(dbHash, pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgShowClusterCache(struct SCatalog* pCatalog) {
|
|
||||||
|
|
||||||
|
|
||||||
|
void ctgDbgShowClusterCache(struct SCatalog* pCatalog) {
|
||||||
if (NULL == pCatalog) {
|
if (NULL == pCatalog) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_CACHE_DEBUG("## cluster %"PRIx64" cache Info ##", pCatalog->clusterId);
|
CTG_CACHE_DEBUG("## cluster %"PRIx64" %p cache Info ##", pCatalog->clusterId, pCatalog);
|
||||||
CTG_CACHE_DEBUG("db cache number:%d", pCatalog->dbCache ? taosHashGetSize(pCatalog->dbCache) : 0);
|
CTG_CACHE_DEBUG("db:%d meta:%d stb:%d dbRent:%d stbRent:%d", ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_META_NUM),
|
||||||
ctgShowDBCache(pCatalog->dbCache);
|
ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_DB_RENT_NUM), ctgDbgGetClusterCacheNum(pCatalog, CTG_DBG_STB_RENT_NUM));
|
||||||
|
|
||||||
|
ctgDbgShowDBCache(pCatalog->dbCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgInitDBCache(struct SCatalog* pCatalog) {
|
int32_t ctgInitDBCache(struct SCatalog* pCatalog) {
|
||||||
|
@ -843,7 +908,7 @@ int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t si
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgAddDBCache(struct SCatalog *pCatalog, char *dbFName, SCtgDBCache *dbCache) {
|
int32_t ctgAddDBCache(struct SCatalog *pCatalog, const char *dbFName, SCtgDBCache *dbCache) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache))) {
|
if (taosHashPut(pCatalog->dbCache, dbFName, strlen(dbFName), dbCache, sizeof(SCtgDBCache))) {
|
||||||
ctgError("taosHashPut db to cache failed, db:%s", dbFName);
|
ctgError("taosHashPut db to cache failed, db:%s", dbFName);
|
||||||
|
@ -867,6 +932,111 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache) {
|
||||||
|
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
||||||
|
if (cache->stbCache) {
|
||||||
|
void *pIter = taosHashIterate(cache->stbCache, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
uint64_t *suid = NULL;
|
||||||
|
taosHashGetKey(pIter, (void **)&suid, NULL);
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == ctgMetaRentRemove(&pCatalog->stbRent, *suid, ctgSTableVersionCompare)) {
|
||||||
|
ctgDebug("stb removed from rent, suid:%"PRIx64, *suid);
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter = taosHashIterate(cache->stbCache, pIter);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
|
||||||
|
|
||||||
|
ctgFreeTableMetaCache(cache);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) {
|
||||||
|
if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) {
|
||||||
|
ctgError("taosHashRemove from dbCache failed, dbFName:%s", dbFName);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
atomic_store_8(&dbCache->deleted, 1);
|
||||||
|
|
||||||
|
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
|
||||||
|
if (dbCache->vgInfo) {
|
||||||
|
ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
|
||||||
|
|
||||||
|
if (dbCache->vgInfo->vgHash) {
|
||||||
|
taosHashCleanup(dbCache->vgInfo->vgHash);
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(dbCache->vgInfo);
|
||||||
|
}
|
||||||
|
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
|
||||||
|
|
||||||
|
ctgRemoveAndFreeTableMeta(pCatalog, &dbCache->tbCache);
|
||||||
|
|
||||||
|
ctgInfo("db removed from cache, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
|
||||||
|
|
||||||
|
ctgDebug("db removed from rent, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
|
||||||
|
if (dbCache) {
|
||||||
|
// TODO OPEN IT
|
||||||
|
#if 0
|
||||||
|
if (dbCache->dbId == dbId) {
|
||||||
|
*pCache = dbCache;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
#else
|
||||||
|
if (0 == dbId) {
|
||||||
|
*pCache = dbCache;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dbId && (dbCache->dbId == 0)) {
|
||||||
|
dbCache->dbId = dbId;
|
||||||
|
*pCache = dbCache;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dbCache->dbId == dbId) {
|
||||||
|
*pCache = dbCache;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName));
|
||||||
|
taosHashRelease(pCatalog->dbCache, dbCache);
|
||||||
|
dbCache = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCtgDBCache newDBCache = {0};
|
||||||
|
newDBCache.dbId = dbId;
|
||||||
|
|
||||||
|
CTG_ERR_JRET(ctgAddDBCache(pCatalog, dbFName, &newDBCache));
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (dbCache) {
|
||||||
|
taosHashRelease(pCatalog->dbCache, dbCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgUpdateTbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) {
|
int32_t ctgUpdateTbMetaImpl(struct SCatalog *pCatalog, SCtgTbMetaCache *tbCache, char *dbFName, char *tbName, STableMeta *meta, int32_t metaSize) {
|
||||||
CTG_LOCK(CTG_READ, &tbCache->metaLock);
|
CTG_LOCK(CTG_READ, &tbCache->metaLock);
|
||||||
if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
|
if (taosHashPut(tbCache->metaCache, tbName, strlen(tbName), meta, metaSize) != 0) {
|
||||||
|
@ -954,7 +1124,7 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
||||||
CTG_ERR_JRET(ctgInitStbCache(pCatalog, dbCache));
|
CTG_ERR_JRET(ctgInitStbCache(pCatalog, dbCache));
|
||||||
|
|
||||||
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
|
if (CTG_IS_META_CTABLE(output->metaType) || CTG_IS_META_BOTH(output->metaType)) {
|
||||||
CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
|
CTG_ERR_JRET(ctgUpdateTbMetaImpl(pCatalog, &dbCache->tbCache, output->dbFName, output->ctbName, (STableMeta *)&output->ctbMeta, sizeof(output->ctbMeta)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (CTG_IS_META_CTABLE(output->metaType)) {
|
if (CTG_IS_META_CTABLE(output->metaType)) {
|
||||||
|
@ -1002,7 +1172,7 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut));
|
||||||
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbFName, DbOut.dbVgroup));
|
CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbFName, DbOut.dbId, DbOut.dbVgroup));
|
||||||
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache));
|
CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbFName, dbCache, &inCache));
|
||||||
|
|
||||||
if (!inCache) {
|
if (!inCache) {
|
||||||
|
@ -1016,90 +1186,6 @@ int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgm
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgRemoveAndFreeTableMeta(struct SCatalog* pCatalog, SCtgTbMetaCache *cache) {
|
|
||||||
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
|
||||||
if (cache->stbCache) {
|
|
||||||
void *pIter = taosHashIterate(cache->stbCache, NULL);
|
|
||||||
while (pIter) {
|
|
||||||
uint64_t suid = 0;
|
|
||||||
taosHashGetKey(pIter, &suid, NULL);
|
|
||||||
|
|
||||||
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare));
|
|
||||||
ctgDebug("stb removed from rent, suid:%"PRIx64, suid);
|
|
||||||
|
|
||||||
pIter = taosHashIterate(cache->stbCache, pIter);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
|
|
||||||
|
|
||||||
ctgFreeTableMetaCache(cache);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, SCtgDBCache *dbCache, const char* dbFName) {
|
|
||||||
if (taosHashRemove(pCatalog->dbCache, dbFName, strlen(dbFName))) {
|
|
||||||
ctgError("taosHashRemove from dbCache failed, dbFName:%s", dbFName);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
atomic_store_8(&dbCache->deleted, 1);
|
|
||||||
|
|
||||||
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
|
|
||||||
if (dbCache->vgInfo) {
|
|
||||||
ctgInfo("cleanup db vgInfo, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
|
|
||||||
|
|
||||||
if (dbCache->vgInfo->vgHash) {
|
|
||||||
taosHashCleanup(dbCache->vgInfo->vgHash);
|
|
||||||
}
|
|
||||||
|
|
||||||
tfree(dbCache->vgInfo);
|
|
||||||
}
|
|
||||||
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
|
|
||||||
|
|
||||||
ctgRemoveAndFreeTableMeta(pCatalog, &dbCache->tbCache);
|
|
||||||
|
|
||||||
ctgInfo("db removed from cache, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
|
|
||||||
|
|
||||||
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->dbRent, dbCache->dbId, ctgDbVgVersionCompare));
|
|
||||||
|
|
||||||
ctgDebug("db removed from rent, dbFName:%s, uid:%"PRIx64, dbFName, dbCache->dbId);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgAcquireDBCache(struct SCatalog* pCatalog, const char *dbFName, uint64_t dbId, SCtgDBCache **pCache) {
|
|
||||||
int32_t code = 0;
|
|
||||||
SCtgDBCache *dbCache = NULL;
|
|
||||||
|
|
||||||
while (true) {
|
|
||||||
dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbFName, strlen(dbFName));
|
|
||||||
if (dbCache) {
|
|
||||||
if (dbCache->dbId == dbId) {
|
|
||||||
*pCache = dbCache;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgValidateAndRemoveDb(pCatalog, dbCache, dbFName));
|
|
||||||
taosHashRelease(pCatalog->dbCache, dbCache);
|
|
||||||
dbCache = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCtgDBCache newDBCache = {0};
|
|
||||||
newDBCache.dbId = dbId;
|
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgAddDBCache(pCatalog, dbFName, &newDBCache));
|
|
||||||
}
|
|
||||||
|
|
||||||
_return:
|
|
||||||
|
|
||||||
if (dbCache) {
|
|
||||||
taosHashRelease(pCatalog->dbCache, dbCache);
|
|
||||||
}
|
|
||||||
|
|
||||||
CTG_RET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) {
|
int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) {
|
||||||
*removed = false;
|
*removed = false;
|
||||||
|
|
||||||
|
@ -1407,7 +1493,7 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName,
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
|
int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SArray** vgroupList) {
|
||||||
if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
|
if (NULL == pCatalog || NULL == dbFName || NULL == pRpc || NULL == pMgmtEps || NULL == vgroupList) {
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1480,7 +1566,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbFName, ui
|
||||||
|
|
||||||
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
|
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
|
||||||
if (dbCache->deleted) {
|
if (dbCache->deleted) {
|
||||||
ctgInfo("db is dropping, dbFName:%s, dbId:%"PRIx64, dbFName, dbInfo->dbId);
|
ctgInfo("db is dropping, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
|
||||||
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
|
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
|
||||||
taosHashRelease(pCatalog->dbCache, dbCache);
|
taosHashRelease(pCatalog->dbCache, dbCache);
|
||||||
CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED);
|
CTG_ERR_JRET(TSDB_CODE_CTG_DB_DROPPED);
|
||||||
|
|
|
@ -39,6 +39,7 @@ namespace {
|
||||||
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
|
extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SName *pTableName, STableMeta **pTableMeta,
|
||||||
int32_t *exist);
|
int32_t *exist);
|
||||||
extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output);
|
extern "C" int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *output);
|
||||||
|
extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
|
||||||
|
|
||||||
void ctgTestSetPrepareTableMeta();
|
void ctgTestSetPrepareTableMeta();
|
||||||
void ctgTestSetPrepareCTableMeta();
|
void ctgTestSetPrepareCTableMeta();
|
||||||
|
@ -208,6 +209,45 @@ void ctgTestBuildDBVgroup(SDBVgroupInfo **pdbVgroup) {
|
||||||
*pdbVgroup = dbVgroup;
|
*pdbVgroup = dbVgroup;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) {
|
||||||
|
strcpy(rspMsg->dbFName, ctgTestDbname);
|
||||||
|
sprintf(rspMsg->tbName, "%s", ctgTestSTablename);
|
||||||
|
sprintf(rspMsg->stbName, "%s", ctgTestSTablename);
|
||||||
|
rspMsg->numOfTags = ctgTestTagNum;
|
||||||
|
rspMsg->numOfColumns = ctgTestColNum;
|
||||||
|
rspMsg->precision = 1 + 1;
|
||||||
|
rspMsg->tableType = TSDB_SUPER_TABLE;
|
||||||
|
rspMsg->update = 1 + 1;
|
||||||
|
rspMsg->sversion = ctgTestSVersion + 1;
|
||||||
|
rspMsg->tversion = ctgTestTVersion + 1;
|
||||||
|
rspMsg->suid = ctgTestSuid + 1;
|
||||||
|
rspMsg->tuid = ctgTestSuid + 1;
|
||||||
|
rspMsg->vgId = 1;
|
||||||
|
|
||||||
|
SSchema *s = NULL;
|
||||||
|
s = &rspMsg->pSchema[0];
|
||||||
|
s->type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
|
s->colId = 1;
|
||||||
|
s->bytes = 8;
|
||||||
|
strcpy(s->name, "ts");
|
||||||
|
|
||||||
|
s = &rspMsg->pSchema[1];
|
||||||
|
s->type = TSDB_DATA_TYPE_INT;
|
||||||
|
s->colId = 2;
|
||||||
|
s->bytes = 4;
|
||||||
|
strcpy(s->name, "col1s");
|
||||||
|
|
||||||
|
s = &rspMsg->pSchema[2];
|
||||||
|
s->type = TSDB_DATA_TYPE_BINARY;
|
||||||
|
s->colId = 3;
|
||||||
|
s->bytes = 12 + 1;
|
||||||
|
strcpy(s->name, "tag1s");
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||||
SUseDbRsp *rspMsg = NULL; // todo
|
SUseDbRsp *rspMsg = NULL; // todo
|
||||||
|
|
||||||
|
@ -963,6 +1003,124 @@ TEST(tableMeta, superTableCase) {
|
||||||
catalogDestroy();
|
catalogDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(tableMeta, rmStbMeta) {
|
||||||
|
struct SCatalog *pCtg = NULL;
|
||||||
|
void *mockPointer = (void *)0x1;
|
||||||
|
SVgroupInfo vgInfo = {0};
|
||||||
|
|
||||||
|
ctgTestInitLogFile();
|
||||||
|
|
||||||
|
ctgTestSetPrepareDbVgroupsAndSuperMeta();
|
||||||
|
|
||||||
|
initQueryModuleMsgHandle();
|
||||||
|
|
||||||
|
int32_t code = catalogInit(NULL);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
// sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
||||||
|
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
|
||||||
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestSTablename);
|
||||||
|
|
||||||
|
STableMeta *tableMeta = NULL;
|
||||||
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_EQ(tableMeta->vgId, 0);
|
||||||
|
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
|
||||||
|
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
|
||||||
|
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
|
||||||
|
ASSERT_EQ(tableMeta->uid, ctgTestSuid);
|
||||||
|
ASSERT_EQ(tableMeta->suid, ctgTestSuid);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
code = catalogRemoveSTableMeta(pCtg, "1.db1", ctgTestSTablename, ctgTestSuid);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 0);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 0);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0);
|
||||||
|
|
||||||
|
catalogDestroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(tableMeta, updateStbMeta) {
|
||||||
|
struct SCatalog *pCtg = NULL;
|
||||||
|
void *mockPointer = (void *)0x1;
|
||||||
|
SVgroupInfo vgInfo = {0};
|
||||||
|
|
||||||
|
ctgTestInitLogFile();
|
||||||
|
|
||||||
|
ctgTestSetPrepareDbVgroupsAndSuperMeta();
|
||||||
|
|
||||||
|
initQueryModuleMsgHandle();
|
||||||
|
|
||||||
|
int32_t code = catalogInit(NULL);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
// sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet);
|
||||||
|
code = catalogGetHandle(ctgTestClusterId, &pCtg);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
SName n = {.type = TSDB_TABLE_NAME_T, .acctId = 1};
|
||||||
|
strcpy(n.dbname, "db1");
|
||||||
|
strcpy(n.tname, ctgTestSTablename);
|
||||||
|
|
||||||
|
STableMeta *tableMeta = NULL;
|
||||||
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_EQ(tableMeta->vgId, 0);
|
||||||
|
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
|
||||||
|
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion);
|
||||||
|
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion);
|
||||||
|
ASSERT_EQ(tableMeta->uid, ctgTestSuid);
|
||||||
|
ASSERT_EQ(tableMeta->suid, ctgTestSuid);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
tfree(tableMeta);
|
||||||
|
|
||||||
|
STableMetaRsp rsp = {0};
|
||||||
|
ctgTestBuildSTableMetaRsp(&rsp);
|
||||||
|
|
||||||
|
code = catalogUpdateSTableMeta(pCtg, &rsp);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 1);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 1);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM), 1);
|
||||||
|
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 1);
|
||||||
|
|
||||||
|
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
ASSERT_EQ(tableMeta->vgId, 0);
|
||||||
|
ASSERT_EQ(tableMeta->tableType, TSDB_SUPER_TABLE);
|
||||||
|
ASSERT_EQ(tableMeta->sversion, ctgTestSVersion + 1);
|
||||||
|
ASSERT_EQ(tableMeta->tversion, ctgTestTVersion + 1);
|
||||||
|
ASSERT_EQ(tableMeta->uid, ctgTestSuid + 1);
|
||||||
|
ASSERT_EQ(tableMeta->suid, ctgTestSuid + 1);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.numOfColumns, ctgTestColNum);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.numOfTags, ctgTestTagNum);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.precision, 1 + 1);
|
||||||
|
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||||
|
|
||||||
|
tfree(tableMeta);
|
||||||
|
|
||||||
|
catalogDestroy();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
TEST(tableDistVgroup, normalTable) {
|
TEST(tableDistVgroup, normalTable) {
|
||||||
struct SCatalog *pCtg = NULL;
|
struct SCatalog *pCtg = NULL;
|
||||||
void *mockPointer = (void *)0x1;
|
void *mockPointer = (void *)0x1;
|
||||||
|
|
Loading…
Reference in New Issue