Merge pull request #11556 from taosdata/feature/vnode_refact1

refactor: vnode
This commit is contained in:
Hongze Cheng 2022-04-16 14:49:02 +08:00 committed by GitHub
commit 0231f6af09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 35 additions and 13 deletions

View File

@ -61,16 +61,16 @@ extern "C" {
} \ } \
} }
#define WAL_HEAD_VER 0 #define WAL_HEAD_VER 0
#define WAL_NOSUFFIX_LEN 20 #define WAL_NOSUFFIX_LEN 20
#define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1) #define WAL_SUFFIX_AT (WAL_NOSUFFIX_LEN + 1)
#define WAL_LOG_SUFFIX "log" #define WAL_LOG_SUFFIX "log"
#define WAL_INDEX_SUFFIX "idx" #define WAL_INDEX_SUFFIX "idx"
#define WAL_REFRESH_MS 1000 #define WAL_REFRESH_MS 1000
#define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead)) #define WAL_MAX_SIZE (TSDB_MAX_WAL_SIZE + sizeof(SWalHead))
#define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12) #define WAL_PATH_LEN (TSDB_FILENAME_LEN + 12)
#define WAL_FILE_LEN (WAL_PATH_LEN + 32) #define WAL_FILE_LEN (WAL_PATH_LEN + 32)
#define WAL_MAGIC 0xFAFBFCFDULL #define WAL_MAGIC 0xFAFBFCFDULL
#define WAL_CUR_FAILED 1 #define WAL_CUR_FAILED 1

View File

@ -97,8 +97,8 @@ int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg,
} }
if (pRsp != NULL) { if (pRsp != NULL) {
pRsp->affectedRows = htonl(affectedrows); pRsp->affectedRows = affectedrows;
pRsp->numOfRows = htonl(numOfRows); pRsp->numOfRows = numOfRows;
} }
return 0; return 0;

View File

@ -18,6 +18,7 @@
static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessCreateStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp); static int vnodeProcessCreateTbReq(SVnode *pVnode, SRpcMsg *pMsg, void *pReq, SRpcMsg **pRsp);
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq);
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp);
void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) { void vnodePreprocessWriteReqs(SVnode *pVnode, SArray *pMsgs) {
SNodeMsg *pMsg; SNodeMsg *pMsg;
@ -79,9 +80,10 @@ int vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_SUBMIT: case TDMT_VND_SUBMIT:
/*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/ /*printf("vnode %d write data %ld\n", pVnode->vgId, ver);*/
if (pVnode->config.streamMode == 0) { if (pVnode->config.streamMode == 0) {
if (tsdbInsertData(pVnode->pTsdb, (SSubmitReq *)ptr, NULL) < 0) { *pRsp = taosMemoryCalloc(1, sizeof(SRpcMsg));
// TODO: handle error (*pRsp)->handle = pMsg->handle;
} (*pRsp)->ahandle = pMsg->ahandle;
return vnodeProcessSubmitReq(pVnode, ptr, *pRsp);
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
@ -300,3 +302,23 @@ static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq) {
taosMemoryFree(vAlterTbReq.name); taosMemoryFree(vAlterTbReq.name);
return 0; return 0;
} }
static int vnodeProcessSubmitReq(SVnode *pVnode, SSubmitReq *pSubmitReq, SRpcMsg *pRsp) {
SSubmitRsp rsp = {0};
pRsp->code = 0;
// handle the request
if (tsdbInsertData(pVnode->pTsdb, pSubmitReq, &rsp) < 0) {
pRsp->code = terrno;
return -1;
}
// encode the response (TODO)
pRsp->msgType = TDMT_VND_SUBMIT_RSP;
pRsp->pCont = rpcMallocCont(sizeof(SSubmitRsp));
memcpy(pRsp->pCont, &rsp, sizeof(rsp));
pRsp->contLen = sizeof(SSubmitRsp);
return 0;
}