From 4113df0b624e6b11208747f2985d6f9906525a49 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 14 Oct 2021 16:48:37 +0800 Subject: [PATCH 01/11] add some msg for tq --- include/common/taosmsg.h | 6 +++- include/server/vnode/tq/tq.h | 54 ++++++++++++++++++++++++++++-- source/server/vnode/tq/inc/tqInt.h | 7 ++-- source/server/vnode/tq/src/tq.c | 2 +- 4 files changed, 62 insertions(+), 7 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 8f89df40d0..78f91cca64 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -41,6 +41,10 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_SUBMIT, "submit" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY2, "dummy2" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY3, "dummy3" ) @@ -113,7 +117,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_NETWORK_TEST, "nettest" ) // message for topic TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_TP, "create-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) -TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) +//TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_USE_TP, "use-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index eb9c57c581..ef6a34ffa3 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,6 +22,56 @@ extern "C" { #endif +typedef struct tmqMsgHead { + int32_t headLen; + int32_t msgVer; + int64_t cgId; + int32_t topicLen; + char topic[]; +} tmqMsgHead; + +//TODO: put msgs into common +typedef struct tmqConnectReq { + tmqMsgHead head; + +} tmqConnectReq; + +typedef struct tmqConnectResp { + +} tmqConnectResp; + +typedef struct tmqDisconnectReq { + +} tmqDisconnectReq; + +typedef struct tmqDisconnectResp { + +} tmqDiconnectResp; + +typedef struct tmqConsumeReq { + +} tmqConsumeReq; + +typedef struct tmqConsumeResp { + +} tmqConsumeResp; + +typedef struct tmqSubscribeReq { + +} tmqSubscribeReq; + +typedef struct tmqSubscribeResp { + +} tmqSubscribeResp; + +typedef struct tmqHeartbeatReq { + +} tmqHeartbeatReq; + +typedef struct tmqHeartbeatResp { + +} tmqHeartbeatResp; + typedef struct tqTopicVhandle { //name // @@ -29,7 +79,7 @@ typedef struct tqTopicVhandle { // //callback for mnode // -} tqTopic; +} tqTopicVhandle; typedef struct STQ { //the set for topics @@ -50,7 +100,7 @@ int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); //void* will be replace by a msg type -int tqHandleMsg(STQ*, void* msg); +int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index c42bcfef43..cba9075fe9 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -26,14 +26,15 @@ extern "C" { typedef struct tqBufferItem { int64_t offset; - void *content; + void* executor; + void* content; } tqBufferItem; typedef struct tqGroupHandle { - char* topic; - void* ahandle; + char* topic; //c style, end with '\0' int64_t cgId; + void* ahandle; int64_t consumeOffset; int32_t head; int32_t tail; diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 2ef2a4b6ea..7733ac29b5 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -77,7 +77,7 @@ int tqCommit(STQ* pTq) { return 0; } -int tqHandleMsg(STQ* pTq, void*msg) { +int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) { //parse msg and extract topic and cgId //lookup handle //confirm message and send to consumer From 42fd474beaa826bb7c70d638baa18c7c080bb8b8 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 10:39:52 +0800 Subject: [PATCH 02/11] add vnodeprocess msg function --- include/common/taosmsg.h | 38 ++++++++++++++++++++++++- include/server/vnode/tq/tq.h | 5 ++-- source/server/dnode/src/dnodeTrans.c | 11 +++++-- source/server/vnode/inc/vnodeReadMsg.h | 2 ++ source/server/vnode/inc/vnodeWriteMsg.h | 8 +++++- source/server/vnode/src/vnodeMain.c | 36 ++++++++++++++--------- source/server/vnode/src/vnodeRead.c | 3 ++ source/server/vnode/src/vnodeWrite.c | 18 ++++++++++-- 8 files changed, 99 insertions(+), 22 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 78f91cca64..66a02f350e 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -42,7 +42,9 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_QUERY, "query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_FETCH, "fetch" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_UPDATE_TAG_VAL, "update-tag-val" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONNECT, "mq-connect" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_DISCONNECT, "mq-disconnect" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_CONSUME, "mq-consume" ) +TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_QUERY, "mq-query" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_ACK, "mq-ack" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MQ_RESET, "mq-reset" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY1, "dummy1" ) @@ -121,7 +123,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TP, "drop-tp" ) TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TP, "alter-tp" ) #ifndef TAOS_MESSAGE_C - TSDB_MSG_TYPE_MAX // 105 + TSDB_MSG_TYPE_MAX // 147 #endif }; @@ -958,6 +960,40 @@ typedef struct { char reserved2[64]; } SStartupStep; +// mq related +typedef struct { + +} SMqConnectReq; + +typedef struct { + +} SMqConnectRsp; + +typedef struct { + +} SMqDisconnectReq; + +typedef struct { + +} SMqDisconnectRsp; + +typedef struct { + +} SMqAckReq; + +typedef struct { + +} SMqAckRsp; + +typedef struct { + +} SMqResetReq; + +typedef struct { + +} SMqResetRsp; +//mq related end + typedef struct { /* data */ } SSubmitReq; diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index ef6a34ffa3..6e56e8256f 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -26,8 +26,9 @@ typedef struct tmqMsgHead { int32_t headLen; int32_t msgVer; int64_t cgId; - int32_t topicLen; - char topic[]; + int64_t topicId; + int32_t checksum; + int32_t msgType; } tmqMsgHead; //TODO: put msgs into common diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index a4409674f1..1739283f34 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -97,6 +97,9 @@ int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ + SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); rpcInit.localPort = tsDnodeDnodePort; @@ -308,10 +311,12 @@ static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, c } int32_t dnodeInitShell() { - tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; + tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; // the following message shall be treated as mnode write tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; diff --git a/source/server/vnode/inc/vnodeReadMsg.h b/source/server/vnode/inc/vnodeReadMsg.h index a1efb729e1..1efc74d1af 100644 --- a/source/server/vnode/inc/vnodeReadMsg.h +++ b/source/server/vnode/inc/vnodeReadMsg.h @@ -36,6 +36,8 @@ typedef struct SReadMsg { int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead); int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead); +int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead); #ifdef __cplusplus } diff --git a/source/server/vnode/inc/vnodeWriteMsg.h b/source/server/vnode/inc/vnodeWriteMsg.h index 86cdba6946..9dbc4fe490 100644 --- a/source/server/vnode/inc/vnodeWriteMsg.h +++ b/source/server/vnode/inc/vnodeWriteMsg.h @@ -27,9 +27,15 @@ int32_t vnodeProcessDropTableReq(SVnode *pVnode, SDropTableReq *pReq, SDropTable int32_t vnodeProcessAlterTableReq(SVnode *pVnode, SAlterTableReq *pReq, SAlterTableRsp *pRsp); int32_t vnodeProcessDropStableReq(SVnode *pVnode, SDropStableReq *pReq, SDropStableRsp *pRsp); int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpdateTagValRsp *pRsp); +//mq related +int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp); +int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp); +int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp); +//mq related end #ifdef __cplusplus } #endif -#endif /*_TD_VNODE_WRITE_MSG_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_WRITE_MSG_H_*/ diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index d9c1a88d15..da1c1d7235 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -780,20 +780,30 @@ static void vnodeCleanupVnodes() { } static void vnodeInitMsgFp() { - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMgmtMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMgmtMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessWriteMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; + //mq related end + tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; + //mq related + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; + //mq related end } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeRead.c b/source/server/vnode/src/vnodeRead.c index 39b6983b7d..0bf907c419 100644 --- a/source/server/vnode/src/vnodeRead.c +++ b/source/server/vnode/src/vnodeRead.c @@ -141,6 +141,9 @@ void vnodeProcessReadMsg(SRpcMsg *pMsg) { static void vnodeInitReadMsgFp() { tsVread.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; tsVread.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; + + tsVread.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessTqQueryMsg; + tsVread.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessConsumeMsg; } static int32_t vnodeProcessReadStart(SVnode *pVnode, SReadMsg *pRead, int32_t qtype) { diff --git a/source/server/vnode/src/vnodeWrite.c b/source/server/vnode/src/vnodeWrite.c index 3c2634a2cf..c103460241 100644 --- a/source/server/vnode/src/vnodeWrite.c +++ b/source/server/vnode/src/vnodeWrite.c @@ -179,6 +179,20 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t case TSDB_MSG_TYPE_UPDATE_TAG_VAL: pWrite->code = vnodeProcessUpdateTagValReq(pVnode, (void*)pHead->cont, NULL); break; + //mq related + case TSDB_MSG_TYPE_MQ_CONNECT: + pWrite->code = vnodeProcessMqConnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_DISCONNECT: + pWrite->code = vnodeProcessMqDisconnectReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_ACK: + pWrite->code = vnodeProcessMqAckReq(pVnode, (void*)pHead->cont, NULL); + break; + case TSDB_MSG_TYPE_MQ_RESET: + pWrite->code = vnodeProcessMqResetReq(pVnode, (void*)pHead->cont, NULL); + break; + //mq related end default: pWrite->code = TSDB_CODE_VND_MSG_NOT_PROCESSED; break; @@ -186,7 +200,7 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t if (pWrite->code < 0) return false; - // update fync + // update fsync return (pWrite->code == 0 && msgType != TSDB_MSG_TYPE_SUBMIT); } @@ -233,4 +247,4 @@ void vnodeCleanupWrite() { taos_queue vnodeAllocWriteQueue(SVnode *pVnode) { return tWriteWorkerAllocQueue(&tsVwrite.pool, pVnode); } -void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } \ No newline at end of file +void vnodeFreeWriteQueue(taos_queue pQueue) { tWriteWorkerFreeQueue(&tsVwrite.pool, pQueue); } From 353400ae4c7f51701f9be21d27d56d78bc318fcb Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 10:50:11 +0800 Subject: [PATCH 03/11] add vnodeprocess msg for tq --- source/server/vnode/src/vnodeReadMsg.c | 9 +++++++++ source/server/vnode/src/vnodeWriteMsg.c | 15 +++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 8a0f4b2e0f..158e550dcf 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -217,6 +217,15 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { return 0; } +//mq related +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){ + return 0; +} +int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { + return 0; +} +//mq related end + int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { #if 0 void * pCont = pRead->pCont; diff --git a/source/server/vnode/src/vnodeWriteMsg.c b/source/server/vnode/src/vnodeWriteMsg.c index 0fe6fa2bc9..2e13d0035d 100644 --- a/source/server/vnode/src/vnodeWriteMsg.c +++ b/source/server/vnode/src/vnodeWriteMsg.c @@ -77,3 +77,18 @@ int32_t vnodeProcessUpdateTagValReq(SVnode *pVnode, SUpdateTagValReq *pReq, SUpd // TODO return 0; } + +//mq related +int32_t vnodeProcessMqConnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp){ + return 0; +} +int32_t vnodeProcessMqDisconnectReq(SVnode* pVnode, SMqConnectReq *pReq, SMqConnectRsp *pRsp) { + return 0; +} +int32_t vnodeProcessMqAckReq(SVnode* pVnode, SMqAckReq *pReq, SMqAckRsp *pRsp) { + return 0; +} +int32_t vnodeProcessMqResetReq(SVnode* pVnode, SMqResetReq *pReq, SMqResetRsp *pRsp) { + return 0; +} +//mq related end From c0dfa6cdd28510530510f7fe6caee340c87d13a9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 18 Oct 2021 13:15:42 +0800 Subject: [PATCH 04/11] add tq header --- include/server/vnode/tq/tq.h | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 6e56e8256f..7cac0e81c4 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -103,6 +103,15 @@ int tqCommit(STQ*); //void* will be replace by a msg type int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); +int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqMoveOffsetToNext(STQ*, int64_t topicId, int64_t cgId); +int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); +int tqFetchMsg(STQ*, int64_t topicId, int64_t cgId); +int tqRegisterContext(STQ*, int64_t topicId, int64_t cgId, void* ahandle); +int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); + #ifdef __cplusplus } #endif From 83c403ff7490a1361f626d8a6432cd26ebcf0d15 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Oct 2021 14:56:19 +0800 Subject: [PATCH 05/11] add tq data structure --- include/server/vnode/tq/tq.h | 57 +++++++++++++++++++------- source/server/vnode/src/vnodeReadMsg.c | 25 ++++++++++- source/server/vnode/tq/inc/tqInt.h | 22 +--------- source/server/vnode/tq/src/tq.c | 8 ++-- 4 files changed, 73 insertions(+), 39 deletions(-) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 7cac0e81c4..f30ba75c42 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -24,9 +24,10 @@ extern "C" { typedef struct tmqMsgHead { int32_t headLen; - int32_t msgVer; + int32_t protoVer; int64_t cgId; int64_t topicId; + int64_t clientId; int32_t checksum; int32_t msgType; } tmqMsgHead; @@ -34,35 +35,43 @@ typedef struct tmqMsgHead { //TODO: put msgs into common typedef struct tmqConnectReq { tmqMsgHead head; - } tmqConnectReq; typedef struct tmqConnectResp { - + tmqMsgHead head; + int8_t status; } tmqConnectResp; typedef struct tmqDisconnectReq { - + tmqMsgHead head; } tmqDisconnectReq; typedef struct tmqDisconnectResp { - + tmqMsgHead head; + int8_t status; } tmqDiconnectResp; typedef struct tmqConsumeReq { - + tmqMsgHead head; + int64_t commitOffset; } tmqConsumeReq; typedef struct tmqConsumeResp { - + tmqMsgHead head; + char content[]; } tmqConsumeResp; -typedef struct tmqSubscribeReq { - +// +typedef struct tmqMnodeSubscribeReq { + tmqMsgHead head; + int64_t topicLen; + char topic[]; } tmqSubscribeReq; -typedef struct tmqSubscribeResp { - +typedef struct tmqMnodeSubscribeResp { + tmqMsgHead head; + int64_t vgId; + char ep[]; //TSDB_EP_LEN } tmqSubscribeResp; typedef struct tmqHeartbeatReq { @@ -92,6 +101,24 @@ typedef struct STQ { //value=consumeOffset: int64_t } STQ; +#define TQ_BUFFER_SIZE 8 + +typedef struct tqBufferItem { + int64_t offset; + void* executor; + void* content; +} tqBufferItem; + +typedef struct tqGroupHandle { + char* topic; //c style, end with '\0' + int64_t cgId; + void* ahandle; + int64_t consumeOffset; + int32_t head; + int32_t tail; + tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqGroupHandle; + //init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); @@ -103,12 +130,14 @@ int tqCommit(STQ*); //void* will be replace by a msg type int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); +tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId); + int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); -int tqMoveOffsetToNext(STQ*, int64_t topicId, int64_t cgId); +int tqMoveOffsetToNext(tqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); -int tqFetchMsg(STQ*, int64_t topicId, int64_t cgId); -int tqRegisterContext(STQ*, int64_t topicId, int64_t cgId, void* ahandle); +int tqFetchMsg(tqGroupHandle*, void*); +int tqRegisterContext(tqGroupHandle*, void*); int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 158e550dcf..b4070546c7 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -218,10 +218,33 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { } //mq related -int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){ +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { + //parse message and optionally move offset + void* pMsg = pRead->pCont; + tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg; + tmqMsgHead msgHead = pConsumeMsg->head; + //extract head + STQ *pTq = pVnode->pTQ; + tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId); + //return msg if offset not moved + if(pConsumeMsg->commitOffset == pHandle->consumeOffset) { + //return msg + return 0; + } + //or move offset + tqMoveOffsetToNext(pHandle); + //fetch or register context + tqFetchMsg(pHandle, pRead); + //judge mode, tail read or catch up read + //launch new query return 0; } + int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { + //get operator tree from tq data structure + //execute operator tree + //put data into ringbuffer + //unref memory return 0; } //mq related end diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index cba9075fe9..0896e7afab 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,37 +18,19 @@ #include "tq.h" -#define TQ_BUFFER_SIZE 8 #ifdef __cplusplus extern "C" { #endif -typedef struct tqBufferItem { - int64_t offset; - void* executor; - void* content; -} tqBufferItem; - - -typedef struct tqGroupHandle { - char* topic; //c style, end with '\0' - int64_t cgId; - void* ahandle; - int64_t consumeOffset; - int32_t head; - int32_t tail; - tqBufferItem buffer[TQ_BUFFER_SIZE]; -} tqGroupHandle; - //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); //create ring buffer in memory and load consuming offset -int tqOpenTCGroup(STQ*, const char* topic, int cgId); +//int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseTCGroup(STQ*, const char* topic, int cgId); +//int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info int tqDropTCGroup(STQ*, const char* topic, int cgId); diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 7733ac29b5..b2bfbced37 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,7 +22,7 @@ // //handle management message -static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { +tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { //look in memory // //not found, try to restore from disk @@ -56,9 +56,9 @@ int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { return 0; } -int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) { - tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); - return tqCommitTCGroup(handle); +/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/ + /*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/ + /*return tqCommitTCGroup(handle);*/ } int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { From 477e31431579da21c27a30c700b3a7068edd4d28 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Oct 2021 15:03:24 +0800 Subject: [PATCH 06/11] Delete dnodeTrans.c.orig --- source/server/dnode/src/dnodeTrans.c.orig | 413 ---------------------- 1 file changed, 413 deletions(-) delete mode 100644 source/server/dnode/src/dnodeTrans.c.orig diff --git a/source/server/dnode/src/dnodeTrans.c.orig b/source/server/dnode/src/dnodeTrans.c.orig deleted file mode 100644 index 040a47de6f..0000000000 --- a/source/server/dnode/src/dnodeTrans.c.orig +++ /dev/null @@ -1,413 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -/* this file is mainly responsible for the communication between DNODEs. Each - * dnode works as both server and client. Dnode may send status, grant, config - * messages to mnode, mnode may send create/alter/drop table/vnode messages - * to dnode. All theses messages are handled from here - */ - -#define _DEFAULT_SOURCE -#include "dnodeTrans.h" -#include "dnodeMain.h" -#include "dnodeMnodeEps.h" -#include "dnodeStatus.h" -#include "mnode.h" -#include "vnode.h" - -typedef void (*RpcMsgFp)(SRpcMsg *pMsg); - -static struct { - void *serverRpc; - void *clientRpc; - void *shellRpc; - int32_t queryReqNum; - int32_t submitReqNum; - RpcMsgFp peerMsgFp[TSDB_MSG_TYPE_MAX]; - RpcMsgFp shellMsgFp[TSDB_MSG_TYPE_MAX]; -} tsTrans; - -static void dnodeProcessPeerReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; - int32_t msgType = pMsg->msgType; - - if (msgType == TSDB_MSG_TYPE_NETWORK_TEST) { - dnodeProcessStartupReq(pMsg); - return; - } - - if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { - rspMsg.code = TSDB_CODE_APP_NOT_READY; - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); - dTrace("RPC %p, peer req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); - return; - } - - if (pMsg->pCont == NULL) { - rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN; - rpcSendResponse(&rspMsg); - return; - } - - RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; - if (fp != NULL) { - dTrace("RPC %p, peer req:%s will be processed", pMsg->handle, taosMsg[msgType]); - (*fp)(pMsg); - } else { - dError("RPC %p, peer req:%s not processed", pMsg->handle, taosMsg[msgType]); - rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); - } -} - -static int32_t dnodeInitServer() { - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = vnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = vnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = vnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeReq; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeProcessCreateMnodeReq; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ - - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = tsDnodeDnodePort; - rpcInit.label = "DND-S"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessPeerReq; - rpcInit.sessions = TSDB_MAX_VNODES << 4; - rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 1000; - - tsTrans.serverRpc = rpcOpen(&rpcInit); - if (tsTrans.serverRpc == NULL) { - dError("failed to init peer rpc server"); - return -1; - } - - dInfo("dnode peer rpc server is initialized"); - return 0; -} - -static void dnodeCleanupServer() { - if (tsTrans.serverRpc) { - rpcClose(tsTrans.serverRpc); - tsTrans.serverRpc = NULL; - dInfo("dnode peer server is closed"); - } -} - -static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - int32_t msgType = pMsg->msgType; - - if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { - if (pMsg == NULL || pMsg->pCont == NULL) return; - dTrace("RPC %p, peer rsp:%s is ignored since dnode is stopping", pMsg->handle, taosMsg[msgType]); - rpcFreeCont(pMsg->pCont); - return; - } - - if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeFromPeer(pEpSet); - } - - RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; - if (fp != NULL) { - dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); - (*fp)(pMsg); - } else { - dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); - } - - rpcFreeCont(pMsg->pCont); -} - -static int32_t dnodeInitClient() { - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE_RSP] = mnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE_RSP] = mnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM_RSP] = mnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE_RSP] = mnodeProcessMsg; - - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_AUTH_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT_RSP] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS_RSP] = dnodeProcessStatusRsp; - - char secret[TSDB_KEY_LEN] = "secret"; - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.label = "DND-C"; - rpcInit.numOfThreads = 1; - rpcInit.cfp = dnodeProcessPeerRsp; - rpcInit.sessions = TSDB_MAX_VNODES << 4; - rpcInit.connType = TAOS_CONN_CLIENT; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.user = "t"; - rpcInit.ckey = "key"; - rpcInit.secret = secret; - - tsTrans.clientRpc = rpcOpen(&rpcInit); - if (tsTrans.clientRpc == NULL) { - dError("failed to init peer rpc client"); - return -1; - } - - dInfo("dnode peer rpc client is initialized"); - return 0; -} - -static void dnodeCleanupClient() { - if (tsTrans.clientRpc) { - rpcClose(tsTrans.clientRpc); - tsTrans.clientRpc = NULL; - dInfo("dnode peer rpc client is closed"); - } -} - -static void dnodeProcessShellReq(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { - SRpcMsg rspMsg = {.handle = pMsg->handle, .pCont = NULL, .contLen = 0}; - int32_t msgType = pMsg->msgType; - - if (dnodeGetRunStat() == DN_RUN_STAT_STOPPED) { - dError("RPC %p, shell req:%s is ignored since dnode exiting", pMsg->handle, taosMsg[msgType]); - rspMsg.code = TSDB_CODE_DND_EXITING; - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); - return; - } else if (dnodeGetRunStat() != DN_RUN_STAT_RUNNING) { - dError("RPC %p, shell req:%s is ignored since dnode not running", pMsg->handle, taosMsg[msgType]); - rspMsg.code = TSDB_CODE_APP_NOT_READY; - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); - return; - } - - if (pMsg->pCont == NULL) { - rspMsg.code = TSDB_CODE_DND_INVALID_MSG_LEN; - rpcSendResponse(&rspMsg); - return; - } - - if (msgType == TSDB_MSG_TYPE_QUERY) { - atomic_fetch_add_32(&tsTrans.queryReqNum, 1); - } else if (msgType == TSDB_MSG_TYPE_SUBMIT) { - atomic_fetch_add_32(&tsTrans.submitReqNum, 1); - } else { - } - - RpcMsgFp fp = tsTrans.shellMsgFp[msgType]; - if (fp != NULL) { - dTrace("RPC %p, shell req:%s will be processed", pMsg->handle, taosMsg[msgType]); - (*fp)(pMsg); - } else { - dError("RPC %p, shell req:%s is not processed", pMsg->handle, taosMsg[msgType]); - rspMsg.code = TSDB_CODE_DND_MSG_NOT_PROCESSED; - rpcSendResponse(&rspMsg); - rpcFreeCont(pMsg->pCont); - } -} - -void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg) { rpcSendRequest(tsTrans.clientRpc, epSet, rpcMsg, NULL); } - -void dnodeSendMsgToMnode(SRpcMsg *rpcMsg) { - SRpcEpSet epSet = {0}; - dnodeGetEpSetForPeer(&epSet); - dnodeSendMsgToDnode(&epSet, rpcMsg); -} - -static void dnodeSendMsgToMnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp) { - SRpcEpSet epSet = {0}; - dnodeGetEpSetForPeer(&epSet); - rpcSendRecv(tsTrans.clientRpc, &epSet, rpcMsg, rpcRsp); -} - -static int32_t dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char *secret, char *ckey) { - int32_t code = mnodeRetriveAuth(user, spi, encrypt, secret, ckey); - if (code != TSDB_CODE_APP_NOT_READY) return code; - - SAuthMsg *pMsg = rpcMallocCont(sizeof(SAuthMsg)); - tstrncpy(pMsg->user, user, sizeof(pMsg->user)); - - SRpcMsg rpcMsg = {0}; - rpcMsg.pCont = pMsg; - rpcMsg.contLen = sizeof(SAuthMsg); - rpcMsg.msgType = TSDB_MSG_TYPE_DM_AUTH; - - dDebug("user:%s, send auth msg to mnodes", user); - SRpcMsg rpcRsp = {0}; - dnodeSendMsgToMnodeRecv(&rpcMsg, &rpcRsp); - - if (rpcRsp.code != 0) { - dError("user:%s, auth msg received from mnodes, error:%s", user, tstrerror(rpcRsp.code)); - } else { - SAuthRsp *pRsp = rpcRsp.pCont; - dDebug("user:%s, auth msg received from mnodes", user); - memcpy(secret, pRsp->secret, TSDB_KEY_LEN); - memcpy(ckey, pRsp->ckey, TSDB_KEY_LEN); - *spi = pRsp->spi; - *encrypt = pRsp->encrypt; - } - - rpcFreeCont(rpcRsp.pCont); - return rpcRsp.code; -} - -<<<<<<< HEAD -int32_t dnodeInitShell() { - tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; -======= -static int32_t dnodeInitShell() { - tsTrans.shellMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; ->>>>>>> 3.0 - - // the following message shall be treated as mnode write - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_ACCT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_ACCT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_ACCT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_USER] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_USER] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_USER] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DNODE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DNODE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TP] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_FUNCTION] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SYNC_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TP] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_FUNCTION] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TP] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CREATE_TABLE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_DROP_TABLE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_TABLE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_ALTER_STREAM] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_QUERY] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE] = mnodeProcessMsg; - - // the following message shall be treated as mnode query - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_CONNECT] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_USE_DB] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLE_META] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_STABLE_VGROUP] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_TABLES_META] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_SHOW] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE] = mnodeProcessMsg; - tsTrans.shellMsgFp[TSDB_MSG_TYPE_CM_RETRIEVE_FUNC] = mnodeProcessMsg; - - tsTrans.shellMsgFp[TSDB_MSG_TYPE_NETWORK_TEST] = dnodeProcessStartupReq; - - int32_t numOfThreads = (int32_t)((tsNumOfCores * tsNumOfThreadsPerCore) / 2.0); - if (numOfThreads < 1) { - numOfThreads = 1; - } - - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = tsDnodeShellPort; - rpcInit.label = "SHELL"; - rpcInit.numOfThreads = numOfThreads; - rpcInit.cfp = dnodeProcessShellReq; - rpcInit.sessions = tsMaxShellConns; - rpcInit.connType = TAOS_CONN_SERVER; - rpcInit.idleTime = tsShellActivityTimer * 1000; - rpcInit.afp = dnodeRetrieveUserAuthInfo; - - tsTrans.shellRpc = rpcOpen(&rpcInit); - if (tsTrans.shellRpc == NULL) { - dError("failed to init shell rpc server"); - return -1; - } - - dInfo("dnode shell rpc server is initialized"); - return 0; -} - -static void dnodeCleanupShell() { - if (tsTrans.shellRpc) { - rpcClose(tsTrans.shellRpc); - tsTrans.shellRpc = NULL; - } -} - -int32_t dnodeInitTrans() { - if (dnodeInitClient() != 0) { - return -1; - } - - if (dnodeInitServer() != 0) { - return -1; - } - - if (dnodeInitShell() != 0) { - return -1; - } - - return 0; -} - -void dnodeCleanupTrans() { - dnodeCleanupShell(); - dnodeCleanupServer(); - dnodeCleanupClient(); -} From 92854ffb0b2d874367aa4692292c7ec058639afd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Oct 2021 15:03:42 +0800 Subject: [PATCH 07/11] Delete vnodeReadMsg.c.orig --- source/server/vnode/src/vnodeReadMsg.c.orig | 352 -------------------- 1 file changed, 352 deletions(-) delete mode 100644 source/server/vnode/src/vnodeReadMsg.c.orig diff --git a/source/server/vnode/src/vnodeReadMsg.c.orig b/source/server/vnode/src/vnodeReadMsg.c.orig deleted file mode 100644 index 19db9c7763..0000000000 --- a/source/server/vnode/src/vnodeReadMsg.c.orig +++ /dev/null @@ -1,352 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "os.h" -#include "taosmsg.h" -#include "tglobal.h" -// #include "query.h" -#include "vnodeStatus.h" -#include "vnodeRead.h" -#include "vnodeReadMsg.h" - -#if 0 -// notify connection(handle) that current qhandle is created, if current connection from -// client is broken, the query needs to be killed immediately. -static int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) { - SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg)); - pMsg->qId = htobe64(qId); - pMsg->header.vgId = htonl(vgId); - pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg)); - - vTrace("QInfo:0x%" PRIx64 "-%p register qhandle to connect:%p", qId, qhandle, handle); - return rpcReportProgress(handle, (char *)pMsg, sizeof(SRetrieveTableMsg)); -} - -/** - * @param pRet response message object - * @param pVnode the vnode object - * @param handle qhandle for executing query - * @param freeHandle free qhandle or not - * @param ahandle sqlObj address at client side - * @return - */ -static int32_t vnodeDumpQueryResult(SVnRsp *pRet, void *pVnode, uint64_t qId, void **handle, bool *freeHandle, - void *ahandle) { - bool continueExec = false; - - int32_t code = TSDB_CODE_SUCCESS; - if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len, &continueExec)) == - TSDB_CODE_SUCCESS) { - if (continueExec) { - *freeHandle = false; - code = vnodeReputPutToRQueue(pVnode, handle, ahandle); - if (code != TSDB_CODE_SUCCESS) { - *freeHandle = true; - return code; - } else { - pRet->qhandle = *handle; - } - } else { - *freeHandle = true; - vTrace("QInfo:0x%" PRIx64 "-%p exec completed, free handle:%d", qId, *handle, *freeHandle); - } - } else { - SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - memset(pRsp, 0, sizeof(SRetrieveTableRsp)); - pRsp->completed = true; - - pRet->rsp = pRsp; - pRet->len = sizeof(SRetrieveTableRsp); - *freeHandle = true; - } - - return code; -} - -static void vnodeBuildNoResultQueryRsp(SVnRsp *pRet) { - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - pRet->len = sizeof(SRetrieveTableRsp); - - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - SRetrieveTableRsp *pRsp = pRet->rsp; - - pRsp->completed = true; -} -#endif - -int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { -#if 0 - void * pCont = pRead->pCont; - int32_t contLen = pRead->contLen; - SVnRsp *pRet = &pRead->rspRet; - - SQueryTableMsg *pQueryTableMsg = (SQueryTableMsg *)pCont; - memset(pRet, 0, sizeof(SVnRsp)); - - // qHandle needs to be freed correctly - if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { - vError("error rpc msg in query, %s", tstrerror(pRead->code)); - } - - int32_t code = TSDB_CODE_SUCCESS; - void ** handle = NULL; - - if (contLen != 0) { - qinfo_t pQInfo = NULL; - uint64_t qId = genQueryId(); - code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo, qId); - - SQueryTableRsp *pRsp = (SQueryTableRsp *)rpcMallocCont(sizeof(SQueryTableRsp)); - pRsp->code = code; - pRsp->qId = 0; - - pRet->len = sizeof(SQueryTableRsp); - pRet->rsp = pRsp; - int32_t vgId = pVnode->vgId; - - // current connect is broken - if (code == TSDB_CODE_SUCCESS) { - handle = qRegisterQInfo(pVnode->qMgmt, qId, pQInfo); - if (handle == NULL) { // failed to register qhandle - pRsp->code = terrno; - terrno = 0; - - vError("vgId:%d, QInfo:0x%" PRIx64 "-%p register qhandle failed, return to app, code:%s,", pVnode->vgId, qId, - (void *)pQInfo, tstrerror(pRsp->code)); - qDestroyQueryInfo(pQInfo); // destroy it directly - return pRsp->code; - } else { - assert(*handle == pQInfo); - pRsp->qId = htobe64(qId); - } - - if (handle != NULL && - vnodeNotifyCurrentQhandle(pRead->rpcHandle, qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - vError("vgId:%d, QInfo:0x%" PRIx64 "-%p, query discarded since link is broken, %p", pVnode->vgId, qId, *handle, - pRead->rpcHandle); - - pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); - return pRsp->code; - } - - } else { - assert(pQInfo == NULL); - } - - if (handle != NULL) { - vTrace("vgId:%d, QInfo:0x%" PRIx64 "-%p, query msg disposed, create qhandle and returns to app", vgId, qId, - *handle); - code = vnodeReputPutToRQueue(pVnode, handle, pRead->rpcHandle); - if (code != TSDB_CODE_SUCCESS) { - pRsp->code = code; - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); - return pRsp->code; - } - } - - int32_t remain = atomic_add_fetch_32(&pVnode->numOfExistQHandle, 1); - vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain); - } else { - assert(pCont != NULL); - void ** qhandle = (void **)pRead->qhandle; - uint64_t qId = 0; - - vTrace("vgId:%d, QInfo:%p, continues to exec query", pVnode->vgId, *qhandle); - - // In the retrieve blocking model, only 50% CPU will be used in query processing - if (tsRetrieveBlockingModel) { - qTableQuery(*qhandle, &qId); // do execute query - qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, false); - } else { - bool freehandle = false; - bool buildRes = qTableQuery(*qhandle, &qId); // do execute query - - // build query rsp, the retrieve request has reached here already - if (buildRes) { - // update the connection info according to the retrieve connection - pRead->rpcHandle = qGetResultRetrieveMsg(*qhandle); - assert(pRead->rpcHandle != NULL); - - vTrace("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *qhandle, - pRead->rpcHandle); - - // set the real rsp error code - pRead->code = vnodeDumpQueryResult(&pRead->rspRet, pVnode, qId, qhandle, &freehandle, pRead->rpcHandle); - - // NOTE: set return code to be TSDB_CODE_QRY_HAS_RSP to notify dnode to return msg to client - code = TSDB_CODE_QRY_HAS_RSP; - } else { - // void *h1 = qGetResultRetrieveMsg(*qhandle); - - /* remove this assert, one possible case that will cause h1 not NULL: query thread unlock pQInfo->lock, and then - * FETCH thread execute twice before query thread reach here */ - // assert(h1 == NULL); - - freehandle = qQueryCompleted(*qhandle); - } - - // NOTE: if the qhandle is not put into vread queue or query is completed, free the qhandle. - // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle - if (freehandle || (!buildRes)) { - if (freehandle) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); - vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain); - } - - qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, freehandle); - } - } - } - - return code; -#endif - return 0; -} - -//mq related -<<<<<<< HEAD -int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { - //parse message and optionally move offset - void* pMsg = pRead->pCont; - tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg; - tmqMsgHead msgHead = pConsumeMsg->head; - //extract head - STQ *pTq = pVnode->pTQ; - tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId); - //return msg if offset not moved - if(pConsumeMsg->commitOffset == pHandle->consumeOffset) { - //return msg - return 0; - } - //or move offset - tqMoveOffsetToNext(pHandle); - //fetch or register context - tqFetchMsg(pHandle, pRead); - //judge mode, tail read or catch up read - //launch new query - return 0; -} - -int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { - //get operator tree from tq data structure - //execute operator tree - //put data into ringbuffer - //unref memory -======= -int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){ - return 0; -} -int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { ->>>>>>> 3.0 - return 0; -} -//mq related end - -int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { -#if 0 - void * pCont = pRead->pCont; - SVnRsp *pRet = &pRead->rspRet; - - SRetrieveTableMsg *pRetrieve = pCont; - pRetrieve->free = htons(pRetrieve->free); - pRetrieve->qId = htobe64(pRetrieve->qId); - - vTrace("vgId:%d, qId:0x%" PRIx64 ", retrieve msg is disposed, free:%d, conn:%p", pVnode->vgId, pRetrieve->qId, - pRetrieve->free, pRead->rpcHandle); - - memset(pRet, 0, sizeof(SVnRsp)); - - terrno = TSDB_CODE_SUCCESS; - int32_t code = TSDB_CODE_SUCCESS; - void ** handle = qAcquireQInfo(pVnode->qMgmt, pRetrieve->qId); - if (handle == NULL) { - code = terrno; - terrno = TSDB_CODE_SUCCESS; - } else if (!checkQIdEqual(*handle, pRetrieve->qId)) { - code = TSDB_CODE_QRY_INVALID_QHANDLE; - } - - if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, invalid qId in retrieving result, code:%s, QInfo:%" PRIu64, pVnode->vgId, tstrerror(code), - pRetrieve->qId); - vnodeBuildNoResultQueryRsp(pRet); - return code; - } - - // kill current query and free corresponding resources. - if (pRetrieve->free == 1) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); - vWarn("vgId:%d, QInfo:%" PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d", - pVnode->vgId, pRetrieve->qId, *handle, remain); - - qKillQuery(*handle); - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); - - vnodeBuildNoResultQueryRsp(pRet); - code = TSDB_CODE_TSC_QUERY_CANCELLED; - return code; - } - - // register the qhandle to connect to quit query immediate if connection is broken - if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); - vError("vgId:%d, QInfo:%" PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d", - pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle, remain); - - code = TSDB_CODE_RPC_NETWORK_UNAVAIL; - qKillQuery(*handle); - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); - return code; - } - - bool freeHandle = true; - bool buildRes = false; - - code = qRetrieveQueryResultInfo(*handle, &buildRes, pRead->rpcHandle); - if (code != TSDB_CODE_SUCCESS) { - // TODO handle malloc failure - pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp)); - pRet->len = sizeof(SRetrieveTableRsp); - memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp)); - freeHandle = true; - } else { // result is not ready, return immediately - // Only affects the non-blocking model - if (!tsRetrieveBlockingModel) { - if (!buildRes) { - assert(pRead->rpcHandle != NULL); - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, false); - return TSDB_CODE_QRY_NOT_READY; - } - } - - // ahandle is the sqlObj pointer - code = vnodeDumpQueryResult(pRet, pVnode, pRetrieve->qId, handle, &freeHandle, pRead->rpcHandle); - } - - // If qhandle is not added into vread queue, the query should be completed already or paused with error. - // Here free qhandle immediately - if (freeHandle) { - int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); - vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain); - qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); - } - - return code; -#endif - return 0; -} - From e136ae1ba061b195c906d3335a83699b7313af5e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Oct 2021 15:07:17 +0800 Subject: [PATCH 08/11] ignore orig file --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 83ed62c030..8df75abe20 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ mac/ .mypy_cache *.tmp *.swp +*.orig src/connector/nodejs/node_modules/ src/connector/nodejs/out/ tests/test/ From 108c14aa79357a678c864c69c86b842d5a0d542a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Oct 2021 15:13:38 +0800 Subject: [PATCH 09/11] add function --- include/libs/wal/wal.h | 4 ++-- source/server/vnode/tq/src/tq.c | 16 +++++++++++++++- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b6fd5a70d9..9a3310922d 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -58,9 +58,9 @@ void walStop(twalh); void walClose(twalh); //write -int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); +//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); int64_t walWrite(twalh, void* body, int32_t bodyLen); -int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize); +int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize); //apis for lifecycle management void walFsync(twalh, bool force); diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index b2bfbced37..e0f2fc545e 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -59,13 +59,27 @@ int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { /*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/ /*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/ /*return tqCommitTCGroup(handle);*/ -} +/*}*/ int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { //delete from disk return 0; } + +int tqFetchMsg(tqGroupHandle* handle, void* msg) { + return 0; +} + +int tqMoveOffsetToNext(tqGroupHandle* handle) { + return 0; +} + + +tqGroupHandle* tqFindGHandleBycId(STQ* pTq, int64_t cId) { + return NULL; +} + int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference //judge and launch new query From 9c2b0c0217574627a6bb278c456a5768ac2e18e5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 26 Oct 2021 10:05:50 +0800 Subject: [PATCH 10/11] add function --- source/server/dnode/src/dnodeTrans.c | 8 ++++++-- source/server/vnode/src/vnodeMain.c | 6 ++---- source/server/vnode/src/vnodeReadMsg.c | 1 + 3 files changed, 9 insertions(+), 6 deletions(-) diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index abb339fc2c..4d5c9e66d8 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -98,8 +98,12 @@ static int32_t dnodeInitServer() { tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_GRANT] = mnodeProcessMsg; tsTrans.peerMsgFp[TSDB_MSG_TYPE_DM_STATUS] = mnodeProcessMsg; - tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; - /*tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessRead;*/ + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessMsg; + tsTrans.peerMsgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessMsg; SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); diff --git a/source/server/vnode/src/vnodeMain.c b/source/server/vnode/src/vnodeMain.c index da1c1d7235..f1834ffdbe 100644 --- a/source/server/vnode/src/vnodeMain.c +++ b/source/server/vnode/src/vnodeMain.c @@ -797,13 +797,11 @@ static void vnodeInitMsgFp() { tsVmain.msgFp[TSDB_MSG_TYPE_MQ_DISCONNECT] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_ACK] = vnodeProcessWriteMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_RESET] = vnodeProcessWriteMsg; - //mq related end - tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; - tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; - //mq related tsVmain.msgFp[TSDB_MSG_TYPE_MQ_QUERY] = vnodeProcessReadMsg; tsVmain.msgFp[TSDB_MSG_TYPE_MQ_CONSUME] = vnodeProcessReadMsg; //mq related end + tsVmain.msgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessReadMsg; + tsVmain.msgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessReadMsg; } void vnodeProcessMsg(SRpcMsg *pMsg) { diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index b4070546c7..d4701ec4ce 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -236,6 +236,7 @@ int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { //fetch or register context tqFetchMsg(pHandle, pRead); //judge mode, tail read or catch up read + /*int64_t lastVer = walLastVer(pVnode->wal);*/ //launch new query return 0; } From fe2baf256c137c5621f934ca2d33e902828a3c17 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 26 Oct 2021 10:12:13 +0800 Subject: [PATCH 11/11] fix compile error --- source/server/dnode/src/dnodeTrans.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 6f4cb2d6eb..4eba31ab68 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -21,9 +21,8 @@ #define _DEFAULT_SOURCE #include "dnodeTrans.h" -#include "dnodeMain.h" -#include "dnodeMnodeEps.h" -#include "dnodeStatus.h" +#include "dnodeEps.h" +#include "dnodeMsg.h" #include "mnode.h" #include "vnode.h" @@ -143,7 +142,7 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { } if (msgType == TSDB_MSG_TYPE_DM_STATUS_RSP && pEpSet) { - dnodeUpdateMnodeFromPeer(pEpSet); + dnodeUpdateMnodeEps(pEpSet); } RpcMsgFp fp = tsTrans.peerMsgFp[msgType];