diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index e55c2f57fd..a9abf45c8d 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -45,7 +45,6 @@ typedef struct SMetaData { } SMetaData; typedef struct SCatalogCfg { - bool enableVgroupCache; uint32_t maxTblCacheNum; uint32_t maxDBCacheNum; } SCatalogCfg; @@ -61,8 +60,8 @@ int32_t catalogInit(SCatalogCfg *cfg); int32_t catalogGetHandle(const char *clusterId, struct SCatalog** catalogHandle); int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, int32_t* version); -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo); -int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); + +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo); /** * Get a table's meta data. diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index fa1483de87..ddcfcab4db 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -24,7 +24,6 @@ extern "C" { #include "catalog.h" typedef struct SSchedulerCfg { - int32_t clusterType; int32_t maxJobNum; } SSchedulerCfg; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index 820bcdfa3f..40943849f1 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -66,8 +66,6 @@ typedef uint32_t (*tableNameHashFp)(const char *, uint32_t); #define ctgTrace(...) do { if (ctgDebugFlag & DEBUG_TRACE) { taosPrintLog("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) #define ctgDebugL(...) do { if (ctgDebugFlag & DEBUG_DEBUG) { taosPrintLongString("CTG ", ctgDebugFlag, __VA_ARGS__); }} while(0) -#define CTG_CACHE_ENABLED() (ctgMgmt.cfg.maxDBCacheNum > 0 || ctgMgmt.cfg.maxTblCacheNum > 0) - #define CTG_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define CTG_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) #define CTG_ERR_LRET(c,...) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { ctgError(__VA_ARGS__); terrno = _code; return _code; } } while (0) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index edbe5f66ea..e067a7597a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -370,6 +370,41 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out return TSDB_CODE_SUCCESS; } + +int32_t ctgGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { + if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + int32_t exist = 0; + + if (0 == forceUpdate) { + CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); + + if (exist) { + return TSDB_CODE_SUCCESS; + } + } + + SUseDbOutput DbOut = {0}; + SBuildUseDBInput input = {0}; + + strncpy(input.db, dbName, sizeof(input.db)); + input.db[sizeof(input.db) - 1] = 0; + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + + CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); + + CTG_ERR_RET(catalogUpdateDBVgroup(pCatalog, dbName, &DbOut.dbVgroup)); + + if (dbInfo) { + *dbInfo = DbOut.dbVgroup; + } + + return TSDB_CODE_SUCCESS; +} + + int32_t catalogInit(SCatalogCfg *cfg) { if (ctgMgmt.pCluster) { ctgError("catalog already init"); @@ -378,16 +413,22 @@ int32_t catalogInit(SCatalogCfg *cfg) { if (cfg) { memcpy(&ctgMgmt.cfg, cfg, sizeof(*cfg)); + + if (ctgMgmt.cfg.maxDBCacheNum == 0) { + ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; + } + + if (ctgMgmt.cfg.maxTblCacheNum == 0) { + ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; + } } else { ctgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER; ctgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TABLEMETA_NUMBER; } - if (CTG_CACHE_ENABLED()) { - ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); - if (NULL == ctgMgmt.pCluster) { - CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); - } + ctgMgmt.pCluster = taosHashInit(CTG_DEFAULT_CACHE_CLUSTER_NUMBER, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK); + if (NULL == ctgMgmt.pCluster) { + CTG_ERR_LRET(TSDB_CODE_CTG_INTERNAL_ERROR, "init %d cluster cache failed", CTG_DEFAULT_CACHE_CLUSTER_NUMBER); } return TSDB_CODE_SUCCESS; @@ -449,13 +490,19 @@ int32_t catalogGetDBVgroupVersion(struct SCatalog* pCatalog, const char* dbName, return TSDB_CODE_SUCCESS; } -int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { +int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDBVgroupInfo* dbInfo) { if (NULL == pCatalog || NULL == dbName || NULL == dbInfo) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } if (dbInfo->vgVersion < 0) { if (pCatalog->dbCache.cache) { + SDBVgroupInfo *oldInfo = taosHashGet(pCatalog->dbCache.cache, dbName, strlen(dbName)); + if (oldInfo && oldInfo->vgInfo) { + taosHashCleanup(oldInfo->vgInfo); + oldInfo->vgInfo = NULL; + } + taosHashRemove(pCatalog->dbCache.cache, dbName, strlen(dbName)); } @@ -485,42 +532,6 @@ int32_t catalogUpdateDBVgroupCache(struct SCatalog* pCatalog, const char* dbName return TSDB_CODE_SUCCESS; } - - - -int32_t catalogGetDBVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* dbName, int32_t forceUpdate, SDBVgroupInfo* dbInfo) { - if (NULL == pCatalog || NULL == dbName || NULL == pRpc || NULL == pMgmtEps) { - CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); - } - - int32_t exist = 0; - - if (0 == forceUpdate) { - CTG_ERR_RET(ctgGetDBVgroupFromCache(pCatalog, dbName, dbInfo, &exist)); - - if (exist) { - return TSDB_CODE_SUCCESS; - } - } - - SUseDbOutput DbOut = {0}; - SBuildUseDBInput input = {0}; - - strncpy(input.db, dbName, sizeof(input.db)); - input.db[sizeof(input.db) - 1] = 0; - input.vgVersion = CTG_DEFAULT_INVALID_VERSION; - - CTG_ERR_RET(ctgGetDBVgroupFromMnode(pCatalog, pRpc, pMgmtEps, &input, &DbOut)); - - CTG_ERR_RET(catalogUpdateDBVgroupCache(pCatalog, dbName, &DbOut.dbVgroup)); - - if (dbInfo) { - *dbInfo = DbOut.dbVgroup; - } - - return TSDB_CODE_SUCCESS; -} - int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { return ctgGetTableMetaImpl(pCatalog, pTransporter, pMgmtEps, pDBName, pTableName, false, pTableMeta); } @@ -531,6 +542,7 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe } SVgroupInfo vgroupInfo = {0}; + int32_t code = 0; CTG_ERR_RET(catalogGetTableHashVgroup(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &vgroupInfo)); @@ -540,11 +552,13 @@ int32_t catalogRenewTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSe CTG_ERR_RET(ctgGetTableMetaFromMnode(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &output)); - CTG_ERR_RET(ctgUpdateTableMetaCache(pCatalog, &output)); + CTG_ERR_JRET(ctgUpdateTableMetaCache(pCatalog, &output)); + +_return: tfree(output.tbMeta); - return TSDB_CODE_SUCCESS; + CTG_RET(code); } int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, STableMeta** pTableMeta) { @@ -563,7 +577,7 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S CTG_ERR_JRET(catalogGetTableMeta(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, &tbMeta)); - CTG_ERR_JRET(catalogGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); + CTG_ERR_JRET(ctgGetDBVgroup(pCatalog, pRpc, pMgmtEps, pDBName, false, &dbVgroup)); if (tbMeta->tableType == TSDB_SUPER_TABLE) { CTG_ERR_JRET(ctgGetVgInfoFromDB(pCatalog, pRpc, pMgmtEps, &dbVgroup, pVgroupList)); @@ -594,6 +608,7 @@ _return: tfree(tbMeta); taosArrayDestroy(*pVgroupList); + *pVgroupList = NULL; CTG_RET(code); } @@ -604,7 +619,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog *pCatalog, void *pTransporter, int32_t code = 0; int32_t vgId = 0; - CTG_ERR_RET(catalogGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo)); + CTG_ERR_RET(ctgGetDBVgroup(pCatalog, pTransporter, pMgmtEps, pDBName, false, &dbInfo)); if (dbInfo.vgVersion < 0 || NULL == dbInfo.vgInfo) { ctgError("db[%s] vgroup cache invalid, vgroup version:%d, vgInfo:%p", pDBName, dbInfo.vgVersion, dbInfo.vgInfo); @@ -627,12 +642,15 @@ int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* p if (pReq->pTableName) { char dbName[TSDB_DB_FNAME_LEN]; int32_t tbNum = (int32_t)taosArrayGetSize(pReq->pTableName); - if (tbNum > 0) { - pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); - if (NULL == pRsp->pTableMeta) { - ctgError("taosArrayInit num[%d] failed", tbNum); - CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); - } + if (tbNum <= 0) { + ctgError("empty table name list"); + CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); + } + + pRsp->pTableMeta = taosArrayInit(tbNum, POINTER_BYTES); + if (NULL == pRsp->pTableMeta) { + ctgError("taosArrayInit num[%d] failed", tbNum); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } for (int32_t i = 0; i < tbNum; ++i) { @@ -663,6 +681,7 @@ _return: } taosArrayDestroy(pRsp->pTableMeta); + pRsp->pTableMeta = NULL; } CTG_RET(code); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 4cde24e38c..7bd2205e43 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -791,9 +791,29 @@ void schDropJobAllTasks(SSchJob *job) { } } +uint64_t schGenSchId(void) { + uint64_t sId = 0; + + // TODO + + qDebug("Gen sId:0x%"PRIx64, sId); + + return sId; +} + + int32_t schedulerInit(SSchedulerCfg *cfg) { + if (schMgmt.jobs) { + qError("scheduler already init"); + return TSDB_CODE_QRY_INVALID_INPUT; + } + if (cfg) { schMgmt.cfg = *cfg; + + if (schMgmt.cfg.maxJobNum <= 0) { + schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; + } } else { schMgmt.cfg.maxJobNum = SCHEDULE_DEFAULT_JOB_NUMBER; } @@ -803,18 +823,14 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { SCH_ERR_LRET(TSDB_CODE_QRY_OUT_OF_MEMORY, "init %d schduler jobs failed", schMgmt.cfg.maxJobNum); } - schMgmt.sId = 1; //TODO GENERATE A UUID + schMgmt.sId = schGenSchId(); return TSDB_CODE_SUCCESS; } int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { - if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { - SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); - } - - if (taosArrayGetSize(qnodeList) <= 0) { + if (qnodeList && taosArrayGetSize(qnodeList) <= 0) { qInfo("qnodeList is empty"); } @@ -882,6 +898,10 @@ _return: } int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) { + if (NULL == transport || /* NULL == qnodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == numOfRows) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + *numOfRows = 0; SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true)); @@ -894,6 +914,10 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi } int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { + if (NULL == transport || NULL == qnodeList ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false); }