From 888aa70b88e5eb870c1a8e22c21779b259503ba7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 11 Mar 2022 10:53:23 +0800 Subject: [PATCH] feature/scheduler --- include/util/taoserror.h | 1 + source/libs/catalog/src/catalog.c | 238 ++++++++++++++++-------------- source/util/src/terror.c | 3 +- 3 files changed, 131 insertions(+), 111 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 0e80ebdb75..d7f8a6d7ab 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -453,6 +453,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404) #define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405) #define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406) +#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407) //scheduler #define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 81e68e5433..4d8aac027d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -484,6 +484,31 @@ _return: CTG_RET(code); } +int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) { + int32_t code = 0; + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; + SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + + msg->pCtg = pCtg; + msg->output = output; + + action.data = msg; + + CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + + return TSDB_CODE_SUCCESS; + +_return: + + tfree(msg); + + CTG_RET(code); +} + int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) { CTG_LOCK(CTG_READ, &dbCache->vgLock); @@ -1367,8 +1392,6 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) { ctgFreeDbCache(dbCache); - ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); - CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare)); ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId); @@ -1431,7 +1454,8 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD SDBVgInfo* dbInfo = *pDbInfo; if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) { - ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion); + ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d", + dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash)); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -1655,21 +1679,21 @@ _return: int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName) { bool inCache = false; int32_t code = 0; - SCtgDBCache** dbCache = NULL; + SCtgDBCache* dbCache = NULL; - CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache)); + CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache, &inCache)); SUseDbOutput DbOut = {0}; SBuildUseDBInput input = {0}; tstrncpy(input.db, dbFName, tListLen(input.db)); if (inCache) { - input.dbId = (*dbCache)->dbId; - input.vgVersion = (*dbCache)->vgInfo->vgVersion; - input.numOfTable = (*dbCache)->vgInfo->numOfTable; + input.dbId = dbCache->dbId; + input.vgVersion = dbCache->vgInfo->vgVersion; + input.numOfTable = dbCache->vgInfo->numOfTable; - ctgReleaseVgInfo(*dbCache); - ctgReleaseDBCache(pCtg, *dbCache); + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); } else { input.vgVersion = CTG_DEFAULT_INVALID_VERSION; } @@ -1717,7 +1741,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput) -int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput) { +int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput, bool syncReq) { if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -1729,7 +1753,6 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo)); } - SCtgUpdateTblMsg *msg = NULL; STableMetaOutput moutput = {0}; STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput)); if (NULL == output) { @@ -1801,19 +1824,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput)); } - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; - msg = malloc(sizeof(SCtgUpdateTblMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - msg->output = output; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output, syncReq)); return TSDB_CODE_SUCCESS; @@ -1821,7 +1832,6 @@ _return: tfree(output->tbMeta); tfree(output); - tfree(msg); CTG_RET(code); } @@ -1862,7 +1872,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons while (true) { - CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output)); + CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output, false)); if (CTG_IS_META_TABLE(output->metaType)) { *pTableMeta = output->tbMeta; @@ -2155,6 +2165,82 @@ int32_t ctgStartUpdateThread() { return TSDB_CODE_SUCCESS; } +int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) { + STableMeta *tbMeta = NULL; + int32_t code = 0; + SVgroupInfo vgroupInfo = {0}; + SCtgDBCache* dbCache = NULL; + SArray *vgList = NULL; + SDBVgInfo *vgInfo = NULL; + + *pVgList = NULL; + + CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB)); + + char db[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, db); + + SHashObj *vgHash = NULL; + CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); + + if (dbCache) { + vgHash = dbCache->vgInfo->vgHash; + } else { + vgHash = vgInfo->vgHash; + } + + if (tbMeta->tableType == TSDB_SUPER_TABLE) { + CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList)); + } else { + // USE HASH METHOD INSTEAD OF VGID IN TBMETA + ctgError("invalid method to get none stb vgInfo, tbType:%d", tbMeta->tableType); + CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT); + +#if 0 + int32_t vgId = tbMeta->vgId; + if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) { + ctgWarn("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); + CTG_ERR_JRET(TSDB_CODE_CTG_VG_META_MISMATCH); + } + + vgList = taosArrayInit(1, sizeof(SVgroupInfo)); + if (NULL == vgList) { + ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo)); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL == taosArrayPush(vgList, &vgroupInfo)) { + ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + *pVgList = vgList; + vgList = NULL; +#endif + } + +_return: + + if (dbCache) { + ctgReleaseVgInfo(dbCache); + ctgReleaseDBCache(pCtg, dbCache); + } + + tfree(tbMeta); + + if (vgInfo) { + taosHashCleanup(vgInfo->vgHash); + tfree(vgInfo); + } + + if (vgList) { + taosArrayDestroy(vgList); + vgList = NULL; + } + + CTG_RET(code); +} + int32_t catalogInit(SCatalogCfg *cfg) { if (gCtgMgmt.pCluster) { @@ -2502,19 +2588,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) { CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta)); - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; - SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - msg->pCtg = pCtg; - msg->output = output; - - action.data = msg; - - CTG_ERR_JRET(ctgPushAction(pCtg, &action)); + CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output)); CTG_API_LEAVE(code); @@ -2522,7 +2596,6 @@ _return: tfree(output->tbMeta); tfree(output); - tfree(msg); CTG_API_LEAVE(code); } @@ -2544,7 +2617,7 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL)); + CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, false)); } int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) { @@ -2564,83 +2637,28 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm ctgError("no valid vgInfo for db, dbname:%s", pTableName->dbname); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); } - - STableMeta *tbMeta = NULL; + int32_t code = 0; - SVgroupInfo vgroupInfo = {0}; - SCtgDBCache* dbCache = NULL; - SArray *vgList = NULL; - SDBVgInfo *vgInfo = NULL; - *pVgList = NULL; - - CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB)); + while (true) { + code = ctgGetTableDistVgInfo(pCtg, pRpc, pMgmtEps, pTableName, pVgList); + if (code) { + if (TSDB_CODE_CTG_VG_META_MISMATCH == code) { + CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(CTG_FLAG_UNKNOWN_STB), NULL, true)); - char db[TSDB_DB_FNAME_LEN] = {0}; - tNameGetFullDbName(pTableName, db); - - SHashObj *vgHash = NULL; - CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo)); - - if (dbCache) { - vgHash = dbCache->vgInfo->vgHash; - } else { - vgHash = vgInfo->vgHash; - } - - /* TODO REMOEV THIS .... - if (0 == tbMeta->vgId) { - SVgroupInfo vgroup = {0}; - - catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup); - - tbMeta->vgId = vgroup.vgId; - } - // TODO REMOVE THIS ....*/ - - if (tbMeta->tableType == TSDB_SUPER_TABLE) { - CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList)); - } else { - int32_t vgId = tbMeta->vgId; - if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) { - ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pTableName, dbFName); + CTG_ERR_JRET(ctgRefreshDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName)); + + continue; + } } - vgList = taosArrayInit(1, sizeof(SVgroupInfo)); - if (NULL == vgList) { - ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo)); - CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); - } - - if (NULL == taosArrayPush(vgList, &vgroupInfo)) { - ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName)); - CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); - } - - *pVgList = vgList; - vgList = NULL; + break; } _return: - if (dbCache) { - ctgReleaseVgInfo(dbCache); - ctgReleaseDBCache(pCtg, dbCache); - } - - tfree(tbMeta); - - if (vgInfo) { - taosHashCleanup(vgInfo->vgHash); - tfree(vgInfo); - } - - if (vgList) { - taosArrayDestroy(vgList); - vgList = NULL; - } - CTG_API_LEAVE(code); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index fc9c36097c..a93a07648a 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -415,7 +415,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limi TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version") // tfs -TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config") TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount") TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount") @@ -433,6 +433,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error" TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped") TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service") +TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup mismatch") //scheduler TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")