fix: error in sync sem
This commit is contained in:
parent
9d93b75020
commit
c44ec05229
|
@ -81,6 +81,7 @@ typedef struct {
|
|||
bool standby;
|
||||
bool restored;
|
||||
int32_t errCode;
|
||||
int32_t transId;
|
||||
} SSyncMgmt;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
|||
int32_t mndInitSync(SMnode *pMnode);
|
||||
void mndCleanupSync(SMnode *pMnode);
|
||||
bool mndIsMaster(SMnode *pMnode);
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw);
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId);
|
||||
void mndSyncStart(SMnode *pMnode);
|
||||
void mndSyncStop(SMnode *pMnode);
|
||||
|
||||
|
|
|
@ -702,14 +702,17 @@ static int32_t mndProcessAlterMnodeReq(SRpcMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
mTrace("trans:-1, sync reconfig will be proposed");
|
||||
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
pMgmt->standby = 0;
|
||||
int32_t code = syncReconfig(pMgmt->sync, &cfg);
|
||||
if (code != 0) {
|
||||
mError("failed to alter mnode sync since %s", terrstr());
|
||||
mError("trans:-1, failed to propose sync reconfig since %s", terrstr());
|
||||
return code;
|
||||
} else {
|
||||
pMgmt->errCode = 0;
|
||||
pMgmt->transId = -1;
|
||||
tsem_wait(&pMgmt->syncSem);
|
||||
mInfo("alter mnode sync result:%s", tstrerror(pMgmt->errCode));
|
||||
terrno = pMgmt->errCode;
|
||||
|
|
|
@ -28,16 +28,26 @@ int32_t mndSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
|||
int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return tmsgSendReq(pEpSet, pMsg); }
|
||||
|
||||
void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
SMnode *pMnode = pFsm->data;
|
||||
SSdbRaw *pRaw = pMsg->pCont;
|
||||
SMnode *pMnode = pFsm->data;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
SSdbRaw *pRaw = pMsg->pCont;
|
||||
|
||||
mTrace("raw:%p, apply to sdb, ver:%" PRId64 " term:%" PRId64 " role:%s", pRaw, cbMeta.index, cbMeta.term,
|
||||
syncStr(cbMeta.state));
|
||||
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
|
||||
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
|
||||
if (cbMeta.state == TAOS_SYNC_STATE_LEADER) {
|
||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||
int32_t transId = sdbGetIdFromRaw(pRaw);
|
||||
pMgmt->errCode = cbMeta.code;
|
||||
mTrace("trans:%d, is proposed, savedTransId:%d code:0x%x, ver:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId,
|
||||
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw);
|
||||
|
||||
if (pMgmt->errCode == 0) {
|
||||
sdbWriteWithoutFree(pMnode->pSdb, pRaw);
|
||||
sdbSetApplyIndex(pMnode->pSdb, cbMeta.index);
|
||||
sdbSetApplyTerm(pMnode->pSdb, cbMeta.term);
|
||||
}
|
||||
|
||||
if (pMgmt->transId == transId) {
|
||||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:%d, failed to propose since %s", transId, tstrerror(pMgmt->errCode));
|
||||
}
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -78,11 +88,19 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char
|
|||
}
|
||||
|
||||
void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
|
||||
mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%" PRId64 ", cbMeta.term:%" PRId64 ", cbMeta.index:%" PRId64,
|
||||
cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index);
|
||||
SMnode *pMnode = pFsm->data;
|
||||
pMnode->syncMgmt.errCode = cbMeta.code;
|
||||
tsem_post(&pMnode->syncMgmt.syncSem);
|
||||
SMnode *pMnode = pFsm->data;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
|
||||
pMgmt->errCode = cbMeta.code;
|
||||
mInfo("trans:-1, sync reconfig is proposed, savedTransId:%d code:0x%x, curTerm:%" PRId64 " term:%" PRId64,
|
||||
pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term);
|
||||
|
||||
if (pMgmt->transId == -1) {
|
||||
if (pMgmt->errCode != 0) {
|
||||
mError("trans:-1, failed to propose sync reconfig since %s", tstrerror(pMgmt->errCode));
|
||||
}
|
||||
tsem_post(&pMgmt->syncSem);
|
||||
}
|
||||
}
|
||||
|
||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||
|
@ -165,15 +183,17 @@ void mndCleanupSync(SMnode *pMnode) {
|
|||
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
||||
}
|
||||
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
||||
int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
pMgmt->errCode = 0;
|
||||
|
||||
SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
|
||||
SRpcMsg rsp = {.code = TDMT_MND_APPLY_MSG, .contLen = sdbGetRawTotalSize(pRaw)};
|
||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||
if (rsp.pCont == NULL) return -1;
|
||||
memcpy(rsp.pCont, pRaw, rsp.contLen);
|
||||
|
||||
pMgmt->errCode = 0;
|
||||
pMgmt->transId = transId;
|
||||
mTrace("trans:%d, will be proposed", pMgmt->transId);
|
||||
|
||||
const bool isWeak = false;
|
||||
int32_t code = syncPropose(pMgmt->sync, &rsp, isWeak);
|
||||
if (code == 0) {
|
||||
|
@ -187,7 +207,11 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
|||
}
|
||||
|
||||
rpcFreeCont(rsp.pCont);
|
||||
if (code != 0) return code;
|
||||
if (code != 0) {
|
||||
mError("trans:%d, failed to propose, code:0x%x", pMgmt->transId, code);
|
||||
return code;
|
||||
}
|
||||
|
||||
return pMgmt->errCode;
|
||||
}
|
||||
|
||||
|
|
|
@ -680,7 +680,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
|||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
mDebug("trans:%d, sync to other nodes", pTrans->id);
|
||||
int32_t code = mndSyncPropose(pMnode, pRaw);
|
||||
int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id);
|
||||
if (code != 0) {
|
||||
mError("trans:%d, failed to sync since %s", pTrans->id, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
|
|
|
@ -386,6 +386,8 @@ SSdbIter *sdbIterRead(SSdb *pSdb, SSdbIter *iter, char **ppBuf, int32_t *len);
|
|||
const char *sdbTableName(ESdbType type);
|
||||
void sdbPrintOper(SSdb *pSdb, SSdbRow *pRow, const char *oper);
|
||||
|
||||
int32_t sdbGetIdFromRaw(SSdbRaw *pRaw);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -16,6 +16,11 @@
|
|||
#define _DEFAULT_SOURCE
|
||||
#include "sdb.h"
|
||||
|
||||
int32_t sdbGetIdFromRaw(SSdbRaw *pRaw) {
|
||||
int32_t id = *((int32_t *)(pRaw->pData));
|
||||
return id;
|
||||
}
|
||||
|
||||
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen) {
|
||||
SSdbRaw *pRaw = taosMemoryCalloc(1, dataLen + sizeof(SSdbRaw));
|
||||
if (pRaw == NULL) {
|
||||
|
|
|
@ -123,5 +123,12 @@ sql create table db.stb (ts timestamp, c1 int, c2 binary(4)) tags(t1 int, t2 flo
|
|||
sql create table db.ctb using db.stb tags(101, 102, "103")
|
||||
sql insert into db.ctb values(now, 1, "2")
|
||||
|
||||
sql select * from db.ctb
|
||||
print $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop
|
||||
system sh/exec.sh -n dnode2 -s stop
|
|
@ -71,7 +71,7 @@ print ====> start to check if there are ERRORS in vagrind log file for each dnod
|
|||
# -n : dnode[x] be check
|
||||
system_content sh/checkValgrind.sh -n dnode1
|
||||
print cmd return result----> [ $system_content ]
|
||||
if $system_content <= 1 then
|
||||
if $system_content <= 2 then
|
||||
return 0
|
||||
endi
|
||||
|
||||
|
|
Loading…
Reference in New Issue