From 0acfddf32cfa5ee87f807653905f761f044d5e1e Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 25 Oct 2021 15:04:38 +0800 Subject: [PATCH] merge tq related (#8415) add tq data structure --- include/server/vnode/tq/tq.h | 60 +++++++++++++++++++++----- source/server/dnode/src/dnodeTrans.c | 2 +- source/server/vnode/src/vnodeReadMsg.c | 25 ++++++++++- source/server/vnode/tq/inc/tqInt.h | 22 +--------- source/server/vnode/tq/src/tq.c | 8 ++-- 5 files changed, 80 insertions(+), 37 deletions(-) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 6e56e8256f..f30ba75c42 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -24,9 +24,10 @@ extern "C" { typedef struct tmqMsgHead { int32_t headLen; - int32_t msgVer; + int32_t protoVer; int64_t cgId; int64_t topicId; + int64_t clientId; int32_t checksum; int32_t msgType; } tmqMsgHead; @@ -34,35 +35,43 @@ typedef struct tmqMsgHead { //TODO: put msgs into common typedef struct tmqConnectReq { tmqMsgHead head; - } tmqConnectReq; typedef struct tmqConnectResp { - + tmqMsgHead head; + int8_t status; } tmqConnectResp; typedef struct tmqDisconnectReq { - + tmqMsgHead head; } tmqDisconnectReq; typedef struct tmqDisconnectResp { - + tmqMsgHead head; + int8_t status; } tmqDiconnectResp; typedef struct tmqConsumeReq { - + tmqMsgHead head; + int64_t commitOffset; } tmqConsumeReq; typedef struct tmqConsumeResp { - + tmqMsgHead head; + char content[]; } tmqConsumeResp; -typedef struct tmqSubscribeReq { - +// +typedef struct tmqMnodeSubscribeReq { + tmqMsgHead head; + int64_t topicLen; + char topic[]; } tmqSubscribeReq; -typedef struct tmqSubscribeResp { - +typedef struct tmqMnodeSubscribeResp { + tmqMsgHead head; + int64_t vgId; + char ep[]; //TSDB_EP_LEN } tmqSubscribeResp; typedef struct tmqHeartbeatReq { @@ -92,6 +101,24 @@ typedef struct STQ { //value=consumeOffset: int64_t } STQ; +#define TQ_BUFFER_SIZE 8 + +typedef struct tqBufferItem { + int64_t offset; + void* executor; + void* content; +} tqBufferItem; + +typedef struct tqGroupHandle { + char* topic; //c style, end with '\0' + int64_t cgId; + void* ahandle; + int64_t consumeOffset; + int32_t head; + int32_t tail; + tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqGroupHandle; + //init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); @@ -103,6 +130,17 @@ int tqCommit(STQ*); //void* will be replace by a msg type int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); +tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId); + +int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqMoveOffsetToNext(tqGroupHandle*); +int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); +int tqFetchMsg(tqGroupHandle*, void*); +int tqRegisterContext(tqGroupHandle*, void*); +int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); + #ifdef __cplusplus } #endif diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 7b48bea622..abb339fc2c 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -26,6 +26,7 @@ #include "dnodeStatus.h" #include "mnode.h" #include "vnode.h" +#include "mnode.h" typedef void (*RpcMsgFp)(SRpcMsg *pMsg); @@ -144,7 +145,6 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; if (fp != NULL) { - dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 158e550dcf..b4070546c7 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -218,10 +218,33 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { } //mq related -int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){ +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { + //parse message and optionally move offset + void* pMsg = pRead->pCont; + tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg; + tmqMsgHead msgHead = pConsumeMsg->head; + //extract head + STQ *pTq = pVnode->pTQ; + tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId); + //return msg if offset not moved + if(pConsumeMsg->commitOffset == pHandle->consumeOffset) { + //return msg + return 0; + } + //or move offset + tqMoveOffsetToNext(pHandle); + //fetch or register context + tqFetchMsg(pHandle, pRead); + //judge mode, tail read or catch up read + //launch new query return 0; } + int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { + //get operator tree from tq data structure + //execute operator tree + //put data into ringbuffer + //unref memory return 0; } //mq related end diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index cba9075fe9..0896e7afab 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,37 +18,19 @@ #include "tq.h" -#define TQ_BUFFER_SIZE 8 #ifdef __cplusplus extern "C" { #endif -typedef struct tqBufferItem { - int64_t offset; - void* executor; - void* content; -} tqBufferItem; - - -typedef struct tqGroupHandle { - char* topic; //c style, end with '\0' - int64_t cgId; - void* ahandle; - int64_t consumeOffset; - int32_t head; - int32_t tail; - tqBufferItem buffer[TQ_BUFFER_SIZE]; -} tqGroupHandle; - //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); //create ring buffer in memory and load consuming offset -int tqOpenTCGroup(STQ*, const char* topic, int cgId); +//int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseTCGroup(STQ*, const char* topic, int cgId); +//int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info int tqDropTCGroup(STQ*, const char* topic, int cgId); diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 7733ac29b5..b2bfbced37 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,7 +22,7 @@ // //handle management message -static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { +tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { //look in memory // //not found, try to restore from disk @@ -56,9 +56,9 @@ int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { return 0; } -int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) { - tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); - return tqCommitTCGroup(handle); +/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/ + /*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/ + /*return tqCommitTCGroup(handle);*/ } int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {