From 785f25c49d96ad44611398de5d2a81b07f08bfd5 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 19 Apr 2022 16:16:02 +0800 Subject: [PATCH] sync integration --- source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 33 +++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index f660990184..8bd334b018 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -117,6 +117,36 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO vnodePreprocessWriteReqs(pVnode->pImpl, pArray); + // sync integration response + /* + for (int i = 0; i < taosArrayGetSize(pArray); i++) { + SNodeMsg *pMsg; + SRpcMsg *pRpc; + + pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); + pRpc = &pMsg->rpcMsg; + + // set request version + void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead)); + // int64_t ver = pVnode->pImpl->state.processed++; // ??????? + int64_t ver; + taosEncodeFixedI64(&pBuf, ver); + + int32_t ret = syncPropose(pVnode->pImpl->sync, pRpc, false); + if (ret == TAOS_SYNC_PROPOSE_NOT_LEADER) { + // not leader + // send response + } else if (ret == TAOS_SYNC_PROPOSE_OTHER_ERROR) { + // send response + } else if (ret == TAOS_SYNC_PROPOSE_SUCCESS) { + // ok + // send response in applyQ + } else { + assert(0); + } + } + */ + numOfMsgs = taosArrayGetSize(pArray); for (int32_t i = 0; i < numOfMsgs; i++) { SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i); @@ -154,6 +184,9 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO // todo SRpcMsg *pRsp = NULL; (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp); + + // sync integration response + // send response } }