feature/qnode
This commit is contained in:
parent
b44baf3c20
commit
48495592c1
|
@ -105,7 +105,8 @@ typedef struct SCtgApiStat {
|
|||
} SCtgApiStat;
|
||||
|
||||
typedef struct SCtgRuntimeStat {
|
||||
|
||||
uint64_t qNum;
|
||||
uint64_t qDoneNum;
|
||||
} SCtgRuntimeStat;
|
||||
|
||||
typedef struct SCtgCacheStat {
|
||||
|
@ -161,6 +162,7 @@ typedef struct SCatalogMgmt {
|
|||
SCtgQNode *head;
|
||||
SCtgQNode *tail;
|
||||
tsem_t sem;
|
||||
uint64_t qRemainNum;
|
||||
pthread_t updateThread;
|
||||
SHashObj *pCluster; //key: clusterId, value: SCatalog*
|
||||
SCatalogStat stat;
|
||||
|
@ -170,6 +172,18 @@ typedef struct SCatalogMgmt {
|
|||
typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
||||
typedef int32_t (*ctgActFunc)(SCtgMetaAction *);
|
||||
|
||||
typedef struct SCtgAction {
|
||||
int32_t actId;
|
||||
char name[32];
|
||||
ctgActFunc func;
|
||||
} SCtgAction;
|
||||
|
||||
#define CTG_QUEUE_ADD() atomic_add_fetch_64(&gCtgMgmt.qRemainNum, 1)
|
||||
#define CTG_QUEUE_SUB() atomic_sub_fetch_64(&gCtgMgmt.qRemainNum, 1)
|
||||
|
||||
#define CTG_STAT_ADD(n) qError("done:%" PRId64, atomic_add_fetch_64(&(n), 1))
|
||||
#define CTG_STAT_SUB(n) atomic_sub_fetch_64(&(n), 1)
|
||||
|
||||
#define CTG_IS_META_NULL(type) ((type) == META_TYPE_NULL_TABLE)
|
||||
#define CTG_IS_META_CTABLE(type) ((type) == META_TYPE_CTABLE)
|
||||
#define CTG_IS_META_TABLE(type) ((type) == META_TYPE_TABLE)
|
||||
|
@ -236,8 +250,8 @@ typedef int32_t (*ctgActFunc)(SCtgMetaAction *);
|
|||
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
#define CTG_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
|
||||
|
||||
#define CTG_API_ENTER() do { CTG_API_DEBUG("enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &ctgMgmt.lock); if (atomic_load_8(&ctgMgmt.exit)) { CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_RET(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
|
||||
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &ctgMgmt.lock); CTG_API_DEBUG("leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
|
||||
#define CTG_API_LEAVE(c) do { int32_t __code = c; CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock); CTG_API_DEBUG("CTG API leave %s", __FUNCTION__); CTG_RET(__code); } while (0)
|
||||
#define CTG_API_ENTER() do { CTG_API_DEBUG("CTG API enter %s", __FUNCTION__); CTG_LOCK(CTG_READ, &gCtgMgmt.lock); if (atomic_load_8(&gCtgMgmt.exit)) { CTG_API_LEAVE(TSDB_CODE_CTG_OUT_OF_SERVICE); } } while (0)
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -24,11 +24,69 @@ int32_t ctgActRemoveDB(SCtgMetaAction *action);
|
|||
int32_t ctgActRemoveStb(SCtgMetaAction *action);
|
||||
int32_t ctgActRemoveTbl(SCtgMetaAction *action);
|
||||
|
||||
SCatalogMgmt ctgMgmt = {0};
|
||||
|
||||
SCatalogMgmt gCtgMgmt = {0};
|
||||
SCtgDebug gCTGDebug = {0};
|
||||
SCtgAction gCtgAction[CTG_ACT_MAX] = {{
|
||||
CTG_ACT_UPDATE_VG,
|
||||
"update vgInfo",
|
||||
ctgActUpdateVg
|
||||
},
|
||||
{
|
||||
CTG_ACT_UPDATE_TBL,
|
||||
"update tbMeta",
|
||||
ctgActUpdateTbl
|
||||
},
|
||||
{
|
||||
CTG_ACT_REMOVE_DB,
|
||||
"remove DB",
|
||||
ctgActRemoveDB
|
||||
},
|
||||
{
|
||||
CTG_ACT_REMOVE_STB,
|
||||
"remove stbMeta",
|
||||
ctgActRemoveStb
|
||||
},
|
||||
{
|
||||
CTG_ACT_REMOVE_TBL,
|
||||
"remove tbMeta",
|
||||
ctgActRemoveTbl
|
||||
}
|
||||
};
|
||||
|
||||
ctgActFunc ctgActFuncs[] = {ctgActUpdateVg, ctgActUpdateTbl, ctgActRemoveDB, ctgActRemoveStb, ctgActRemoveTbl};
|
||||
int32_t ctgDbgEnableDebug(char *option) {
|
||||
if (0 == strcasecmp(option, "lock")) {
|
||||
gCTGDebug.lockDebug = true;
|
||||
qDebug("lock debug enabled");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (0 == strcasecmp(option, "cache")) {
|
||||
gCTGDebug.cacheDebug = true;
|
||||
qDebug("cache debug enabled");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (0 == strcasecmp(option, "api")) {
|
||||
gCTGDebug.apiDebug = true;
|
||||
qDebug("api debug enabled");
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qError("invalid debug option:%s", option);
|
||||
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
int32_t ctgDbgGetStatNum(char *option, void *res) {
|
||||
if (0 == strcasecmp(option, "runtime.qDoneNum")) {
|
||||
*(uint64_t *)res = atomic_load_64(&gCtgMgmt.stat.runtime.qDoneNum);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qError("invalid stat option:%s", option);
|
||||
|
||||
return TSDB_CODE_CTG_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
int32_t ctgDbgGetTbMetaNum(SCtgDBCache *dbCache) {
|
||||
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
|
||||
|
@ -128,6 +186,45 @@ void ctgDbgShowClusterCache(SCatalog* pCtg) {
|
|||
ctgDbgShowDBCache(pCtg->dbCache);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void ctgPopAction(SCtgMetaAction **action) {
|
||||
SCtgQNode *orig = gCtgMgmt.head;
|
||||
|
||||
SCtgQNode *node = gCtgMgmt.head->next;
|
||||
gCtgMgmt.head = gCtgMgmt.head->next;
|
||||
|
||||
CTG_QUEUE_SUB();
|
||||
|
||||
tfree(orig);
|
||||
|
||||
*action = &node->action;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgPushAction(SCtgMetaAction *action) {
|
||||
SCtgQNode *node = calloc(1, sizeof(SCtgQNode));
|
||||
if (NULL == node) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
|
||||
CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
node->action = *action;
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &gCtgMgmt.qlock);
|
||||
gCtgMgmt.tail->next = node;
|
||||
gCtgMgmt.tail = node;
|
||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.qlock);
|
||||
|
||||
CTG_QUEUE_ADD();
|
||||
//CTG_STAT_ADD(gCtgMgmt.stat.runtime.qNum);
|
||||
|
||||
tsem_post(&gCtgMgmt.sem);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
|
||||
if (NULL == mgmt->slots) {
|
||||
return;
|
||||
|
@ -210,16 +307,29 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache) {
|
||||
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
|
||||
CTG_LOCK(CTG_READ, &dbCache->vgLock);
|
||||
|
||||
if (dbCache->deleted) {
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
|
||||
|
||||
ctgDebug("db is dropping, dbId:%"PRIx64, dbCache->dbId);
|
||||
|
||||
*inCache = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
if (NULL == dbCache->vgInfo) {
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->vgLock);
|
||||
|
||||
*inCache = false;
|
||||
ctgDebug("db vgInfo is empty, dbId:%"PRIx64, dbCache->dbId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
*inCache = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -302,12 +412,11 @@ int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCac
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgAcquireVgInfo(pCtg, dbCache);
|
||||
if (NULL == dbCache->vgInfo) {
|
||||
ctgAcquireVgInfo(pCtg, dbCache, inCache);
|
||||
if (!(*inCache)) {
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
*pCache = NULL;
|
||||
*inCache = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -325,7 +434,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
|
|||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
ctgDebug("try to get db vgroup from mnode, db:%s", input->db);
|
||||
ctgDebug("try to get db vgInfo from mnode, dbFName:%s", input->db);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(TDMT_MND_USE_DB)](input, &msg, 0, &msgLen);
|
||||
if (code) {
|
||||
|
@ -358,6 +467,8 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
|
|||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
ctgDebug("Got db vgInfo from mnode, dbFName:%s", input->db);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -381,7 +492,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
|
|||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
|
||||
if (NULL == tbMeta) {
|
||||
taosHashRelease(pCtg->dbCache, dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
*exist = 0;
|
||||
ctgDebug("tbmeta not in cache, dbFName:%s, tbName:%s", dbFName, tbName);
|
||||
|
@ -390,7 +501,7 @@ int32_t ctgIsTableMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName,
|
|||
|
||||
*exist = 1;
|
||||
|
||||
taosHashRelease(pCtg->dbCache, dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
ctgDebug("tbmeta is in cache, dbFName:%s, tbName:%s", dbFName, tbName);
|
||||
|
||||
|
@ -414,7 +525,6 @@ int32_t ctgGetTableMetaFromCache(SCatalog* pCtg, const SName* pTableName, STable
|
|||
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
|
||||
if (NULL == dbCache) {
|
||||
*exist = 0;
|
||||
ctgWarn("no db cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -486,12 +596,12 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
char dbName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, dbName);
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
SCtgDBCache *dbCache = taosHashAcquire(pCtg->dbCache, dbName, strlen(dbName));
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
|
||||
if (NULL == dbCache) {
|
||||
ctgInfo("db not in cache, dbFName:%s", dbName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -500,8 +610,8 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_
|
|||
|
||||
if (NULL == pTableMeta) {
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbName, pTableName->tname);
|
||||
taosHashRelease(pCtg->dbCache, dbCache);
|
||||
ctgWarn("tbl not in cache, dbFName:%s, tbName:%s", dbFName, pTableName->tname);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -512,9 +622,9 @@ int32_t ctgGetTableTypeFromCache(SCatalog* pCtg, const SName* pTableName, int32_
|
|||
|
||||
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
|
||||
|
||||
taosHashRelease(pCtg->dbCache, dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbName, pTableName->tname, *tbType);
|
||||
ctgDebug("Got tbtype from cache, dbFName:%s, tbName:%s, type:%d", dbFName, pTableName->tname, *tbType);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -940,15 +1050,15 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
|
|||
SCtgDBCache newDBCache = {0};
|
||||
newDBCache.dbId = dbId;
|
||||
|
||||
newDBCache.tbCache.metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
newDBCache.tbCache.metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == newDBCache.tbCache.metaCache) {
|
||||
ctgError("taosHashInit %d metaCache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
newDBCache.tbCache.stbCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
||||
newDBCache.tbCache.stbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == newDBCache.tbCache.stbCache) {
|
||||
ctgError("taosHashInit %d stbCache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||
ctgError("taosHashInit %d stbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
|
@ -1002,6 +1112,10 @@ void ctgRemoveStbRent(SCatalog* pCtg, SCtgTbMetaCache *cache) {
|
|||
|
||||
|
||||
int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
|
||||
uint64_t dbId = dbCache->dbId;
|
||||
|
||||
ctgInfo("start to remove db from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
|
||||
|
||||
atomic_store_8(&dbCache->deleted, 1);
|
||||
|
||||
ctgRemoveStbRent(pCtg, &dbCache->tbCache);
|
||||
|
@ -1018,6 +1132,8 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
|
|||
ctgInfo("taosHashRemove from dbCache failed, may be removed, dbFName:%s", dbFName);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
|
||||
}
|
||||
|
||||
ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1160,6 +1276,8 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
ctgDebug("tbmeta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
|
||||
|
||||
if (!isStb) {
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1183,7 +1301,7 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
|
|||
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
|
||||
ctgDebug("meta updated to cache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
|
||||
ctgDebug("stb updated to stbCache, dbFName:%s, tbName:%s, tbType:%d", dbFName, tbName, meta->tableType);
|
||||
|
||||
SSTableMetaVersion metaRent = {.dbId = dbId, .suid = meta->suid, .sversion = meta->sversion, .tversion = meta->tversion};
|
||||
strcpy(metaRent.dbFName, dbFName);
|
||||
|
@ -1193,6 +1311,45 @@ int32_t ctgUpdateTblMeta(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, ui
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
|
||||
*dst = malloc(sizeof(SDBVgInfo));
|
||||
if (NULL == *dst) {
|
||||
qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
memcpy(*dst, src, sizeof(SDBVgInfo));
|
||||
|
||||
size_t hashSize = taosHashGetSize(src->vgHash);
|
||||
(*dst)->vgHash = taosHashInit(hashSize, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == (*dst)->vgHash) {
|
||||
qError("taosHashInit %d failed", (int32_t)hashSize);
|
||||
tfree(*dst);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
int32_t *vgId = NULL;
|
||||
void *pIter = taosHashIterate(src->vgHash, NULL);
|
||||
while (pIter) {
|
||||
taosHashGetKey(pIter, (void **)&vgId, NULL);
|
||||
|
||||
if (taosHashPut((*dst)->vgHash, (void *)vgId, sizeof(int32_t), pIter, sizeof(SVgroupInfo))) {
|
||||
qError("taosHashPut failed, hashSize:%d", (int32_t)hashSize);
|
||||
taosHashCancelIterate(src->vgHash, pIter);
|
||||
taosHashCleanup((*dst)->vgHash);
|
||||
tfree(*dst);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(src->vgHash, pIter);
|
||||
}
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
|
||||
bool inCache = false;
|
||||
int32_t code = 0;
|
||||
|
@ -1209,23 +1366,39 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const
|
|||
tstrncpy(input.db, dbFName, tListLen(input.db));
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
while (true) {
|
||||
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut));
|
||||
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut));
|
||||
|
||||
code = ctgUpdateDBVgInfo(pCtg, dbFName, DbOut.dbId, &DbOut.dbVgroup);
|
||||
if (code && DbOut.dbVgroup) {
|
||||
*pInfo = DbOut.dbVgroup;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
|
||||
|
||||
if (inCache) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
|
||||
|
||||
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_VG};
|
||||
SCtgUpdateVgMsg *msg = malloc(sizeof(SCtgUpdateVgMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
|
||||
ctgFreeVgInfo(DbOut.dbVgroup);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||
msg->pCtg = pCtg;
|
||||
msg->dbId = DbOut.dbId;
|
||||
msg->dbInfo = DbOut.dbVgroup;
|
||||
|
||||
action.data = msg;
|
||||
|
||||
CTG_ERR_JRET(ctgPushAction(&action));
|
||||
|
||||
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
tfree(*pInfo);
|
||||
tfree(msg);
|
||||
|
||||
*pInfo = DbOut.dbVgroup;
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
|
@ -1253,36 +1426,6 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void ctgPopAction(SCtgMetaAction **action) {
|
||||
SCtgQNode *orig = ctgMgmt.head;
|
||||
|
||||
SCtgQNode *node = ctgMgmt.head->next;
|
||||
ctgMgmt.head = ctgMgmt.head->next;
|
||||
|
||||
tfree(orig);
|
||||
|
||||
*action = &node->action;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgPushAction(SCtgMetaAction *action) {
|
||||
SCtgQNode *node = calloc(1, sizeof(SCtgQNode));
|
||||
if (NULL == node) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
|
||||
CTG_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
node->action = *action;
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &ctgMgmt.qlock);
|
||||
ctgMgmt.tail->next = node;
|
||||
ctgMgmt.tail = node;
|
||||
CTG_UNLOCK(CTG_WRITE, &ctgMgmt.qlock);
|
||||
|
||||
tsem_post(&ctgMgmt.sem);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable, STableMetaOutput **pOutput) {
|
||||
|
@ -1295,6 +1438,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
|
|||
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo));
|
||||
|
||||
SCtgUpdateTblMsg *msg = NULL;
|
||||
STableMetaOutput moutput = {0};
|
||||
STableMetaOutput *output = malloc(sizeof(STableMetaOutput));
|
||||
if (NULL == output) {
|
||||
|
@ -1303,7 +1447,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
|
|||
}
|
||||
|
||||
if (CTG_IS_STABLE(isSTable)) {
|
||||
ctgDebug("will renew tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
|
||||
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s", tNameGetTableName(pTableName));
|
||||
|
||||
// if get from mnode failed, will not try vnode
|
||||
CTG_ERR_JRET(ctgGetTableMetaFromMnode(pCtg, pTransporter, pMgmtEps, pTableName, output));
|
||||
|
@ -1312,13 +1456,13 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
|
|||
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
|
||||
}
|
||||
} else {
|
||||
ctgDebug("will renew tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, isStable:%d", tNameGetTableName(pTableName), isSTable);
|
||||
|
||||
// if get from vnode failed or no table meta, will not try mnode
|
||||
CTG_ERR_JRET(ctgGetTableMetaFromVnode(pCtg, pTransporter, pMgmtEps, pTableName, &vgroupInfo, output));
|
||||
|
||||
if (CTG_IS_META_TABLE(output->metaType) && TSDB_SUPER_TABLE == output->tbMeta->tableType) {
|
||||
ctgDebug("will continue to renew tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType);
|
||||
ctgDebug("will continue to refresh tbmeta since got stb, tbName:%s, metaType:%d", tNameGetTableName(pTableName), output->metaType);
|
||||
|
||||
tfree(output->tbMeta);
|
||||
|
||||
|
@ -1345,16 +1489,22 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
|
|||
}
|
||||
|
||||
if (CTG_IS_META_NULL(output->metaType)) {
|
||||
ctgError("no tablemeta got, tbNmae:%s", tNameGetTableName(pTableName));
|
||||
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName));
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
if (CTG_IS_META_TABLE(output->metaType)) {
|
||||
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d", output->dbFName, output->tbName, output->tbMeta->tableType);
|
||||
} else {
|
||||
ctgDebug("tbmeta got, dbFName:%s, tbName:%s, tbType:%d, stbMetaGot:%d", output->dbFName, output->ctbName, output->ctbMeta.tableType, CTG_IS_META_BOTH(output->metaType));
|
||||
}
|
||||
|
||||
if (pOutput) {
|
||||
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
|
||||
}
|
||||
|
||||
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
|
||||
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
|
||||
msg = malloc(sizeof(SCtgUpdateTblMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
|
@ -1367,11 +1517,14 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTransporter, const SEpSet* pMgm
|
|||
|
||||
CTG_ERR_JRET(ctgPushAction(&action));
|
||||
|
||||
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
tfree(output->tbMeta);
|
||||
tfree(output);
|
||||
tfree(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -1440,6 +1593,10 @@ _return:
|
|||
|
||||
tfree(output);
|
||||
|
||||
if (*pTableMeta) {
|
||||
ctgDebug("tbmeta returned, tbName:%s, tbType:%d", pTableName->tname, (*pTableMeta)->tableType);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1465,15 +1622,13 @@ int32_t ctgActRemoveDB(SCtgMetaAction *action) {
|
|||
SCatalog* pCtg = msg->pCtg;
|
||||
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
ctgAcquireDBCache(msg->pCtg, msg->dbFName, &dbCache);
|
||||
ctgGetDBCache(msg->pCtg, msg->dbFName, &dbCache);
|
||||
if (NULL == dbCache) {
|
||||
ctgInfo("db not exist in cache, may be removed, dbFName:%s", msg->dbFName);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
if (dbCache->dbId != msg->dbId) {
|
||||
ctgInfo("dbId already updated, dbFName:%s, dbId:%"PRIx64 ", targetId:%"PRIx64, msg->dbFName, dbCache->dbId, msg->dbId);
|
||||
ctgReleaseDBCache(msg->pCtg, dbCache);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
|
@ -1496,7 +1651,7 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
|
|||
|
||||
if ((!CTG_IS_META_CTABLE(output->metaType)) && NULL == output->tbMeta) {
|
||||
ctgError("no valid tbmeta got from meta rsp, dbFName:%s, tbName:%s", output->dbFName, output->tbName);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
if (CTG_IS_META_BOTH(output->metaType) && TSDB_SUPER_TABLE != output->tbMeta->tableType) {
|
||||
|
@ -1522,10 +1677,11 @@ int32_t ctgActUpdateTbl(SCtgMetaAction *action) {
|
|||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
taosHashRelease(pCtg->dbCache, dbCache);
|
||||
if (output) {
|
||||
tfree(output->tbMeta);
|
||||
tfree(output);
|
||||
}
|
||||
|
||||
|
||||
tfree(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -1591,22 +1747,26 @@ void* ctgUpdateThreadFunc(void* param) {
|
|||
|
||||
qInfo("catalog update thread started");
|
||||
|
||||
CTG_LOCK(CTG_READ, &ctgMgmt.lock);
|
||||
CTG_LOCK(CTG_READ, &gCtgMgmt.lock);
|
||||
|
||||
while (true) {
|
||||
tsem_wait(&ctgMgmt.sem);
|
||||
tsem_wait(&gCtgMgmt.sem);
|
||||
|
||||
if (atomic_load_8(&ctgMgmt.exit)) {
|
||||
if (atomic_load_8(&gCtgMgmt.exit)) {
|
||||
break;
|
||||
}
|
||||
|
||||
SCtgMetaAction *action = NULL;
|
||||
ctgPopAction(&action);
|
||||
|
||||
(*ctgActFuncs[action->act])(action);
|
||||
qDebug("process %s action", gCtgAction[action->act].name);
|
||||
|
||||
(*gCtgAction[action->act].func)(action);
|
||||
|
||||
CTG_STAT_ADD(gCtgMgmt.stat.runtime.qDoneNum);
|
||||
}
|
||||
|
||||
CTG_UNLOCK(CTG_READ, &ctgMgmt.lock);
|
||||
CTG_UNLOCK(CTG_READ, &gCtgMgmt.lock);
|
||||
|
||||
qInfo("catalog update thread stopped");
|
||||
|
||||
|
@ -1619,7 +1779,7 @@ int32_t ctgStartUpdateThread() {
|
|||
pthread_attr_init(&thAttr);
|
||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (pthread_create(&ctgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
|
||||
if (pthread_create(&gCtgMgmt.updateThread, &thAttr, ctgUpdateThreadFunc, NULL) != 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
CTG_ERR_RET(terrno);
|
||||
}
|
||||
|
@ -1630,56 +1790,56 @@ int32_t ctgStartUpdateThread() {
|
|||
|
||||
|
||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||
if (ctgMgmt.pCluster) {
|
||||
if (gCtgMgmt.pCluster) {
|
||||
qError("catalog already initialized");
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
atomic_store_8(&ctgMgmt.exit, false);
|
||||
atomic_store_8(&gCtgMgmt.exit, false);
|
||||
|
||||
if (cfg) {
|
||||
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
|
||||
memcpy(&gCtgMgmt.cfg, cfg, sizeof(*cfg));
|
||||
|
||||
if (ctgMgmt.cfg.maxDBCacheNum == 0) {
|
||||
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||
if (gCtgMgmt.cfg.maxDBCacheNum == 0) {
|
||||
gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||
}
|
||||
|
||||
if (ctgMgmt.cfg.maxTblCacheNum == 0) {
|
||||
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
|
||||
if (gCtgMgmt.cfg.maxTblCacheNum == 0) {
|
||||
gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
|
||||
}
|
||||
|
||||
if (ctgMgmt.cfg.dbRentSec == 0) {
|
||||
ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
if (gCtgMgmt.cfg.dbRentSec == 0) {
|
||||
gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
}
|
||||
|
||||
if (ctgMgmt.cfg.stbRentSec == 0) {
|
||||
ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
if (gCtgMgmt.cfg.stbRentSec == 0) {
|
||||
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
}
|
||||
} else {
|
||||
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
|
||||
ctgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
ctgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||
gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
|
||||
gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
|
||||
}
|
||||
|
||||
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
gCtgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == gCtgMgmt.pCluster) {
|
||||
qError("taosHashInit %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgStartUpdateThread());
|
||||
|
||||
tsem_init(&ctgMgmt.sem, 0, 0);
|
||||
tsem_init(&gCtgMgmt.sem, 0, 0);
|
||||
|
||||
ctgMgmt.head = calloc(1, sizeof(SCtgQNode));
|
||||
if (NULL == ctgMgmt.head) {
|
||||
gCtgMgmt.head = calloc(1, sizeof(SCtgQNode));
|
||||
if (NULL == gCtgMgmt.head) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
|
||||
CTG_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
ctgMgmt.tail = ctgMgmt.head;
|
||||
gCtgMgmt.tail = gCtgMgmt.head;
|
||||
|
||||
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", ctgMgmt.cfg.maxDBCacheNum, ctgMgmt.cfg.maxTblCacheNum, ctgMgmt.cfg.dbRentSec, ctgMgmt.cfg.stbRentSec);
|
||||
qDebug("catalog initialized, maxDb:%u, maxTbl:%u, dbRentSec:%u, stbRentSec:%u", gCtgMgmt.cfg.maxDBCacheNum, gCtgMgmt.cfg.maxTblCacheNum, gCtgMgmt.cfg.dbRentSec, gCtgMgmt.cfg.stbRentSec);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1689,7 +1849,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
if (NULL == gCtgMgmt.pCluster) {
|
||||
qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
|
||||
}
|
||||
|
@ -1698,7 +1858,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
|
|||
SCatalog *clusterCtg = NULL;
|
||||
|
||||
while (true) {
|
||||
SCatalog **ctg = (SCatalog **)taosHashGet(ctgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
|
||||
SCatalog **ctg = (SCatalog **)taosHashGet(gCtgMgmt.pCluster, (char*)&clusterId, sizeof(clusterId));
|
||||
|
||||
if (ctg && (*ctg)) {
|
||||
*catalogHandle = *ctg;
|
||||
|
@ -1714,22 +1874,22 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle) {
|
|||
|
||||
clusterCtg->clusterId = clusterId;
|
||||
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, ctgMgmt.cfg.dbRentSec, CTG_RENT_DB));
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, ctgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->dbRent, gCtgMgmt.cfg.dbRentSec, CTG_RENT_DB));
|
||||
CTG_ERR_JRET(ctgMetaRentInit(&clusterCtg->stbRent, gCtgMgmt.cfg.stbRentSec, CTG_RENT_STABLE));
|
||||
|
||||
clusterCtg->dbCache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
clusterCtg->dbCache = taosHashInit(gCtgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||
if (NULL == clusterCtg->dbCache) {
|
||||
qError("taosHashInit %d dbCache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
SHashObj *metaCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
SHashObj *metaCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == metaCache) {
|
||||
qError("taosHashInit failed, num:%d", ctgMgmt.cfg.maxTblCacheNum);
|
||||
qError("taosHashInit failed, num:%d", gCtgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
code = taosHashPut(ctgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
|
||||
code = taosHashPut(gCtgMgmt.pCluster, &clusterId, sizeof(clusterId), &clusterCtg, POINTER_BYTES);
|
||||
if (code) {
|
||||
if (HASH_NODE_EXIST(code)) {
|
||||
ctgFreeHandle(clusterCtg);
|
||||
|
@ -1761,7 +1921,7 @@ void catalogFreeHandle(SCatalog* pCtg) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (taosHashRemove(ctgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
|
||||
if (taosHashRemove(gCtgMgmt.pCluster, &pCtg->clusterId, sizeof(pCtg->clusterId))) {
|
||||
ctgWarn("taosHashRemove from cluster failed, may already be freed, clusterId:%"PRIx64, pCtg->clusterId);
|
||||
return;
|
||||
}
|
||||
|
@ -1793,8 +1953,9 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
|
|||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
ctgAcquireVgInfo(pCtg, dbCache);
|
||||
if (NULL == dbCache->vgInfo) {
|
||||
bool inCache = false;
|
||||
ctgAcquireVgInfo(pCtg, dbCache, &inCache);
|
||||
if (!inCache) {
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
|
||||
*version = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
@ -1877,6 +2038,8 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId
|
|||
|
||||
CTG_ERR_JRET(ctgPushAction(&action));
|
||||
|
||||
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
|
||||
_return:
|
||||
|
@ -1920,6 +2083,8 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
|
|||
|
||||
CTG_ERR_JRET(ctgPushAction(&action));
|
||||
|
||||
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
||||
|
||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
|
||||
_return:
|
||||
|
@ -1959,6 +2124,8 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
|
|||
|
||||
CTG_ERR_JRET(ctgPushAction(&action));
|
||||
|
||||
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
||||
|
||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
|
||||
_return:
|
||||
|
@ -2019,6 +2186,8 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
|
|||
|
||||
CTG_ERR_JRET(ctgPushAction(&action));
|
||||
|
||||
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
|
||||
_return:
|
||||
|
@ -2059,6 +2228,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
|
|||
SVgroupInfo vgroupInfo = {0};
|
||||
SCtgDBCache* dbCache = NULL;
|
||||
SArray *vgList = NULL;
|
||||
SDBVgInfo *vgInfo = NULL;
|
||||
|
||||
*pVgList = NULL;
|
||||
|
||||
|
@ -2068,7 +2238,6 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
|
|||
tNameGetFullDbName(pTableName, db);
|
||||
|
||||
SHashObj *vgHash = NULL;
|
||||
SDBVgInfo *vgInfo = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));
|
||||
|
||||
if (dbCache) {
|
||||
|
@ -2254,16 +2423,18 @@ int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion **dbs, uint32_t *num)
|
|||
void catalogDestroy(void) {
|
||||
qInfo("start to destroy catalog");
|
||||
|
||||
if (NULL == ctgMgmt.pCluster || atomic_load_8(&ctgMgmt.exit)) {
|
||||
if (NULL == gCtgMgmt.pCluster || atomic_load_8(&gCtgMgmt.exit)) {
|
||||
return;
|
||||
}
|
||||
|
||||
atomic_store_8(&ctgMgmt.exit, true);
|
||||
atomic_store_8(&gCtgMgmt.exit, true);
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &ctgMgmt.lock);
|
||||
tsem_post(&gCtgMgmt.sem);
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &gCtgMgmt.lock);
|
||||
|
||||
SCatalog *pCtg = NULL;
|
||||
void *pIter = taosHashIterate(ctgMgmt.pCluster, NULL);
|
||||
void *pIter = taosHashIterate(gCtgMgmt.pCluster, NULL);
|
||||
while (pIter) {
|
||||
pCtg = *(SCatalog **)pIter;
|
||||
|
||||
|
@ -2271,13 +2442,13 @@ void catalogDestroy(void) {
|
|||
catalogFreeHandle(pCtg);
|
||||
}
|
||||
|
||||
pIter = taosHashIterate(ctgMgmt.pCluster, pIter);
|
||||
pIter = taosHashIterate(gCtgMgmt.pCluster, pIter);
|
||||
}
|
||||
|
||||
taosHashCleanup(ctgMgmt.pCluster);
|
||||
ctgMgmt.pCluster = NULL;
|
||||
taosHashCleanup(gCtgMgmt.pCluster);
|
||||
gCtgMgmt.pCluster = NULL;
|
||||
|
||||
CTG_UNLOCK(CTG_WRITE, &ctgMgmt.lock);
|
||||
CTG_UNLOCK(CTG_WRITE, &gCtgMgmt.lock);
|
||||
|
||||
qInfo("catalog destroyed");
|
||||
}
|
||||
|
|
|
@ -41,11 +41,23 @@ extern "C" int32_t ctgGetTableMetaFromCache(struct SCatalog *pCatalog, const SNa
|
|||
int32_t *exist);
|
||||
extern "C" int32_t ctgDbgGetClusterCacheNum(struct SCatalog* pCatalog, int32_t type);
|
||||
extern "C" int32_t ctgActUpdateTbl(SCtgMetaAction *action);
|
||||
extern "C" int32_t ctgDbgEnableDebug(char *option);
|
||||
extern "C" int32_t ctgDbgGetStatNum(char *option, void *res);
|
||||
|
||||
void ctgTestSetPrepareTableMeta();
|
||||
void ctgTestSetPrepareCTableMeta();
|
||||
void ctgTestSetPrepareSTableMeta();
|
||||
void ctgTestSetPrepareMultiSTableMeta();
|
||||
void ctgTestSetRspTableMeta();
|
||||
void ctgTestSetRspCTableMeta();
|
||||
void ctgTestSetRspSTableMeta();
|
||||
void ctgTestSetRspMultiSTableMeta();
|
||||
|
||||
extern "C" SCatalogMgmt gCtgMgmt;
|
||||
|
||||
enum {
|
||||
CTGT_RSP_VGINFO = 1,
|
||||
CTGT_RSP_TBMETA,
|
||||
CTGT_RSP_CTBMETA,
|
||||
CTGT_RSP_STBMETA,
|
||||
CTGT_RSP_MSTBMETA,
|
||||
};
|
||||
|
||||
bool ctgTestStop = false;
|
||||
bool ctgTestEnableSleep = false;
|
||||
|
@ -69,6 +81,9 @@ char *ctgTestTablename = "table1";
|
|||
char *ctgTestCTablename = "ctable1";
|
||||
char *ctgTestSTablename = "stable1";
|
||||
|
||||
int32_t ctgTestRspFunc[10] = {0};
|
||||
int32_t ctgTestRspIdx = 0;
|
||||
|
||||
void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) {
|
||||
SCreateDbReq *pReq = (SCreateDbReq *)rpcMallocCont(sizeof(SCreateDbReq));
|
||||
strcpy(pReq->db, "1.db1");
|
||||
|
@ -111,6 +126,8 @@ void ctgTestInitLogFile() {
|
|||
tsAsyncLog = 0;
|
||||
qDebugFlag = 159;
|
||||
|
||||
ctgDbgEnableDebug("api");
|
||||
|
||||
char temp[128] = {0};
|
||||
sprintf(temp, "%s/%s", tsLogDir, defaultLogFileNamePrefix);
|
||||
if (taosInitLog(temp, tsNumOfLogLines, maxLogFileNum) < 0) {
|
||||
|
@ -250,7 +267,7 @@ void ctgTestBuildSTableMetaRsp(STableMetaRsp *rspMsg) {
|
|||
}
|
||||
|
||||
|
||||
void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
SUseDbRsp *rspMsg = NULL; // todo
|
||||
|
||||
pRsp->code = 0;
|
||||
|
@ -286,7 +303,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
|
|||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
void ctgTestRspTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
STableMetaRsp *rspMsg = NULL; // todo
|
||||
|
||||
pRsp->code = 0;
|
||||
|
@ -322,7 +339,7 @@ void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM
|
|||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
void ctgTestRspCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
STableMetaRsp *rspMsg = NULL; // todo
|
||||
|
||||
pRsp->code = 0;
|
||||
|
@ -365,7 +382,7 @@ void ctgTestPrepareCTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
|
|||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
void ctgTestRspSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
STableMetaRsp *rspMsg = NULL; // todo
|
||||
|
||||
pRsp->code = 0;
|
||||
|
@ -408,7 +425,7 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc
|
|||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
void ctgTestRspMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
STableMetaRsp *rspMsg = NULL; // todo
|
||||
static int32_t idx = 1;
|
||||
|
||||
|
@ -454,151 +471,193 @@ void ctgTestPrepareMultiSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg,
|
|||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
|
||||
ctgTestSetPrepareTableMeta();
|
||||
void ctgTestRspByIdx(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
switch (ctgTestRspFunc[ctgTestRspIdx]) {
|
||||
case CTGT_RSP_VGINFO:
|
||||
ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
break;
|
||||
case CTGT_RSP_TBMETA:
|
||||
ctgTestRspTableMeta(shandle, pEpSet, pMsg, pRsp);
|
||||
break;
|
||||
case CTGT_RSP_CTBMETA:
|
||||
ctgTestRspCTableMeta(shandle, pEpSet, pMsg, pRsp);
|
||||
break;
|
||||
case CTGT_RSP_STBMETA:
|
||||
ctgTestRspSTableMeta(shandle, pEpSet, pMsg, pRsp);
|
||||
break;
|
||||
case CTGT_RSP_MSTBMETA:
|
||||
ctgTestRspMultiSTableMeta(shandle, pEpSet, pMsg, pRsp);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
ctgTestRspIdx++;
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
|
||||
ctgTestSetPrepareCTableMeta();
|
||||
void ctgTestRspDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
|
||||
ctgTestSetRspTableMeta();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
void ctgTestRspDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
|
||||
ctgTestSetPrepareSTableMeta();
|
||||
ctgTestSetRspCTableMeta();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void ctgTestPrepareDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
void ctgTestRspDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
|
||||
ctgTestSetPrepareMultiSTableMeta();
|
||||
ctgTestSetRspSTableMeta();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareDbVgroups() {
|
||||
void ctgTestRspDbVgroupsAndMultiSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) {
|
||||
ctgTestRspDbVgroups(shandle, pEpSet, pMsg, pRsp);
|
||||
|
||||
ctgTestSetRspMultiSTableMeta();
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
void ctgTestSetRspDbVgroups() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareDbVgroups);
|
||||
stub.set(rpcSendRecv, ctgTestRspDbVgroups);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareDbVgroups);
|
||||
stub.set(f.second, ctgTestRspDbVgroups);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareTableMeta() {
|
||||
void ctgTestSetRspTableMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareTableMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspTableMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareTableMeta);
|
||||
stub.set(f.second, ctgTestRspTableMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareCTableMeta() {
|
||||
void ctgTestSetRspCTableMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareCTableMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspCTableMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareCTableMeta);
|
||||
stub.set(f.second, ctgTestRspCTableMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareSTableMeta() {
|
||||
void ctgTestSetRspSTableMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareSTableMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspSTableMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareSTableMeta);
|
||||
stub.set(f.second, ctgTestRspSTableMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareMultiSTableMeta() {
|
||||
void ctgTestSetRspMultiSTableMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareMultiSTableMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspMultiSTableMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareMultiSTableMeta);
|
||||
stub.set(f.second, ctgTestRspMultiSTableMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareDbVgroupsAndNormalMeta() {
|
||||
void ctgTestSetRspByIdx() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspByIdx);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareDbVgroupsAndNormalMeta);
|
||||
stub.set(f.second, ctgTestRspByIdx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareDbVgroupsAndChildMeta() {
|
||||
|
||||
void ctgTestSetRspDbVgroupsAndNormalMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndChildMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndNormalMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareDbVgroupsAndChildMeta);
|
||||
stub.set(f.second, ctgTestRspDbVgroupsAndNormalMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareDbVgroupsAndSuperMeta() {
|
||||
void ctgTestSetRspDbVgroupsAndChildMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndSuperMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndChildMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareDbVgroupsAndSuperMeta);
|
||||
stub.set(f.second, ctgTestRspDbVgroupsAndChildMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetPrepareDbVgroupsAndMultiSuperMeta() {
|
||||
void ctgTestSetRspDbVgroupsAndSuperMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
|
||||
stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndSuperMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestPrepareDbVgroupsAndMultiSuperMeta);
|
||||
stub.set(f.second, ctgTestRspDbVgroupsAndSuperMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void ctgTestSetRspDbVgroupsAndMultiSuperMeta() {
|
||||
static Stub stub;
|
||||
stub.set(rpcSendRecv, ctgTestRspDbVgroupsAndMultiSuperMeta);
|
||||
{
|
||||
AddrAny any("libtransport.so");
|
||||
std::map<std::string, void *> result;
|
||||
any.get_global_func_addr_dynsym("^rpcSendRecv$", result);
|
||||
for (const auto &f : result) {
|
||||
stub.set(f.second, ctgTestRspDbVgroupsAndMultiSuperMeta);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -719,17 +778,19 @@ void *ctgTestSetCtableMetaThread(void *param) {
|
|||
int32_t code = 0;
|
||||
SDBVgInfo dbVgroup = {0};
|
||||
int32_t n = 0;
|
||||
STableMetaOutput output = {0};
|
||||
STableMetaOutput *output = NULL;
|
||||
|
||||
ctgTestBuildCTableMetaOutput(&output);
|
||||
SCtgMetaAction action = {0};
|
||||
|
||||
action.act = CTG_ACT_UPDATE_TBL;
|
||||
|
||||
while (!ctgTestStop) {
|
||||
output = (STableMetaOutput *)malloc(sizeof(STableMetaOutput));
|
||||
ctgTestBuildCTableMetaOutput(output);
|
||||
|
||||
SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)malloc(sizeof(SCtgUpdateTblMsg));
|
||||
msg->pCtg = pCtg;
|
||||
msg->output = &output;
|
||||
msg->output = output;
|
||||
action.data = msg;
|
||||
|
||||
code = ctgActUpdateTbl(&action);
|
||||
|
@ -745,8 +806,6 @@ void *ctgTestSetCtableMetaThread(void *param) {
|
|||
}
|
||||
}
|
||||
|
||||
tfree(output.tbMeta);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -758,7 +817,7 @@ TEST(tableMeta, normalTable) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroups();
|
||||
ctgTestSetRspDbVgroups();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -779,7 +838,11 @@ TEST(tableMeta, normalTable) {
|
|||
ASSERT_EQ(vgInfo.vgId, 8);
|
||||
ASSERT_EQ(vgInfo.epset.numOfEps, 3);
|
||||
|
||||
ctgTestSetPrepareTableMeta();
|
||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
ctgTestSetRspTableMeta();
|
||||
|
||||
STableMeta *tableMeta = NULL;
|
||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||
|
@ -793,6 +856,11 @@ TEST(tableMeta, normalTable) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
|
||||
tableMeta = NULL;
|
||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
@ -841,6 +909,7 @@ TEST(tableMeta, normalTable) {
|
|||
ASSERT_EQ(allStbNum, 0);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(tableMeta, childTableCase) {
|
||||
|
@ -850,7 +919,7 @@ TEST(tableMeta, childTableCase) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndChildMeta();
|
||||
ctgTestSetRspDbVgroupsAndChildMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -877,6 +946,10 @@ TEST(tableMeta, childTableCase) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
tableMeta = NULL;
|
||||
code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
@ -939,6 +1012,7 @@ TEST(tableMeta, childTableCase) {
|
|||
ASSERT_EQ(allStbNum, 1);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(tableMeta, superTableCase) {
|
||||
|
@ -946,7 +1020,7 @@ TEST(tableMeta, superTableCase) {
|
|||
void *mockPointer = (void *)0x1;
|
||||
SVgroupInfo vgInfo = {0};
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndSuperMeta();
|
||||
ctgTestSetRspDbVgroupsAndSuperMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -975,7 +1049,11 @@ TEST(tableMeta, superTableCase) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
ctgTestSetPrepareCTableMeta();
|
||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
ctgTestSetRspCTableMeta();
|
||||
|
||||
tableMeta = NULL;
|
||||
|
||||
|
@ -992,6 +1070,10 @@ TEST(tableMeta, superTableCase) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
while (2 != ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
tableMeta = NULL;
|
||||
code = catalogRefreshGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &tableMeta, 0);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
@ -1041,6 +1123,7 @@ TEST(tableMeta, superTableCase) {
|
|||
ASSERT_EQ(allStbNum, 1);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(tableMeta, rmStbMeta) {
|
||||
|
@ -1050,7 +1133,7 @@ TEST(tableMeta, rmStbMeta) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndSuperMeta();
|
||||
ctgTestSetRspDbVgroupsAndSuperMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1079,9 +1162,17 @@ TEST(tableMeta, rmStbMeta) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
code = catalogRemoveStbMeta(pCtg, "1.db1", ctgTestDbId, ctgTestSTablename, ctgTestSuid);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
while (ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) || ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
|
||||
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 0);
|
||||
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 0);
|
||||
|
@ -1089,6 +1180,7 @@ TEST(tableMeta, rmStbMeta) {
|
|||
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_RENT_NUM), 0);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(tableMeta, updateStbMeta) {
|
||||
|
@ -1098,7 +1190,7 @@ TEST(tableMeta, updateStbMeta) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndSuperMeta();
|
||||
ctgTestSetRspDbVgroupsAndSuperMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1127,6 +1219,10 @@ TEST(tableMeta, updateStbMeta) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
tfree(tableMeta);
|
||||
|
||||
STableMetaRsp rsp = {0};
|
||||
|
@ -1135,6 +1231,16 @@ TEST(tableMeta, updateStbMeta) {
|
|||
code = catalogUpdateSTableMeta(pCtg, &rsp);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
while (true) {
|
||||
uint64_t n = 0;
|
||||
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
|
||||
if (n != 3) {
|
||||
usleep(100);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM), 1);
|
||||
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM), 1);
|
||||
ASSERT_EQ(ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_STB_NUM), 1);
|
||||
|
@ -1157,6 +1263,7 @@ TEST(tableMeta, updateStbMeta) {
|
|||
tfree(tableMeta);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt.stat, 0, sizeof(gCtgMgmt.stat));
|
||||
}
|
||||
|
||||
|
||||
|
@ -1167,7 +1274,15 @@ TEST(tableDistVgroup, normalTable) {
|
|||
SVgroupInfo *vgInfo = NULL;
|
||||
SArray *vgList = NULL;
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndNormalMeta();
|
||||
ctgTestInitLogFile();
|
||||
|
||||
memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc));
|
||||
ctgTestRspIdx = 0;
|
||||
ctgTestRspFunc[0] = CTGT_RSP_VGINFO;
|
||||
ctgTestRspFunc[1] = CTGT_RSP_TBMETA;
|
||||
ctgTestRspFunc[2] = CTGT_RSP_VGINFO;
|
||||
|
||||
ctgTestSetRspByIdx();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1191,6 +1306,7 @@ TEST(tableDistVgroup, normalTable) {
|
|||
ASSERT_EQ(vgInfo->epset.numOfEps, 3);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(tableDistVgroup, childTableCase) {
|
||||
|
@ -1199,7 +1315,16 @@ TEST(tableDistVgroup, childTableCase) {
|
|||
SVgroupInfo *vgInfo = NULL;
|
||||
SArray *vgList = NULL;
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndChildMeta();
|
||||
ctgTestInitLogFile();
|
||||
|
||||
memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc));
|
||||
ctgTestRspIdx = 0;
|
||||
ctgTestRspFunc[0] = CTGT_RSP_VGINFO;
|
||||
ctgTestRspFunc[1] = CTGT_RSP_CTBMETA;
|
||||
ctgTestRspFunc[2] = CTGT_RSP_STBMETA;
|
||||
ctgTestRspFunc[3] = CTGT_RSP_VGINFO;
|
||||
|
||||
ctgTestSetRspByIdx();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1223,6 +1348,7 @@ TEST(tableDistVgroup, childTableCase) {
|
|||
ASSERT_EQ(vgInfo->epset.numOfEps, 4);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(tableDistVgroup, superTableCase) {
|
||||
|
@ -1231,7 +1357,18 @@ TEST(tableDistVgroup, superTableCase) {
|
|||
SVgroupInfo *vgInfo = NULL;
|
||||
SArray *vgList = NULL;
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndSuperMeta();
|
||||
ctgTestInitLogFile();
|
||||
|
||||
memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc));
|
||||
ctgTestRspIdx = 0;
|
||||
ctgTestRspFunc[0] = CTGT_RSP_VGINFO;
|
||||
ctgTestRspFunc[1] = CTGT_RSP_STBMETA;
|
||||
ctgTestRspFunc[2] = CTGT_RSP_STBMETA;
|
||||
ctgTestRspFunc[3] = CTGT_RSP_VGINFO;
|
||||
|
||||
ctgTestSetRspByIdx();
|
||||
|
||||
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1260,6 +1397,7 @@ TEST(tableDistVgroup, superTableCase) {
|
|||
ASSERT_EQ(vgInfo->epset.numOfEps, 3);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(dbVgroup, getSetDbVgroupCase) {
|
||||
|
@ -1272,7 +1410,14 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndNormalMeta();
|
||||
memset(ctgTestRspFunc, 0, sizeof(ctgTestRspFunc));
|
||||
ctgTestRspIdx = 0;
|
||||
ctgTestRspFunc[0] = CTGT_RSP_VGINFO;
|
||||
ctgTestRspFunc[1] = CTGT_RSP_TBMETA;
|
||||
|
||||
|
||||
ctgTestSetRspByIdx();
|
||||
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1292,6 +1437,11 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
|
||||
|
||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
|
||||
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(vgInfo.vgId, 8);
|
||||
|
@ -1309,6 +1459,17 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
code = catalogUpdateDBVgInfo(pCtg, ctgTestDbname, ctgTestDbId, dbVgroup);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
while (true) {
|
||||
uint64_t n = 0;
|
||||
ctgDbgGetStatNum("runtime.qDoneNum", (void *)&n);
|
||||
if (n != 3) {
|
||||
usleep(100);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo);
|
||||
ASSERT_EQ(code, 0);
|
||||
ASSERT_EQ(vgInfo.vgId, 7);
|
||||
|
@ -1323,6 +1484,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
|||
taosArrayDestroy(vgList);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(multiThread, getSetRmSameDbVgroup) {
|
||||
|
@ -1336,7 +1498,7 @@ TEST(multiThread, getSetRmSameDbVgroup) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroups();
|
||||
ctgTestSetRspDbVgroups();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1374,6 +1536,7 @@ TEST(multiThread, getSetRmSameDbVgroup) {
|
|||
sleep(1);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
TEST(multiThread, getSetRmDiffDbVgroup) {
|
||||
|
@ -1387,7 +1550,7 @@ TEST(multiThread, getSetRmDiffDbVgroup) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroups();
|
||||
ctgTestSetRspDbVgroups();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1425,6 +1588,7 @@ TEST(multiThread, getSetRmDiffDbVgroup) {
|
|||
sleep(1);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
|
||||
|
@ -1440,7 +1604,7 @@ TEST(multiThread, ctableMeta) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndChildMeta();
|
||||
ctgTestSetRspDbVgroupsAndChildMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1477,6 +1641,7 @@ TEST(multiThread, ctableMeta) {
|
|||
sleep(2);
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
|
||||
|
@ -1495,7 +1660,7 @@ TEST(rentTest, allRent) {
|
|||
|
||||
ctgTestInitLogFile();
|
||||
|
||||
ctgTestSetPrepareDbVgroupsAndMultiSuperMeta();
|
||||
ctgTestSetRspDbVgroupsAndMultiSuperMeta();
|
||||
|
||||
initQueryModuleMsgHandle();
|
||||
|
||||
|
@ -1525,6 +1690,10 @@ TEST(rentTest, allRent) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.precision, 1);
|
||||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
while (ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_META_NUM) < i) {
|
||||
usleep(100);
|
||||
}
|
||||
|
||||
code = catalogGetExpiredDBs(pCtg, &dbs, &num);
|
||||
ASSERT_EQ(code, 0);
|
||||
printf("%d - expired dbNum:%d\n", i, num);
|
||||
|
@ -1550,6 +1719,7 @@ TEST(rentTest, allRent) {
|
|||
}
|
||||
|
||||
catalogDestroy();
|
||||
memset(&gCtgMgmt, 0, sizeof(gCtgMgmt));
|
||||
}
|
||||
|
||||
int main(int argc, char **argv) {
|
||||
|
|
Loading…
Reference in New Issue