From cec51ef719a41aca125f8a78f5f7b5e9f8bc0852 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 24 Mar 2022 16:38:44 +0800 Subject: [PATCH] feature/scheduler --- include/libs/catalog/catalog.h | 2 +- source/libs/catalog/src/catalog.c | 23 +++++++++++----------- source/libs/parser/src/parTranslater.c | 27 ++++++++++++++++++++------ 3 files changed, 34 insertions(+), 18 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index dd5f7fc104..9f0d4b11c2 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -112,7 +112,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId); -int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName); +int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName); int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 8f67482650..a3a52c8b6d 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -406,9 +406,9 @@ _return: } -int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) { +int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB}; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq}; SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg)); @@ -435,9 +435,9 @@ _return: -int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) { +int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL}; + SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq}; SCtgRemoveTblMsg *msg = malloc(sizeof(SCtgRemoveTblMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg)); @@ -496,7 +496,7 @@ _return: int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) { int32_t code = 0; - SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL}; + SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL, .syncReq = syncReq}; SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg)); if (NULL == msg) { ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg)); @@ -1843,6 +1843,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, if (CTG_IS_META_NULL(output->metaType)) { ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName)); + catalogRemoveTableMeta(pCtg, pTableName); CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST); } @@ -1951,9 +1952,9 @@ _return: } if (TSDB_SUPER_TABLE == tbType) { - ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid); + ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid, false); } else { - ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname); + ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, false); } } @@ -2534,7 +2535,7 @@ int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId, } -int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { +int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) { CTG_API_ENTER(); int32_t code = 0; @@ -2561,9 +2562,9 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { tNameGetFullDbName(pTableName, dbFName); if (TSDB_SUPER_TABLE == tblMeta->tableType) { - CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid)); + CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid, true)); } else { - CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname)); + CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true)); } @@ -2588,7 +2589,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, CTG_API_LEAVE(TSDB_CODE_SUCCESS); } - CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid)); + CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid, true)); CTG_API_LEAVE(TSDB_CODE_SUCCESS); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 100fc3f107..19f7f47558 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -84,12 +84,18 @@ static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName return pName; } -static int32_t collectUseDatabase(const char* pFullDbName, SHashObj* pDbs) { +static int32_t collectUseDatabaseImpl(const char* pFullDbName, SHashObj* pDbs) { SFullDatabaseName name = {0}; strcpy(name.fullDbName, pFullDbName); return taosHashPut(pDbs, pFullDbName, strlen(pFullDbName), &name, sizeof(SFullDatabaseName)); } +static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) { + char dbFName[TSDB_DB_FNAME_LEN] = {0}; + tNameGetFullDbName(pName, dbFName); + return collectUseDatabaseImpl(dbFName, pDbs); +} + static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) { char fullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pName, fullName); @@ -98,7 +104,10 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) { static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) { SParseContext* pParCxt = pCxt->pParseCxt; - int32_t code = collectUseTable(pName, pCxt->pTables); + int32_t code = collectUseDatabase(pName, pCxt->pDbs); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(pName, pCxt->pTables); + } if (TSDB_CODE_SUCCESS == code) { code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta); } @@ -117,7 +126,10 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) { SParseContext* pParCxt = pCxt->pParseCxt; - int32_t code = collectUseTable(pName, pCxt->pTables); + int32_t code = collectUseDatabase(pName, pCxt->pDbs); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(pName, pCxt->pTables); + } if (TSDB_CODE_SUCCESS == code) { code = catalogGetTableDistVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pVgInfo); } @@ -131,7 +143,7 @@ static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArr SParseContext* pParCxt = pCxt->pParseCxt; char fullDbName[TSDB_DB_FNAME_LEN]; tNameGetFullDbName(pName, fullDbName); - int32_t code = collectUseDatabase(fullDbName, pCxt->pDbs); + int32_t code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs); if (TSDB_CODE_SUCCESS == code) { code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo); } @@ -151,7 +163,10 @@ static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray* static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) { SParseContext* pParCxt = pCxt->pParseCxt; - int32_t code = collectUseTable(pName, pCxt->pTables); + int32_t code = collectUseDatabase(pName, pCxt->pDbs); + if (TSDB_CODE_SUCCESS == code) { + code = collectUseTable(pName, pCxt->pTables); + } if (TSDB_CODE_SUCCESS == code) { code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo); } @@ -170,7 +185,7 @@ static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName, static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) { SParseContext* pParCxt = pCxt->pParseCxt; - int32_t code = collectUseDatabase(pDbFName, pCxt->pDbs); + int32_t code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs); if (TSDB_CODE_SUCCESS == code) { code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum); }