catalog code refactor
This commit is contained in:
parent
2a47a2571b
commit
34b74ff0a4
|
@ -227,7 +227,7 @@ void taos_init_imp(void) {
|
|||
|
||||
rpcInit();
|
||||
|
||||
SCatalogCfg cfg = {.enableVgroupCache = true, .maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
||||
SCatalogCfg cfg = {.maxDBCacheNum = 100, .maxTblCacheNum = 100};
|
||||
catalogInit(&cfg);
|
||||
|
||||
tscDebug("starting to initialize TAOS driver, local ep: %s", tsLocalEp);
|
||||
|
|
|
@ -67,6 +67,7 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t);
|
|||
#define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0)
|
||||
|
||||
#define CTG_CACHE_ENABLED(cfg) ((cfg)->maxDBCacheNum > 0 || (cfg)->maxTblCacheNum > 0)
|
||||
|
||||
#define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0)
|
||||
#define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0)
|
||||
|
|
|
@ -343,7 +343,9 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
ctgError("init hash[%d] for tablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
if (NULL == pCatalog->tableCache.stableCache) {
|
||||
pCatalog->tableCache.stableCache = taosHashInit(ctgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pCatalog->tableCache.stableCache) {
|
||||
ctgError("init hash[%d] for stablemeta cache failed", ctgMgmt.cfg.maxTblCacheNum);
|
||||
|
@ -354,55 +356,51 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
if (output->metaNum == 2) {
|
||||
if (taosHashPut(pCatalog->tableCache.cache, output->ctbFname, strlen(output->ctbFname), &output->ctbMeta, sizeof(output->ctbMeta)) != 0) {
|
||||
ctgError("push ctable[%s] to table cache failed", output->ctbFname);
|
||||
goto error_exit;
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (TSDB_SUPER_TABLE != output->tbMeta->tableType) {
|
||||
ctgError("table type[%d] error, expected:%d", output->tbMeta->tableType, TSDB_SUPER_TABLE);
|
||||
goto error_exit;
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tbSize = sizeof(*output->tbMeta) + sizeof(SSchema) * (output->tbMeta->tableInfo.numOfColumns + output->tbMeta->tableInfo.numOfTags);
|
||||
if (taosHashPut(pCatalog->tableCache.cache, output->tbFname, strlen(output->tbFname), output->tbMeta, tbSize) != 0) {
|
||||
ctgError("push table[%s] to table cache failed", output->tbFname);
|
||||
goto error_exit;
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
|
||||
if (taosHashPut(pCatalog->tableCache.stableCache, &output->tbMeta->suid, sizeof(output->tbMeta->suid), &output->tbMeta, POINTER_BYTES) != 0) {
|
||||
ctgError("push suid[%"PRIu64"] to stable cache failed", output->tbMeta->suid);
|
||||
goto error_exit;
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
error_exit:
|
||||
if (pCatalog->vgroupCache.cache) {
|
||||
taosHashCleanup(pCatalog->vgroupCache.cache);
|
||||
pCatalog->vgroupCache.cache = NULL;
|
||||
}
|
||||
|
||||
pCatalog->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
||||
if (ctgMgmt.pCluster) {
|
||||
ctgError("catalog already init");
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (cfg) {
|
||||
memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg));
|
||||
} else {
|
||||
ctgMgmt.cfg.enableVgroupCache = true;
|
||||
ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
|
||||
ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER;
|
||||
}
|
||||
|
||||
if (CTG_CACHE_ENABLED(&ctgMgmt.cfg)) {
|
||||
ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER);
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -430,8 +428,6 @@ int32_t catalogGetHandle(const char* clusterId , struct SCatalog** catalogHandle
|
|||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
clusterCtg->vgroupCache.vgroupVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
||||
if (taosHashPut(ctgMgmt.pCluster, clusterId, clen, &clusterCtg, POINTER_BYTES)) {
|
||||
ctgError("put cluster %s cache to hash failed", clusterId);
|
||||
tfree(clusterCtg);
|
||||
|
@ -479,7 +475,7 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName
|
|||
}
|
||||
|
||||
if (NULL == pCatalog->dbCache.cache) {
|
||||
pCatalog->dbCache.cache = taosHashInit(CTG_DEFAULT_CACHE_DB_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
pCatalog->dbCache.cache = taosHashInit(ctgMgmt.cfg.maxDBCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == pCatalog->dbCache.cache) {
|
||||
ctgError("init hash[%d] for db cache failed", CTG_DEFAULT_CACHE_DB_NUMBER);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
|
|
Loading…
Reference in New Issue