Merge remote-tracking branch 'origin/3.0' into feature/dnode3
This commit is contained in:
commit
bb126aaa21
|
@ -18,6 +18,7 @@ mac/
|
|||
.mypy_cache
|
||||
*.tmp
|
||||
*.swp
|
||||
*.orig
|
||||
src/connector/nodejs/node_modules/
|
||||
src/connector/nodejs/out/
|
||||
tests/test/
|
||||
|
|
|
@ -58,9 +58,9 @@ void walStop(twalh);
|
|||
void walClose(twalh);
|
||||
|
||||
//write
|
||||
int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
|
||||
//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
|
||||
int64_t walWrite(twalh, void* body, int32_t bodyLen);
|
||||
int64_t walWriteBatch(twalh, void* body, int32_t* bodyLen, int32_t batchSize);
|
||||
int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize);
|
||||
|
||||
//apis for lifecycle management
|
||||
void walFsync(twalh, bool force);
|
||||
|
|
|
@ -24,9 +24,10 @@ extern "C" {
|
|||
|
||||
typedef struct tmqMsgHead {
|
||||
int32_t headLen;
|
||||
int32_t msgVer;
|
||||
int32_t protoVer;
|
||||
int64_t cgId;
|
||||
int64_t topicId;
|
||||
int64_t clientId;
|
||||
int32_t checksum;
|
||||
int32_t msgType;
|
||||
} tmqMsgHead;
|
||||
|
@ -34,35 +35,43 @@ typedef struct tmqMsgHead {
|
|||
//TODO: put msgs into common
|
||||
typedef struct tmqConnectReq {
|
||||
tmqMsgHead head;
|
||||
|
||||
} tmqConnectReq;
|
||||
|
||||
typedef struct tmqConnectResp {
|
||||
|
||||
tmqMsgHead head;
|
||||
int8_t status;
|
||||
} tmqConnectResp;
|
||||
|
||||
typedef struct tmqDisconnectReq {
|
||||
|
||||
tmqMsgHead head;
|
||||
} tmqDisconnectReq;
|
||||
|
||||
typedef struct tmqDisconnectResp {
|
||||
|
||||
tmqMsgHead head;
|
||||
int8_t status;
|
||||
} tmqDiconnectResp;
|
||||
|
||||
typedef struct tmqConsumeReq {
|
||||
|
||||
tmqMsgHead head;
|
||||
int64_t commitOffset;
|
||||
} tmqConsumeReq;
|
||||
|
||||
typedef struct tmqConsumeResp {
|
||||
|
||||
tmqMsgHead head;
|
||||
char content[];
|
||||
} tmqConsumeResp;
|
||||
|
||||
typedef struct tmqSubscribeReq {
|
||||
|
||||
//
|
||||
typedef struct tmqMnodeSubscribeReq {
|
||||
tmqMsgHead head;
|
||||
int64_t topicLen;
|
||||
char topic[];
|
||||
} tmqSubscribeReq;
|
||||
|
||||
typedef struct tmqSubscribeResp {
|
||||
|
||||
typedef struct tmqMnodeSubscribeResp {
|
||||
tmqMsgHead head;
|
||||
int64_t vgId;
|
||||
char ep[]; //TSDB_EP_LEN
|
||||
} tmqSubscribeResp;
|
||||
|
||||
typedef struct tmqHeartbeatReq {
|
||||
|
@ -92,6 +101,24 @@ typedef struct STQ {
|
|||
//value=consumeOffset: int64_t
|
||||
} STQ;
|
||||
|
||||
#define TQ_BUFFER_SIZE 8
|
||||
|
||||
typedef struct tqBufferItem {
|
||||
int64_t offset;
|
||||
void* executor;
|
||||
void* content;
|
||||
} tqBufferItem;
|
||||
|
||||
typedef struct tqGroupHandle {
|
||||
char* topic; //c style, end with '\0'
|
||||
int64_t cgId;
|
||||
void* ahandle;
|
||||
int64_t consumeOffset;
|
||||
int32_t head;
|
||||
int32_t tail;
|
||||
tqBufferItem buffer[TQ_BUFFER_SIZE];
|
||||
} tqGroupHandle;
|
||||
|
||||
//init in each vnode
|
||||
STQ* tqInit(void* ref_func(void*), void* unref_func(void*));
|
||||
void tqCleanUp(STQ*);
|
||||
|
@ -103,6 +130,17 @@ int tqCommit(STQ*);
|
|||
//void* will be replace by a msg type
|
||||
int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg);
|
||||
|
||||
tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId);
|
||||
|
||||
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
||||
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
||||
int tqMoveOffsetToNext(tqGroupHandle*);
|
||||
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
|
||||
int tqFetchMsg(tqGroupHandle*, void*);
|
||||
int tqRegisterContext(tqGroupHandle*, void*);
|
||||
int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
|
||||
int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
#include "dnodeMsg.h"
|
||||
#include "mnode.h"
|
||||
#include "vnode.h"
|
||||
#include "mnode.h"
|
||||
|
||||
typedef void (*RpcMsgFp)(SRpcMsg *pMsg);
|
||||
|
||||
|
@ -143,7 +144,6 @@ static void dnodeProcessPeerRsp(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
|
|||
|
||||
RpcMsgFp fp = tsTrans.peerMsgFp[msgType];
|
||||
if (fp != NULL) {
|
||||
dTrace("RPC %p, peer rsp:%s will be processed", pMsg->handle, taosMsg[msgType]);
|
||||
(*fp)(pMsg);
|
||||
} else {
|
||||
dDebug("RPC %p, peer rsp:%s not processed", pMsg->handle, taosMsg[msgType]);
|
||||
|
|
|
@ -218,10 +218,33 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
|
|||
}
|
||||
|
||||
//mq related
|
||||
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead){
|
||||
int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) {
|
||||
//parse message and optionally move offset
|
||||
void* pMsg = pRead->pCont;
|
||||
tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg;
|
||||
tmqMsgHead msgHead = pConsumeMsg->head;
|
||||
//extract head
|
||||
STQ *pTq = pVnode->pTQ;
|
||||
tqGroupHandle *pHandle = tqFindGHandleBycId(pTq, msgHead.clientId);
|
||||
//return msg if offset not moved
|
||||
if(pConsumeMsg->commitOffset == pHandle->consumeOffset) {
|
||||
//return msg
|
||||
return 0;
|
||||
}
|
||||
//or move offset
|
||||
tqMoveOffsetToNext(pHandle);
|
||||
//fetch or register context
|
||||
tqFetchMsg(pHandle, pRead);
|
||||
//judge mode, tail read or catch up read
|
||||
//launch new query
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeProcessTqQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
|
||||
//get operator tree from tq data structure
|
||||
//execute operator tree
|
||||
//put data into ringbuffer
|
||||
//unref memory
|
||||
return 0;
|
||||
}
|
||||
//mq related end
|
||||
|
|
|
@ -18,37 +18,19 @@
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
#define TQ_BUFFER_SIZE 8
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct tqBufferItem {
|
||||
int64_t offset;
|
||||
void* executor;
|
||||
void* content;
|
||||
} tqBufferItem;
|
||||
|
||||
|
||||
typedef struct tqGroupHandle {
|
||||
char* topic; //c style, end with '\0'
|
||||
int64_t cgId;
|
||||
void* ahandle;
|
||||
int64_t consumeOffset;
|
||||
int32_t head;
|
||||
int32_t tail;
|
||||
tqBufferItem buffer[TQ_BUFFER_SIZE];
|
||||
} tqGroupHandle;
|
||||
|
||||
//create persistent storage for meta info such as consuming offset
|
||||
//return value > 0: cgId
|
||||
//return value <= 0: error code
|
||||
int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle);
|
||||
//create ring buffer in memory and load consuming offset
|
||||
int tqOpenTCGroup(STQ*, const char* topic, int cgId);
|
||||
//int tqOpenTCGroup(STQ*, const char* topic, int cgId);
|
||||
//destroy ring buffer and persist consuming offset
|
||||
int tqCloseTCGroup(STQ*, const char* topic, int cgId);
|
||||
//int tqCloseTCGroup(STQ*, const char* topic, int cgId);
|
||||
//delete persistent storage for meta info
|
||||
int tqDropTCGroup(STQ*, const char* topic, int cgId);
|
||||
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
//
|
||||
//handle management message
|
||||
|
||||
static tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
|
||||
tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) {
|
||||
//look in memory
|
||||
//
|
||||
//not found, try to restore from disk
|
||||
|
@ -56,16 +56,30 @@ int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {
|
||||
tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);
|
||||
return tqCommitTCGroup(handle);
|
||||
}
|
||||
/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/
|
||||
/*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/
|
||||
/*return tqCommitTCGroup(handle);*/
|
||||
/*}*/
|
||||
|
||||
int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) {
|
||||
//delete from disk
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int tqFetchMsg(tqGroupHandle* handle, void* msg) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tqMoveOffsetToNext(tqGroupHandle* handle) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
tqGroupHandle* tqFindGHandleBycId(STQ* pTq, int64_t cId) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int tqPushMsg(STQ* pTq , void* p, int64_t version) {
|
||||
//add reference
|
||||
//judge and launch new query
|
||||
|
|
Loading…
Reference in New Issue