enh: wait for the vgroups have leader then create db return
This commit is contained in:
parent
9cc08fbb30
commit
39fa900ce5
|
@ -28,6 +28,7 @@ SDbObj *mndAcquireDb(SMnode *pMnode, const char *db);
|
|||
void mndReleaseDb(SMnode *pMnode, SDbObj *pDb);
|
||||
int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, void **ppRsp, int32_t *pRspLen);
|
||||
int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUseDbReq *pReq);
|
||||
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb);
|
||||
|
||||
const char *mndGetDbStr(const char *src);
|
||||
|
||||
|
|
|
@ -162,8 +162,9 @@ typedef struct {
|
|||
int64_t lastExecTime;
|
||||
int32_t lastAction;
|
||||
int32_t lastErrorNo;
|
||||
tmsg_t lastMsgType;
|
||||
SEpSet lastEpset;
|
||||
tmsg_t lastMsgType;
|
||||
tmsg_t originRpcType;
|
||||
char dbname1[TSDB_TABLE_FNAME_LEN];
|
||||
char dbname2[TSDB_TABLE_FNAME_LEN];
|
||||
int32_t startFunc;
|
||||
|
|
|
@ -1495,7 +1495,7 @@ static const char *getCacheModelStr(int8_t cacheModel) {
|
|||
return "unknown";
|
||||
}
|
||||
|
||||
static bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb) {
|
||||
bool mndIsDbReady(SMnode *pMnode, SDbObj *pDb) {
|
||||
if (pDb->cfg.replications == 1) return true;
|
||||
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
|
|
@ -124,8 +124,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
SDB_SET_INT8(pRaw, dataPos, pTrans->exec, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, pTrans->oper, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
||||
SDB_SET_INT8(pRaw, dataPos, 0, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->originRpcType, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
|
@ -282,13 +281,12 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
SDB_GET_INT8(pRaw, dataPos, &exec, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &oper, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
|
||||
SDB_GET_INT8(pRaw, dataPos, &reserved, _OVER)
|
||||
pTrans->stage = stage;
|
||||
pTrans->policy = policy;
|
||||
pTrans->conflict = conflict;
|
||||
pTrans->exec = exec;
|
||||
pTrans->oper = oper;
|
||||
SDB_GET_INT16(pRaw, dataPos, &pTrans->originRpcType, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname1, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname2, TSDB_TABLE_FNAME_LEN, _OVER)
|
||||
|
@ -611,6 +609,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
|
|||
|
||||
if (pReq != NULL) {
|
||||
taosArrayPush(pTrans->pRpcArray, &pReq->info);
|
||||
pTrans->originRpcType = pReq->msgType;
|
||||
}
|
||||
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
|
||||
return pTrans;
|
||||
|
@ -910,6 +909,23 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
}
|
||||
|
||||
if (pTrans->originRpcType == TDMT_MND_CREATE_DB) {
|
||||
mDebug("trans:%d, origin msgtype:%s", pTrans->id, TMSG_INFO(pTrans->originRpcType));
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pTrans->dbname1);
|
||||
if (pDb != NULL) {
|
||||
for (int32_t j = 0; j < 12; j++) {
|
||||
bool ready = mndIsDbReady(pMnode, pDb);
|
||||
if (!ready) {
|
||||
mDebug("trans:%d, db:%s not ready yet, wait %d times", pTrans->id, pTrans->dbname1, j);
|
||||
taosMsleep(1000);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rspMsg);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue