diff --git a/.gitignore b/.gitignore index 83ed62c030..8df75abe20 100644 --- a/.gitignore +++ b/.gitignore @@ -18,6 +18,7 @@ mac/ .mypy_cache *.tmp *.swp +*.orig src/connector/nodejs/node_modules/ src/connector/nodejs/out/ tests/test/ diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index b6fd5a70d9..9a3310922d 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -58,9 +58,9 @@ void walStop(twalh); void walClose(twalh); //write -int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); +//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen); int64_t walWrite(twalh, void* body, int32_t bodyLen); -int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize); +int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize); //apis for lifecycle management void walFsync(twalh, bool force); diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 6e56e8256f..f30ba75c42 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -24,9 +24,10 @@ extern "C" { typedef struct tmqMsgHead { int32_t headLen; - int32_t msgVer; + int32_t protoVer; int64_t cgId; int64_t topicId; + int64_t clientId; int32_t checksum; int32_t msgType; } tmqMsgHead; @@ -34,35 +35,43 @@ typedef struct tmqMsgHead { //TODO: put msgs into common typedef struct tmqConnectReq { tmqMsgHead head; - } tmqConnectReq; typedef struct tmqConnectResp { - + tmqMsgHead head; + int8_t status; } tmqConnectResp; typedef struct tmqDisconnectReq { - + tmqMsgHead head; } tmqDisconnectReq; typedef struct tmqDisconnectResp { - + tmqMsgHead head; + int8_t status; } tmqDiconnectResp; typedef struct tmqConsumeReq { - + tmqMsgHead head; + int64_t commitOffset; } tmqConsumeReq; typedef struct tmqConsumeResp { - + tmqMsgHead head; + char content[]; } tmqConsumeResp; -typedef struct tmqSubscribeReq { - +// +typedef struct tmqMnodeSubscribeReq { + tmqMsgHead head; + int64_t topicLen; + char topic[]; } tmqSubscribeReq; -typedef struct tmqSubscribeResp { - +typedef struct tmqMnodeSubscribeResp { + tmqMsgHead head; + int64_t vgId; + char ep[]; //TSDB_EP_LEN } tmqSubscribeResp; typedef struct tmqHeartbeatReq { @@ -92,6 +101,24 @@ typedef struct STQ { //value=consumeOffset: int64_t } STQ; +#define TQ_BUFFER_SIZE 8 + +typedef struct tqBufferItem { + int64_t offset; + void* executor; + void* content; +} tqBufferItem; + +typedef struct tqGroupHandle { + char* topic; //c style, end with '\0' + int64_t cgId; + void* ahandle; + int64_t consumeOffset; + int32_t head; + int32_t tail; + tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqGroupHandle; + //init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); @@ -103,6 +130,17 @@ int tqCommit(STQ*); //void* will be replace by a msg type int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); +tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId); + +int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +int tqMoveOffsetToNext(tqGroupHandle*); +int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); +int tqFetchMsg(tqGroupHandle*, void*); +int tqRegisterContext(tqGroupHandle*, void*); +int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); + #ifdef __cplusplus } #endif diff --git a/source/server/dnode/src/dnodeTrans.c b/source/server/dnode/src/dnodeTrans.c index 20601f23e7..6d734932ff 100644 --- a/source/server/dnode/src/dnodeTrans.c +++ b/source/server/dnode/src/dnodeTrans.c @@ -25,6 +25,7 @@ #include "dnodeMsg.h" #include "mnode.h" #include "vnode.h" +#include "mnode.h" typedef void (*RpcMsgFp)(SRpcMsg *pMsg); @@ -143,7 +144,6 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) { RpcMsgFp fp = tsTrans.peerMsgFp[msgType]; if (fp != NULL) { - dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]); (*fp)(pMsg); } else { dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]); diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 158e550dcf..b4070546c7 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -218,10 +218,33 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { } //mq related -int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){ +int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { + //parse message and optionally move offset + void* pMsg = pRead->pCont; + tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg; + tmqMsgHead msgHead = pConsumeMsg->head; + //extract head + STQ *pTq = pVnode->pTQ; + tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId); + //return msg if offset not moved + if(pConsumeMsg->commitOffset == pHandle->consumeOffset) { + //return msg + return 0; + } + //or move offset + tqMoveOffsetToNext(pHandle); + //fetch or register context + tqFetchMsg(pHandle, pRead); + //judge mode, tail read or catch up read + //launch new query return 0; } + int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) { + //get operator tree from tq data structure + //execute operator tree + //put data into ringbuffer + //unref memory return 0; } //mq related end diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index cba9075fe9..0896e7afab 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -18,37 +18,19 @@ #include "tq.h" -#define TQ_BUFFER_SIZE 8 #ifdef __cplusplus extern "C" { #endif -typedef struct tqBufferItem { - int64_t offset; - void* executor; - void* content; -} tqBufferItem; - - -typedef struct tqGroupHandle { - char* topic; //c style, end with '\0' - int64_t cgId; - void* ahandle; - int64_t consumeOffset; - int32_t head; - int32_t tail; - tqBufferItem buffer[TQ_BUFFER_SIZE]; -} tqGroupHandle; - //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); //create ring buffer in memory and load consuming offset -int tqOpenTCGroup(STQ*, const char* topic, int cgId); +//int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset -int tqCloseTCGroup(STQ*, const char* topic, int cgId); +//int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info int tqDropTCGroup(STQ*, const char* topic, int cgId); diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 7733ac29b5..e0f2fc545e 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,7 +22,7 @@ // //handle management message -static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { +tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { //look in memory // //not found, try to restore from disk @@ -56,16 +56,30 @@ int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { return 0; } -int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) { - tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); - return tqCommitTCGroup(handle); -} +/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/ + /*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/ + /*return tqCommitTCGroup(handle);*/ +/*}*/ int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { //delete from disk return 0; } + +int tqFetchMsg(tqGroupHandle* handle, void* msg) { + return 0; +} + +int tqMoveOffsetToNext(tqGroupHandle* handle) { + return 0; +} + + +tqGroupHandle* tqFindGHandleBycId(STQ* pTq, int64_t cId) { + return NULL; +} + int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference //judge and launch new query