feature/scheduler
This commit is contained in:
parent
85a1bd1cc9
commit
cec51ef719
|
@ -112,7 +112,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCatalog, const char* dbName, uint64_t d
|
|||
|
||||
int32_t catalogRemoveDB(SCatalog* pCatalog, const char* dbName, uint64_t dbId);
|
||||
|
||||
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName);
|
||||
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName);
|
||||
|
||||
int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* stbName, uint64_t suid);
|
||||
|
||||
|
|
|
@ -406,9 +406,9 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid) {
|
||||
int32_t ctgPushRmStbMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq) {
|
||||
int32_t code = 0;
|
||||
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB};
|
||||
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_STB, .syncReq = syncReq};
|
||||
SCtgRemoveStbMsg *msg = malloc(sizeof(SCtgRemoveStbMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveStbMsg));
|
||||
|
@ -435,9 +435,9 @@ _return:
|
|||
|
||||
|
||||
|
||||
int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName) {
|
||||
int32_t ctgPushRmTblMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *tbName, bool syncReq) {
|
||||
int32_t code = 0;
|
||||
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL};
|
||||
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_TBL, .syncReq = syncReq};
|
||||
SCtgRemoveTblMsg *msg = malloc(sizeof(SCtgRemoveTblMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveTblMsg));
|
||||
|
@ -496,7 +496,7 @@ _return:
|
|||
|
||||
int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) {
|
||||
int32_t code = 0;
|
||||
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
|
||||
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL, .syncReq = syncReq};
|
||||
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
|
||||
|
@ -1843,6 +1843,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
|||
|
||||
if (CTG_IS_META_NULL(output->metaType)) {
|
||||
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(pTableName));
|
||||
catalogRemoveTableMeta(pCtg, pTableName);
|
||||
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
|
||||
}
|
||||
|
||||
|
@ -1951,9 +1952,9 @@ _return:
|
|||
}
|
||||
|
||||
if (TSDB_SUPER_TABLE == tbType) {
|
||||
ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid);
|
||||
ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, suid, false);
|
||||
} else {
|
||||
ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname);
|
||||
ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, false);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2534,7 +2535,7 @@ int32_t catalogUpdateVgEpSet(SCatalog* pCtg, const char* dbFName, int32_t vgId,
|
|||
|
||||
}
|
||||
|
||||
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
|
||||
int32_t catalogRemoveTableMeta(SCatalog* pCtg, const SName* pTableName) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
int32_t code = 0;
|
||||
|
@ -2561,9 +2562,9 @@ int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
|
|||
tNameGetFullDbName(pTableName, dbFName);
|
||||
|
||||
if (TSDB_SUPER_TABLE == tblMeta->tableType) {
|
||||
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid));
|
||||
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, tblMeta->suid, true));
|
||||
} else {
|
||||
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname));
|
||||
CTG_ERR_JRET(ctgPushRmTblMsgInQueue(pCtg, dbFName, dbId, pTableName->tname, true));
|
||||
}
|
||||
|
||||
|
||||
|
@ -2588,7 +2589,7 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
|
|||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid));
|
||||
CTG_ERR_JRET(ctgPushRmStbMsgInQueue(pCtg, dbFName, dbId, stbName, suid, true));
|
||||
|
||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
|
||||
|
|
|
@ -84,12 +84,18 @@ static SName* toName(int32_t acctId, const char* pDbName, const char* pTableName
|
|||
return pName;
|
||||
}
|
||||
|
||||
static int32_t collectUseDatabase(const char* pFullDbName, SHashObj* pDbs) {
|
||||
static int32_t collectUseDatabaseImpl(const char* pFullDbName, SHashObj* pDbs) {
|
||||
SFullDatabaseName name = {0};
|
||||
strcpy(name.fullDbName, pFullDbName);
|
||||
return taosHashPut(pDbs, pFullDbName, strlen(pFullDbName), &name, sizeof(SFullDatabaseName));
|
||||
}
|
||||
|
||||
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pName, dbFName);
|
||||
return collectUseDatabaseImpl(dbFName, pDbs);
|
||||
}
|
||||
|
||||
static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
|
||||
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pName, fullName);
|
||||
|
@ -98,7 +104,10 @@ static int32_t collectUseTable(const SName* pName, SHashObj* pDbs) {
|
|||
|
||||
static int32_t getTableMetaImpl(STranslateContext* pCxt, const SName* pName, STableMeta** pMeta) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = collectUseTable(pName, pCxt->pTables);
|
||||
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(pName, pCxt->pTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = catalogGetTableMeta(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pMeta);
|
||||
}
|
||||
|
@ -117,7 +126,10 @@ static int32_t getTableMeta(STranslateContext* pCxt, const char* pDbName, const
|
|||
|
||||
static int32_t getTableDistVgInfo(STranslateContext* pCxt, const SName* pName, SArray** pVgInfo) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = collectUseTable(pName, pCxt->pTables);
|
||||
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(pName, pCxt->pTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = catalogGetTableDistVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pVgInfo);
|
||||
}
|
||||
|
@ -131,7 +143,7 @@ static int32_t getDBVgInfoImpl(STranslateContext* pCxt, const SName* pName, SArr
|
|||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
char fullDbName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pName, fullDbName);
|
||||
int32_t code = collectUseDatabase(fullDbName, pCxt->pDbs);
|
||||
int32_t code = collectUseDatabaseImpl(fullDbName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = catalogGetDBVgInfo(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, fullDbName, pVgInfo);
|
||||
}
|
||||
|
@ -151,7 +163,10 @@ static int32_t getDBVgInfo(STranslateContext* pCxt, const char* pDbName, SArray*
|
|||
|
||||
static int32_t getTableHashVgroupImpl(STranslateContext* pCxt, const SName* pName, SVgroupInfo* pInfo) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = collectUseTable(pName, pCxt->pTables);
|
||||
int32_t code = collectUseDatabase(pName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = collectUseTable(pName, pCxt->pTables);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = catalogGetTableHashVgroup(pParCxt->pCatalog, pParCxt->pTransporter, &pParCxt->mgmtEpSet, pName, pInfo);
|
||||
}
|
||||
|
@ -170,7 +185,7 @@ static int32_t getTableHashVgroup(STranslateContext* pCxt, const char* pDbName,
|
|||
|
||||
static int32_t getDBVgVersion(STranslateContext* pCxt, const char* pDbFName, int32_t* pVersion, int64_t* pDbId, int32_t* pTableNum) {
|
||||
SParseContext* pParCxt = pCxt->pParseCxt;
|
||||
int32_t code = collectUseDatabase(pDbFName, pCxt->pDbs);
|
||||
int32_t code = collectUseDatabaseImpl(pDbFName, pCxt->pDbs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = catalogGetDBVgVersion(pParCxt->pCatalog, pDbFName, pVersion, pDbId, pTableNum);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue