add function

This commit is contained in:
Liu Jicong 2021-10-26 10:05:50 +08:00
parent 108c14aa79
commit 9c2b0c0217
3 changed files with 9 additions and 6 deletions

View File

@ -98,8 +98,12 @@ static int32_t dnodeInitServer() {
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg;
/*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg;
tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg;
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));

View File

@ -797,13 +797,11 @@ static void vnodeInitMsgFp() {
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg;
//mq related end
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
//mq related
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg;
//mq related end //mq related end
tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg;
tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg;
} }
void vnodeProcessMsg(SRpcMsg *pMsg) { void vnodeProcessMsg(SRpcMsg *pMsg) {

View File

@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
//fetch or register context //fetch or register context
tqFetchMsg(pHandle, pRead); tqFetchMsg(pHandle, pRead);
//judge mode, tail read or catch up read //judge mode, tail read or catch up read
/*int64_t lastVer = walLastVer(pVnode->wal);*/
//launch new query //launch new query
return 0; return 0;
} }