add SyncApplyMsg into vmProcessApplyQueue
This commit is contained in:
parent
ec73953a0f
commit
d7edb105f2
|
@ -190,6 +190,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
taosGetQitem(qall, (void **)&pMsg);
|
taosGetQitem(qall, (void **)&pMsg);
|
||||||
|
|
||||||
|
// init response rpc msg
|
||||||
rsp.code = 0;
|
rsp.code = 0;
|
||||||
rsp.pCont = NULL;
|
rsp.pCont = NULL;
|
||||||
rsp.contLen = 0;
|
rsp.contLen = 0;
|
||||||
|
@ -201,6 +202,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
SRpcMsg originalRpcMsg;
|
SRpcMsg originalRpcMsg;
|
||||||
syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);
|
syncApplyMsg2OriginalRpcMsg(pSyncApplyMsg, &originalRpcMsg);
|
||||||
|
|
||||||
|
// apply data into tsdb
|
||||||
if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
|
if (vnodeProcessWriteReq(pVnode->pImpl, &originalRpcMsg, pSyncApplyMsg->fsmMeta.index, &rsp) < 0) {
|
||||||
rsp.code = terrno;
|
rsp.code = terrno;
|
||||||
dTrace("vnodeProcessWriteReq error, code:%d", terrno);
|
dTrace("vnodeProcessWriteReq error, code:%d", terrno);
|
||||||
|
@ -209,6 +211,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
syncApplyMsgDestroy(pSyncApplyMsg);
|
syncApplyMsgDestroy(pSyncApplyMsg);
|
||||||
rpcFreeCont(originalRpcMsg.pCont);
|
rpcFreeCont(originalRpcMsg.pCont);
|
||||||
|
|
||||||
|
// if leader, send response
|
||||||
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
|
if (pMsg->rpcMsg.handle != NULL && pMsg->rpcMsg.ahandle != NULL) {
|
||||||
rsp.ahandle = pMsg->rpcMsg.ahandle;
|
rsp.ahandle = pMsg->rpcMsg.ahandle;
|
||||||
rsp.handle = pMsg->rpcMsg.handle;
|
rsp.handle = pMsg->rpcMsg.handle;
|
||||||
|
|
Loading…
Reference in New Issue