fix(sync): set handle when send response
This commit is contained in:
parent
b6e3f9d1fc
commit
1b9818e1cf
|
@ -214,7 +214,23 @@ static void vmProcessSyncQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOf
|
||||||
|
|
||||||
// todo
|
// todo
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
(void)vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
int32_t ret = vnodeProcessSyncReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
||||||
|
if (ret != 0) {
|
||||||
|
// if leader, send response
|
||||||
|
if (pMsg->rpcMsg.handle != NULL) {
|
||||||
|
SRpcMsg rsp;
|
||||||
|
rsp.pCont = NULL;
|
||||||
|
rsp.contLen = 0;
|
||||||
|
|
||||||
|
rsp.code = terrno;
|
||||||
|
dTrace("vmProcessSyncQueue error, code:%d", terrno);
|
||||||
|
|
||||||
|
rsp.ahandle = pMsg->rpcMsg.ahandle;
|
||||||
|
rsp.handle = pMsg->rpcMsg.handle;
|
||||||
|
rsp.refId = pMsg->rpcMsg.refId;
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rpcFreeCont(pMsg->rpcMsg.pCont);
|
rpcFreeCont(pMsg->rpcMsg.pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
|
|
|
@ -199,6 +199,8 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
int32_t ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
|
|
||||||
if (syncEnvIsStart()) {
|
if (syncEnvIsStart()) {
|
||||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||||
assert(pSyncNode != NULL);
|
assert(pSyncNode != NULL);
|
||||||
|
@ -220,67 +222,70 @@ int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
|
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING) {
|
||||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||||
syncPingDestroy(pSyncMsg);
|
syncPingDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_PING_REPLY) {
|
||||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncPingReplyDestroy(pSyncMsg);
|
syncPingReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_CLIENT_REQUEST) {
|
||||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE) {
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||||
syncRequestVoteDestroy(pSyncMsg);
|
syncRequestVoteDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_REQUEST_VOTE_REPLY) {
|
||||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES) {
|
||||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesDestroy(pSyncMsg);
|
syncAppendEntriesDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
} else if (pRpcMsg->msgType == TDMT_VND_SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
||||||
|
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
} else {
|
} else {
|
||||||
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
||||||
|
ret = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
return 0;
|
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp) {
|
||||||
|
|
Loading…
Reference in New Issue