refact vnod3
This commit is contained in:
parent
7273581072
commit
7895c492ba
|
@ -97,6 +97,7 @@ static void vmProcessFetchQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
|
||||||
|
|
||||||
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnodeObj *pVnode = pInfo->ahandle;
|
SVnodeObj *pVnode = pInfo->ahandle;
|
||||||
|
int64_t version;
|
||||||
|
|
||||||
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SNodeMsg *));
|
||||||
if (pArray == NULL) {
|
if (pArray == NULL) {
|
||||||
|
@ -115,7 +116,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vnodePreprocessWriteReqs(pVnode->pImpl, pArray);
|
vnodePreprocessWriteReqs(pVnode->pImpl, pArray, &version);
|
||||||
|
|
||||||
numOfMsgs = taosArrayGetSize(pArray);
|
numOfMsgs = taosArrayGetSize(pArray);
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
@ -123,7 +124,7 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
|
|
||||||
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, &pRsp);
|
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &pRsp);
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pRpc->ahandle;
|
pRsp->ahandle = pRpc->ahandle;
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
|
@ -153,7 +154,7 @@ static void vmProcessApplyQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
|
|
||||||
// todo
|
// todo
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg *pRsp = NULL;
|
||||||
(void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
// (void)vnodeProcessWriteReq(pVnode->pImpl, &pMsg->rpcMsg, &pRsp);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,8 +48,8 @@ int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs);
|
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp);
|
||||||
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -20,30 +20,28 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
||||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
||||||
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
||||||
|
|
||||||
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) {
|
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
|
||||||
SNodeMsg *pMsg;
|
SNodeMsg *pMsg;
|
||||||
SRpcMsg *pRpc;
|
SRpcMsg *pRpc;
|
||||||
|
|
||||||
|
*version = pVnode->state.processed;
|
||||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||||
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
|
pMsg = *(SNodeMsg **)taosArrayGet(pMsgs, i);
|
||||||
pRpc = &pMsg->rpcMsg;
|
pRpc = &pMsg->rpcMsg;
|
||||||
|
|
||||||
// set request version
|
// set request version
|
||||||
void *pBuf = POINTER_SHIFT(pRpc->pCont, sizeof(SMsgHead));
|
if (walWrite(pVnode->pWal, pVnode->state.processed++, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
||||||
int64_t ver = pVnode->state.processed++;
|
|
||||||
taosEncodeFixedI64(&pBuf, ver);
|
|
||||||
|
|
||||||
if (walWrite(pVnode->pWal, ver, pRpc->msgType, pRpc->pCont, pRpc->contLen) < 0) {
|
|
||||||
// TODO: handle error
|
|
||||||
/*ASSERT(false);*/
|
|
||||||
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
vError("vnode:%d write wal error since %s", TD_VID(pVnode), terrstr());
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
walFsync(pVnode->pWal, false);
|
walFsync(pVnode->pWal, false);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp) {
|
||||||
void *ptr = NULL;
|
void *ptr = NULL;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
@ -58,9 +56,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo: change the interface here
|
// todo: change the interface here
|
||||||
int64_t ver;
|
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) {
|
||||||
taosDecodeFixedI64(POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), &ver);
|
|
||||||
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
|
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -128,7 +124,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->state.applied = ver;
|
pVnode->state.applied = version;
|
||||||
|
|
||||||
// Check if it needs to commit
|
// Check if it needs to commit
|
||||||
if (vnodeShouldCommit(pVnode)) {
|
if (vnodeShouldCommit(pVnode)) {
|
||||||
|
|
Loading…
Reference in New Issue