feature/scheduler
This commit is contained in:
parent
f17d4fb94e
commit
888aa70b88
|
@ -453,6 +453,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_CTG_SYS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2404)
|
||||
#define TSDB_CODE_CTG_DB_DROPPED TAOS_DEF_ERROR_CODE(0, 0x2405)
|
||||
#define TSDB_CODE_CTG_OUT_OF_SERVICE TAOS_DEF_ERROR_CODE(0, 0x2406)
|
||||
#define TSDB_CODE_CTG_VG_META_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x2407)
|
||||
|
||||
//scheduler
|
||||
#define TSDB_CODE_SCH_STATUS_ERROR TAOS_DEF_ERROR_CODE(0, 0x2501)
|
||||
|
|
|
@ -484,6 +484,31 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgPushUpdateTblMsgInQueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq) {
|
||||
int32_t code = 0;
|
||||
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
|
||||
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
msg->pCtg = pCtg;
|
||||
msg->output = output;
|
||||
|
||||
action.data = msg;
|
||||
|
||||
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
tfree(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgAcquireVgInfo(SCatalog *pCtg, SCtgDBCache *dbCache, bool *inCache) {
|
||||
CTG_LOCK(CTG_READ, &dbCache->vgLock);
|
||||
|
@ -1367,8 +1392,6 @@ int32_t ctgRemoveDB(SCatalog* pCtg, SCtgDBCache *dbCache, const char* dbFName) {
|
|||
|
||||
ctgFreeDbCache(dbCache);
|
||||
|
||||
ctgInfo("db removed from cache, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
|
||||
|
||||
CTG_ERR_RET(ctgMetaRentRemove(&pCtg->dbRent, dbCache->dbId, ctgDbVgVersionSortCompare, ctgDbVgVersionSearchCompare));
|
||||
|
||||
ctgDebug("db removed from rent, dbFName:%s, dbId:%"PRIx64, dbFName, dbCache->dbId);
|
||||
|
@ -1431,7 +1454,8 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD
|
|||
SDBVgInfo* dbInfo = *pDbInfo;
|
||||
|
||||
if (NULL == dbInfo->vgHash || dbInfo->vgVersion < 0 || taosHashGetSize(dbInfo->vgHash) <= 0) {
|
||||
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d", dbFName, dbInfo->vgHash, dbInfo->vgVersion);
|
||||
ctgError("invalid db vgInfo, dbFName:%s, vgHash:%p, vgVersion:%d, vgHashSize:%d",
|
||||
dbFName, dbInfo->vgHash, dbInfo->vgVersion, taosHashGetSize(dbInfo->vgHash));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
|
@ -1655,21 +1679,21 @@ _return:
|
|||
int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName) {
|
||||
bool inCache = false;
|
||||
int32_t code = 0;
|
||||
SCtgDBCache** dbCache = NULL;
|
||||
SCtgDBCache* dbCache = NULL;
|
||||
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache, &inCache));
|
||||
|
||||
SUseDbOutput DbOut = {0};
|
||||
SBuildUseDBInput input = {0};
|
||||
tstrncpy(input.db, dbFName, tListLen(input.db));
|
||||
|
||||
if (inCache) {
|
||||
input.dbId = (*dbCache)->dbId;
|
||||
input.vgVersion = (*dbCache)->vgInfo->vgVersion;
|
||||
input.numOfTable = (*dbCache)->vgInfo->numOfTable;
|
||||
input.dbId = dbCache->dbId;
|
||||
input.vgVersion = dbCache->vgInfo->vgVersion;
|
||||
input.numOfTable = dbCache->vgInfo->numOfTable;
|
||||
|
||||
ctgReleaseVgInfo(*dbCache);
|
||||
ctgReleaseDBCache(pCtg, *dbCache);
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
} else {
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
}
|
||||
|
@ -1717,7 +1741,7 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
|
|||
|
||||
|
||||
|
||||
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput) {
|
||||
int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, int32_t flag, STableMetaOutput **pOutput, bool syncReq) {
|
||||
if (NULL == pCtg || NULL == pTrans || NULL == pMgmtEps || NULL == pTableName) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
@ -1729,7 +1753,6 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
|||
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pTrans, pMgmtEps, pTableName, &vgroupInfo));
|
||||
}
|
||||
|
||||
SCtgUpdateTblMsg *msg = NULL;
|
||||
STableMetaOutput moutput = {0};
|
||||
STableMetaOutput *output = calloc(1, sizeof(STableMetaOutput));
|
||||
if (NULL == output) {
|
||||
|
@ -1801,19 +1824,7 @@ int32_t ctgRefreshTblMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps,
|
|||
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
|
||||
}
|
||||
|
||||
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
|
||||
msg = malloc(sizeof(SCtgUpdateTblMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
msg->pCtg = pCtg;
|
||||
msg->output = output;
|
||||
|
||||
action.data = msg;
|
||||
|
||||
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
|
||||
CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output, syncReq));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
@ -1821,7 +1832,6 @@ _return:
|
|||
|
||||
tfree(output->tbMeta);
|
||||
tfree(output);
|
||||
tfree(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
@ -1862,7 +1872,7 @@ int32_t ctgGetTableMeta(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, cons
|
|||
|
||||
|
||||
while (true) {
|
||||
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output));
|
||||
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, flag, &output, false));
|
||||
|
||||
if (CTG_IS_META_TABLE(output->metaType)) {
|
||||
*pTableMeta = output->tbMeta;
|
||||
|
@ -2155,6 +2165,82 @@ int32_t ctgStartUpdateThread() {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const SName* pTableName, SArray** pVgList) {
|
||||
STableMeta *tbMeta = NULL;
|
||||
int32_t code = 0;
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
SCtgDBCache* dbCache = NULL;
|
||||
SArray *vgList = NULL;
|
||||
SDBVgInfo *vgInfo = NULL;
|
||||
|
||||
*pVgList = NULL;
|
||||
|
||||
CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB));
|
||||
|
||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, db);
|
||||
|
||||
SHashObj *vgHash = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));
|
||||
|
||||
if (dbCache) {
|
||||
vgHash = dbCache->vgInfo->vgHash;
|
||||
} else {
|
||||
vgHash = vgInfo->vgHash;
|
||||
}
|
||||
|
||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
|
||||
} else {
|
||||
// USE HASH METHOD INSTEAD OF VGID IN TBMETA
|
||||
ctgError("invalid method to get none stb vgInfo, tbType:%d", tbMeta->tableType);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
|
||||
#if 0
|
||||
int32_t vgId = tbMeta->vgId;
|
||||
if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) {
|
||||
ctgWarn("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_VG_META_MISMATCH);
|
||||
}
|
||||
|
||||
vgList = taosArrayInit(1, sizeof(SVgroupInfo));
|
||||
if (NULL == vgList) {
|
||||
ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
|
||||
ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
*pVgList = vgList;
|
||||
vgList = NULL;
|
||||
#endif
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
tfree(tbMeta);
|
||||
|
||||
if (vgInfo) {
|
||||
taosHashCleanup(vgInfo->vgHash);
|
||||
tfree(vgInfo);
|
||||
}
|
||||
|
||||
if (vgList) {
|
||||
taosArrayDestroy(vgList);
|
||||
vgList = NULL;
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||
if (gCtgMgmt.pCluster) {
|
||||
|
@ -2502,19 +2588,7 @@ int32_t catalogUpdateSTableMeta(SCatalog* pCtg, STableMetaRsp *rspMsg) {
|
|||
|
||||
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, true, &output->tbMeta));
|
||||
|
||||
SCtgMetaAction action= {.act = CTG_ACT_UPDATE_TBL};
|
||||
SCtgUpdateTblMsg *msg = malloc(sizeof(SCtgUpdateTblMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTblMsg));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
msg->pCtg = pCtg;
|
||||
msg->output = output;
|
||||
|
||||
action.data = msg;
|
||||
|
||||
CTG_ERR_JRET(ctgPushAction(pCtg, &action));
|
||||
CTG_ERR_JRET(ctgPushUpdateTblMsgInQueue(pCtg, output));
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
|
||||
|
@ -2522,7 +2596,6 @@ _return:
|
|||
|
||||
tfree(output->tbMeta);
|
||||
tfree(output);
|
||||
tfree(msg);
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
@ -2544,7 +2617,7 @@ int32_t catalogRefreshTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
|
|||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL));
|
||||
CTG_API_LEAVE(ctgRefreshTblMeta(pCtg, pTrans, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(isSTable), NULL, false));
|
||||
}
|
||||
|
||||
int32_t catalogRefreshGetTableMeta(SCatalog* pCtg, void *pTrans, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta, int32_t isSTable) {
|
||||
|
@ -2565,82 +2638,27 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgm
|
|||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
STableMeta *tbMeta = NULL;
|
||||
int32_t code = 0;
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
SCtgDBCache* dbCache = NULL;
|
||||
SArray *vgList = NULL;
|
||||
SDBVgInfo *vgInfo = NULL;
|
||||
|
||||
*pVgList = NULL;
|
||||
while (true) {
|
||||
code = ctgGetTableDistVgInfo(pCtg, pRpc, pMgmtEps, pTableName, pVgList);
|
||||
if (code) {
|
||||
if (TSDB_CODE_CTG_VG_META_MISMATCH == code) {
|
||||
CTG_ERR_JRET(ctgRefreshTblMeta(pCtg, pRpc, pMgmtEps, pTableName, CTG_FLAG_FORCE_UPDATE | CTG_FLAG_MAKE_STB(CTG_FLAG_UNKNOWN_STB), NULL, true));
|
||||
|
||||
CTG_ERR_JRET(ctgGetTableMeta(pCtg, pRpc, pMgmtEps, pTableName, &tbMeta, CTG_FLAG_UNKNOWN_STB));
|
||||
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
CTG_ERR_JRET(ctgRefreshDBVgInfo(pCtg, pRpc, pMgmtEps, dbFName));
|
||||
|
||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, db);
|
||||
|
||||
SHashObj *vgHash = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pRpc, pMgmtEps, db, false, &dbCache, &vgInfo));
|
||||
|
||||
if (dbCache) {
|
||||
vgHash = dbCache->vgInfo->vgHash;
|
||||
} else {
|
||||
vgHash = vgInfo->vgHash;
|
||||
}
|
||||
|
||||
/* TODO REMOEV THIS ....
|
||||
if (0 == tbMeta->vgId) {
|
||||
SVgroupInfo vgroup = {0};
|
||||
|
||||
catalogGetTableHashVgroup(pCtg, pRpc, pMgmtEps, pTableName, &vgroup);
|
||||
|
||||
tbMeta->vgId = vgroup.vgId;
|
||||
}
|
||||
// TODO REMOVE THIS ....*/
|
||||
|
||||
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
|
||||
} else {
|
||||
int32_t vgId = tbMeta->vgId;
|
||||
if (taosHashGetDup(vgHash, &vgId, sizeof(vgId), &vgroupInfo) != 0) {
|
||||
ctgError("table's vgId not found in vgroup list, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
vgList = taosArrayInit(1, sizeof(SVgroupInfo));
|
||||
if (NULL == vgList) {
|
||||
ctgError("taosArrayInit %d failed", (int32_t)sizeof(SVgroupInfo));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
if (NULL == taosArrayPush(vgList, &vgroupInfo)) {
|
||||
ctgError("taosArrayPush vgroupInfo to array failed, vgId:%d, tbName:%s", vgId, tNameGetTableName(pTableName));
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
*pVgList = vgList;
|
||||
vgList = NULL;
|
||||
break;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
tfree(tbMeta);
|
||||
|
||||
if (vgInfo) {
|
||||
taosHashCleanup(vgInfo->vgHash);
|
||||
tfree(vgInfo);
|
||||
}
|
||||
|
||||
if (vgList) {
|
||||
taosArrayDestroy(vgList);
|
||||
vgList = NULL;
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -415,7 +415,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limi
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_INVALID_VER, "WAL use invalid version")
|
||||
|
||||
// tfs
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount")
|
||||
|
@ -433,6 +433,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_CTG_MEM_ERROR, "catalog memory error"
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_SYS_ERROR, "catalog system error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_DB_DROPPED, "Database is dropped")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_OUT_OF_SERVICE, "catalog is out of service")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_CTG_VG_META_MISMATCH, "table meta and vgroup mismatch")
|
||||
|
||||
//scheduler
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SCH_STATUS_ERROR, "scheduler status error")
|
||||
|
|
Loading…
Reference in New Issue