feature/scheduler
This commit is contained in:
parent
194722ee24
commit
71fdf76b8a
|
@ -140,6 +140,8 @@ typedef enum _mgmt_table {
|
||||||
|
|
||||||
#define TSDB_KILL_MSG_LEN 30
|
#define TSDB_KILL_MSG_LEN 30
|
||||||
|
|
||||||
|
#define TSDB_TABLE_NUM_UNIT 100000
|
||||||
|
|
||||||
#define TSDB_VN_READ_ACCCESS ((char)0x1)
|
#define TSDB_VN_READ_ACCCESS ((char)0x1)
|
||||||
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
|
#define TSDB_VN_WRITE_ACCCESS ((char)0x2)
|
||||||
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
|
#define TSDB_VN_ALL_ACCCESS (TSDB_VN_READ_ACCCESS | TSDB_VN_WRITE_ACCCESS)
|
||||||
|
@ -566,6 +568,7 @@ typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
int64_t dbId;
|
int64_t dbId;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
|
int32_t numOfTable;
|
||||||
} SUseDbReq;
|
} SUseDbReq;
|
||||||
|
|
||||||
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
|
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
|
||||||
|
@ -791,8 +794,10 @@ typedef struct SVgroupInfo {
|
||||||
uint32_t hashBegin;
|
uint32_t hashBegin;
|
||||||
uint32_t hashEnd;
|
uint32_t hashEnd;
|
||||||
SEpSet epset;
|
SEpSet epset;
|
||||||
|
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||||
} SVgroupInfo;
|
} SVgroupInfo;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t numOfVgroups;
|
int32_t numOfVgroups;
|
||||||
SVgroupInfo vgroups[];
|
SVgroupInfo vgroups[];
|
||||||
|
|
|
@ -74,9 +74,9 @@ typedef struct SDbVgVersion {
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
int64_t dbId;
|
int64_t dbId;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
|
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
|
||||||
} SDbVgVersion;
|
} SDbVgVersion;
|
||||||
|
|
||||||
|
|
||||||
int32_t catalogInit(SCatalogCfg *cfg);
|
int32_t catalogInit(SCatalogCfg *cfg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -1423,6 +1423,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1;
|
if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->numOfTable) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
|
@ -1438,6 +1439,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
|
||||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1;
|
if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->numOfTable) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
tCoderClear(&decoder);
|
tCoderClear(&decoder);
|
||||||
|
@ -1482,6 +1484,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) {
|
||||||
if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1;
|
if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1;
|
||||||
if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1;
|
if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1;
|
||||||
if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1;
|
if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pVgInfo->numOfTable) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1542,6 +1545,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) {
|
||||||
if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1;
|
if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1;
|
||||||
if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1;
|
if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1;
|
||||||
if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1;
|
if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &vgInfo.numOfTable) < 0) return -1;
|
||||||
taosArrayPush(pRsp->pVgroupInfos, &vgInfo);
|
taosArrayPush(pRsp->pVgroupInfos, &vgInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -885,6 +885,29 @@ DROP_DB_OVER:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mndGetDBTableNum(SDbObj *pDb, SMnode *pMnode, int32_t *num) {
|
||||||
|
int32_t vindex = 0;
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
|
||||||
|
void *pIter = NULL;
|
||||||
|
while (vindex < pDb->cfg.numOfVgroups) {
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
|
if (pVgroup->dbUid == pDb->uid) {
|
||||||
|
*num += pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
|
||||||
|
|
||||||
|
vindex++;
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
||||||
int32_t vindex = 0;
|
int32_t vindex = 0;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -900,6 +923,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
|
||||||
vgInfo.vgId = pVgroup->vgId;
|
vgInfo.vgId = pVgroup->vgId;
|
||||||
vgInfo.hashBegin = pVgroup->hashBegin;
|
vgInfo.hashBegin = pVgroup->hashBegin;
|
||||||
vgInfo.hashEnd = pVgroup->hashEnd;
|
vgInfo.hashEnd = pVgroup->hashEnd;
|
||||||
|
vgInfo.numOfTable = pVgroup->numOfTables / TSDB_TABLE_NUM_UNIT;
|
||||||
vgInfo.epset.numOfEps = pVgroup->replica;
|
vgInfo.epset.numOfEps = pVgroup->replica;
|
||||||
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
|
for (int32_t gid = 0; gid < pVgroup->replica; ++gid) {
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[gid];
|
||||||
|
@ -967,7 +991,10 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
||||||
goto USE_DB_OVER;
|
goto USE_DB_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid) {
|
int32_t numOfTable = 0;
|
||||||
|
mndGetDBTableNum(pDb, pMnode, &numOfTable);
|
||||||
|
|
||||||
|
if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid || numOfTable != usedbReq.numOfTable) {
|
||||||
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
|
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1017,6 +1044,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
||||||
SDbVgVersion *pDbVgVersion = &pDbs[i];
|
SDbVgVersion *pDbVgVersion = &pDbs[i];
|
||||||
pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId);
|
pDbVgVersion->dbId = htobe64(pDbVgVersion->dbId);
|
||||||
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
|
pDbVgVersion->vgVersion = htonl(pDbVgVersion->vgVersion);
|
||||||
|
pDbVgVersion->numOfTable = htonl(pDbVgVersion->numOfTable);
|
||||||
|
|
||||||
SUseDbRsp usedbRsp = {0};
|
SUseDbRsp usedbRsp = {0};
|
||||||
|
|
||||||
|
@ -1027,28 +1055,34 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
|
||||||
usedbRsp.uid = pDbVgVersion->dbId;
|
usedbRsp.uid = pDbVgVersion->dbId;
|
||||||
usedbRsp.vgVersion = -1;
|
usedbRsp.vgVersion = -1;
|
||||||
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
|
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
|
||||||
} else if (pDbVgVersion->vgVersion >= pDb->vgVersion) {
|
continue;
|
||||||
mDebug("db:%s, version not changed", pDbVgVersion->dbFName);
|
}
|
||||||
|
|
||||||
|
int32_t numOfTable = 0;
|
||||||
|
mndGetDBTableNum(pDb, pMnode, &numOfTable);
|
||||||
|
|
||||||
|
if (pDbVgVersion->vgVersion >= pDb->vgVersion && numOfTable == pDbVgVersion->numOfTable) {
|
||||||
|
mDebug("db:%s, version & numOfTable not changed", pDbVgVersion->dbFName);
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
continue;
|
continue;
|
||||||
} else {
|
|
||||||
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
|
||||||
if (usedbRsp.pVgroupInfos == NULL) {
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
mError("db:%s, failed to malloc usedb response", pDb->name);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
|
|
||||||
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
|
|
||||||
usedbRsp.uid = pDb->uid;
|
|
||||||
usedbRsp.vgVersion = pDb->vgVersion;
|
|
||||||
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
|
|
||||||
usedbRsp.hashMethod = pDb->hashMethod;
|
|
||||||
|
|
||||||
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
|
|
||||||
mndReleaseDb(pMnode, pDb);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||||
|
if (usedbRsp.pVgroupInfos == NULL) {
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
mError("db:%s, failed to malloc usedb response", pDb->name);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
|
||||||
|
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
usedbRsp.uid = pDb->uid;
|
||||||
|
usedbRsp.vgVersion = pDb->vgVersion;
|
||||||
|
usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos);
|
||||||
|
usedbRsp.hashMethod = pDb->hashMethod;
|
||||||
|
|
||||||
|
taosArrayPush(batchUseRsp.pArray, &usedbRsp);
|
||||||
|
mndReleaseDb(pMnode, pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp);
|
int32_t rspLen = tSerializeSUseDbBatchRsp(NULL, 0, &batchUseRsp);
|
||||||
|
|
|
@ -247,7 +247,7 @@ int32_t queryProcessTableMetaRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
|
|
||||||
PROCESS_META_OVER:
|
PROCESS_META_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
qError("failed to process table meta rsp since %s", terrstr());
|
qError("failed to process table meta rsp since %s", tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
tFreeSTableMetaRsp(&metaRsp);
|
tFreeSTableMetaRsp(&metaRsp);
|
||||||
|
|
Loading…
Reference in New Issue