refact vnode
This commit is contained in:
parent
7895c492ba
commit
f8d1f5a813
|
@ -122,17 +122,26 @@ static void vmProcessWriteQueue(SQueueInfo *pInfo, STaosQall *qall, int32_t numO
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
SNodeMsg *pMsg = *(SNodeMsg **)taosArrayGet(pArray, i);
|
||||||
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
SRpcMsg *pRpc = &pMsg->rpcMsg;
|
||||||
SRpcMsg *pRsp = NULL;
|
SRpcMsg rsp;
|
||||||
|
|
||||||
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &pRsp);
|
rsp.pCont = NULL;
|
||||||
|
rsp.contLen = 0;
|
||||||
|
rsp.code = 0;
|
||||||
|
rsp.handle = pRpc->handle;
|
||||||
|
rsp.ahandle = pRpc->ahandle;
|
||||||
|
|
||||||
|
int32_t code = vnodeProcessWriteReq(pVnode->pImpl, pRpc, version++, &rsp);
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (pRsp != NULL) {
|
if (pRsp != NULL) {
|
||||||
pRsp->ahandle = pRpc->ahandle;
|
pRsp->ahandle = pRpc->ahandle;
|
||||||
tmsgSendRsp(pRsp);
|
|
||||||
taosMemoryFree(pRsp);
|
taosMemoryFree(pRsp);
|
||||||
} else {
|
} else {
|
||||||
if (code != 0 && terrno != 0) code = terrno;
|
if (code != 0 && terrno != 0) code = terrno;
|
||||||
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
vmSendRsp(pVnode->pWrapper, pMsg, code);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfMsgs; i++) {
|
for (int32_t i = 0; i < numOfMsgs; i++) {
|
||||||
|
|
|
@ -49,7 +49,7 @@ void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version);
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp);
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||||
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
|
||||||
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp);
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
|
||||||
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
|
||||||
|
|
||||||
|
@ -41,7 +41,7 @@ int vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs, int64_t *version) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg **pRsp) {
|
int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
||||||
void *ptr = NULL;
|
void *ptr = NULL;
|
||||||
int ret;
|
int ret;
|
||||||
|
|
||||||
|
@ -65,6 +65,7 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
ret = vnodeProcessCreateStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
return 0;
|
return 0;
|
||||||
case TDMT_VND_CREATE_TABLE:
|
case TDMT_VND_CREATE_TABLE:
|
||||||
|
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
||||||
return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
|
return vnodeProcessCreateTbReq(pVnode, pMsg, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), pRsp);
|
||||||
case TDMT_VND_ALTER_STB:
|
case TDMT_VND_ALTER_STB:
|
||||||
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
return vnodeProcessAlterStbReq(pVnode, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)));
|
||||||
|
@ -74,14 +75,8 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg
|
||||||
case TDMT_VND_DROP_TABLE:
|
case TDMT_VND_DROP_TABLE:
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_SUBMIT:
|
case TDMT_VND_SUBMIT:
|
||||||
/*printf("vnode %d write data %ld\n", TD_VID(pVnode), ver);*/
|
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
|
||||||
if (pVnode->config.streamMode == 0) {
|
return vnodeProcessSubmitReq(pVnode, ptr, pRsp);
|
||||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
|
||||||
(*pRsp)->handle = pMsg->handle;
|
|
||||||
(*pRsp)->ahandle = pMsg->ahandle;
|
|
||||||
return vnodeProcessSubmitReq(pVnode, ptr, *pRsp);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case TDMT_VND_MQ_SET_CONN: {
|
case TDMT_VND_MQ_SET_CONN: {
|
||||||
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
// TODO: handle error
|
// TODO: handle error
|
||||||
|
@ -218,7 +213,7 @@ static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp) {
|
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg *pRsp) {
|
||||||
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
SVCreateTbBatchReq vCreateTbBatchReq = {0};
|
||||||
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
SVCreateTbBatchRsp vCreateTbBatchRsp = {0};
|
||||||
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
|
tDeserializeSVCreateTbBatchReq(pReq, &vCreateTbBatchReq);
|
||||||
|
@ -270,12 +265,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SR
|
||||||
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
tSerializeSVCreateTbBatchRsp(msg, contLen, &vCreateTbBatchRsp);
|
||||||
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
taosArrayDestroy(vCreateTbBatchRsp.rspList);
|
||||||
|
|
||||||
*pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
|
pRsp->pCont = msg;
|
||||||
(*pRsp)->msgType = TDMT_VND_CREATE_TABLE_RSP;
|
pRsp->contLen = contLen;
|
||||||
(*pRsp)->pCont = msg;
|
|
||||||
(*pRsp)->contLen = contLen;
|
|
||||||
(*pRsp)->handle = pMsg->handle;
|
|
||||||
(*pRsp)->ahandle = pMsg->ahandle;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -308,7 +299,6 @@ static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg
|
||||||
}
|
}
|
||||||
|
|
||||||
// encode the response (TODO)
|
// encode the response (TODO)
|
||||||
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
|
|
||||||
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
|
||||||
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
|
||||||
pRsp->contLen = sizeof(SSubmitRsp);
|
pRsp->contLen = sizeof(SSubmitRsp);
|
||||||
|
|
Loading…
Reference in New Issue