From 825239081f113fc5ad4bf55e88a43c8fe668bcd9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 28 Oct 2021 14:48:12 +0800 Subject: [PATCH] refact: change struct name --- include/server/vnode/tq/tq.h | 150 ++++++++++++------------- source/server/vnode/src/vnodeReadMsg.c | 4 +- source/server/vnode/tq/src/tq.c | 68 +++++------ 3 files changed, 111 insertions(+), 111 deletions(-) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index 1e219e0983..495383684a 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -22,89 +22,89 @@ extern "C" { #endif -typedef struct tmqMsgHead { +typedef struct TmqMsgHead { int32_t protoVer; int32_t msgType; int64_t cgId; int64_t clientId; -} tmqMsgHead; +} TmqMsgHead; -typedef struct tmqOneAck { +typedef struct TmqOneAck { int64_t topicId; int64_t consumeOffset; -} tmqOneAck; +} TmqOneAck; -typedef struct tmqAcks { +typedef struct TmqAcks { int32_t ackNum; //should be sorted - tmqOneAck acks[]; -} tmqAcks; + TmqOneAck acks[]; +} TmqAcks; //TODO: put msgs into common -typedef struct tmqConnectReq { - tmqMsgHead head; - tmqAcks acks; -} tmqConnectReq; +typedef struct TmqConnectReq { + TmqMsgHead head; + TmqAcks acks; +} TmqConnectReq; -typedef struct tmqConnectRsp { - tmqMsgHead head; +typedef struct TmqConnectRsp { + TmqMsgHead head; int8_t status; -} tmqConnectRsp; +} TmqConnectRsp; -typedef struct tmqDisconnectReq { - tmqMsgHead head; -} tmqDisconnectReq; +typedef struct TmqDisconnectReq { + TmqMsgHead head; +} TmqDiscconectReq; -typedef struct tmqDisconnectRsp { - tmqMsgHead head; +typedef struct TmqDisconnectRsp { + TmqMsgHead head; int8_t status; -} tmqDiconnectRsp; +} TmqDisconnectRsp; -typedef struct tmqConsumeReq { - tmqMsgHead head; - tmqAcks acks; -} tmqConsumeReq; +typedef struct TmqConsumeReq { + TmqMsgHead head; + TmqAcks acks; +} TmqConsumeReq; -typedef struct tmqMsgContent { +typedef struct TmqMsgContent { int64_t topicId; int64_t msgLen; char msg[]; -} tmqMsgContent; +} TmqMsgContent; -typedef struct tmqConsumeRsp { - tmqMsgHead head; +typedef struct TmqConsumeRsp { + TmqMsgHead head; int64_t bodySize; - tmqMsgContent msgs[]; -} tmqConsumeRsp; + TmqMsgContent msgs[]; +} TmqConsumeRsp; -typedef struct tmqMnodeSubscribeReq { - tmqMsgHead head; +typedef struct TmqSubscribeReq { + TmqMsgHead head; int64_t topicLen; char topic[]; -} tmqSubscribeReq; +} TmqSubscribeReq; -typedef struct tmqMnodeSubscribeRsp { - tmqMsgHead head; +typedef struct tmqSubscribeRsp { + TmqMsgHead head; int64_t vgId; char ep[]; //TSDB_EP_LEN -} tmqSubscribeRsp; +} TmqSubscribeRsp; -typedef struct tmqHeartbeatReq { +typedef struct TmqHeartbeatReq { -} tmqHeartbeatReq; +} TmqHeartbeatReq; -typedef struct tmqHeartbeatRsp { +typedef struct TmqHeartbeatRsp { -} tmqHeartbeatRsp; +} TmqHeartbeatRsp; -typedef struct tqTopicVhandle { +typedef struct TqTopicVhandle { //name // //executor for filter // //callback for mnode // -} tqTopicVhandle; +} TqTopicVhandle; typedef struct STQ { //the collection of group handle @@ -114,16 +114,16 @@ typedef struct STQ { #define TQ_BUFFER_SIZE 8 //TODO: define a serializer and deserializer -typedef struct tqBufferItem { +typedef struct TqBufferItem { int64_t offset; //executors are identical but not concurrent //so it must be a copy in each item void* executor; int64_t size; void* content; -} tqBufferItem; +} TqBufferItem; -typedef struct tqBufferHandle { +typedef struct TqBufferHandle { //char* topic; //c style, end with '\0' //int64_t cgId; //void* ahandle; @@ -131,32 +131,32 @@ typedef struct tqBufferHandle { int64_t topicId; int32_t head; int32_t tail; - tqBufferItem buffer[TQ_BUFFER_SIZE]; -} tqBufferHandle; + TqBufferItem buffer[TQ_BUFFER_SIZE]; +} TqBufferHandle; -typedef struct tqListHandle { - tqBufferHandle bufHandle; - struct tqListHandle* next; -} tqListHandle; +typedef struct TqListHandle { + TqBufferHandle bufHandle; + struct TqListHandle* next; +} TqListHandle; -typedef struct tqGroupHandle { +typedef struct TqGroupHandle { int64_t cId; int64_t cgId; void* ahandle; int32_t topicNum; - tqListHandle *head; -} tqGroupHandle; + TqListHandle *head; +} TqGroupHandle; -typedef struct tqQueryExec { +typedef struct TqQueryExec { void* src; - tqBufferItem* dest; + TqBufferItem* dest; void* executor; -} tqQueryExec; +} TqQueryExec; -typedef struct tqQueryMsg { - tqQueryExec *exec; - struct tqQueryMsg *next; -} tqQueryMsg; +typedef struct TqQueryMsg { + TqQueryExec *exec; + struct TqQueryMsg *next; +} TqQueryMsg; //init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); @@ -166,28 +166,28 @@ void tqCleanUp(STQ*); int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqConsume(STQ*, tmqConsumeReq*); +int tqConsume(STQ*, TmqConsumeReq*); -tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); +TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); -int tqMoveOffsetToNext(tqGroupHandle*); +int tqMoveOffsetToNext(TqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); -int tqRegisterContext(tqGroupHandle*, void*); -int tqLaunchQuery(tqGroupHandle*); -int tqSendLaunchQuery(tqGroupHandle*); +int tqRegisterContext(TqGroupHandle*, void* ahandle); +int tqLaunchQuery(TqGroupHandle*); +int tqSendLaunchQuery(TqGroupHandle*); -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes); -void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr); -void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr); -void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr); +int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes); +void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr); +void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr); +void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr); -const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *ghandle); -const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle); -const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem); +const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *ghandle); +const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle); +const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem); -int tqGetGHandleSSize(const tqGroupHandle *gHandle); +int tqGetGHandleSSize(const TqGroupHandle *gHandle); int tqBufHandleSSize(); int tqBufItemSSize(); diff --git a/source/server/vnode/src/vnodeReadMsg.c b/source/server/vnode/src/vnodeReadMsg.c index 1835b4f558..3626626791 100644 --- a/source/server/vnode/src/vnodeReadMsg.c +++ b/source/server/vnode/src/vnodeReadMsg.c @@ -221,8 +221,8 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { int32_t vnodeProcessConsumeMsg(SVnode *pVnode, SReadMsg *pRead) { //parse message and optionally move offset void* pMsg = pRead->pCont; - tmqConsumeReq *pConsumeMsg = (tmqConsumeReq*) pMsg; - tmqMsgHead msgHead = pConsumeMsg->head; + TmqConsumeReq *pConsumeMsg = (TmqConsumeReq*) pMsg; + TmqMsgHead msgHead = pConsumeMsg->head; //extract head STQ *pTq = pVnode->pTQ; /*tqBufferHandle *pHandle = tqGetHandle(pTq, msgHead.clientId);*/ diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index fc15a676d8..7ecdfe7f19 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -22,18 +22,18 @@ // //handle management message // -static int tqProtoCheck(tmqMsgHead *pMsg) { +static int tqProtoCheck(TmqMsgHead *pMsg) { return pMsg->protoVer == 0; } -static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) { +static int tqAckOneTopic(TqBufferHandle *bhandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) { //clean old item and move forward int32_t consumeOffset = pAck->consumeOffset; int idx = consumeOffset % TQ_BUFFER_SIZE; ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor); tfree(bhandle->buffer[idx].content); if( 1 /* TODO: need to launch new query */) { - tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg)); + TqQueryMsg* pNewQuery = malloc(sizeof(TqQueryMsg)); if(pNewQuery == NULL) { //TODO: memory insufficient return -1; @@ -49,14 +49,14 @@ static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** return 0; } -static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { +static int tqAck(TqGroupHandle* ghandle, TmqAcks* pAcks) { int32_t ackNum = pAcks->ackNum; - tmqOneAck *acks = pAcks->acks; + TmqOneAck *acks = pAcks->acks; //double ptr for acks and list int i = 0; - tqListHandle* node = ghandle->head; + TqListHandle* node = ghandle->head; int ackCnt = 0; - tqQueryMsg *pQuery = NULL; + TqQueryMsg *pQuery = NULL; while(i < ackNum && node->next) { if(acks[i].topicId == node->next->bufHandle.topicId) { ackCnt++; @@ -73,12 +73,12 @@ static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { return ackCnt; } -static int tqCommitTCGroup(tqGroupHandle* handle) { +static int tqCommitTCGroup(TqGroupHandle* handle) { //persist modification into disk return 0; } -int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) { +int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) { //create in disk return 0; } @@ -99,13 +99,13 @@ int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; } -static int tqFetch(tqGroupHandle* ghandle, void** msg) { - tqListHandle* head = ghandle->head; - tqListHandle* node = head; +static int tqFetch(TqGroupHandle* ghandle, void** msg) { + TqListHandle* head = ghandle->head; + TqListHandle* node = head; int totSize = 0; //TODO: make it a macro int sizeLimit = 4 * 1024; - tmqMsgContent* buffer = malloc(sizeLimit); + TmqMsgContent* buffer = malloc(sizeLimit); if(buffer == NULL) { //TODO:memory insufficient return -1; @@ -114,7 +114,7 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { //until all topic iterated or msgs over sizeLimit while(node->next) { node = node->next; - tqBufferHandle* bufHandle = &node->bufHandle; + TqBufferHandle* bufHandle = &node->bufHandle; int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; if(bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset @@ -144,19 +144,19 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { } -tqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { +TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; } -int tqLaunchQuery(tqGroupHandle* ghandle) { +int tqLaunchQuery(TqGroupHandle* ghandle) { return 0; } -int tqSendLaunchQuery(tqGroupHandle* gHandle) { +int tqSendLaunchQuery(TqGroupHandle* gHandle) { return 0; } -/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/ +/*int tqMoveOffsetToNext(TqGroupHandle* ghandle) {*/ /*return 0;*/ /*}*/ @@ -171,13 +171,13 @@ int tqCommit(STQ* pTq) { return 0; } -int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { - if(!tqProtoCheck((tmqMsgHead *)pMsg)) { +int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) { + if(!tqProtoCheck((TmqMsgHead *)pMsg)) { //proto version invalid return -1; } int64_t clientId = pMsg->head.clientId; - tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId); + TqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId); if(ghandle == NULL) { //client not connect return -1; @@ -189,7 +189,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { } } - tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg; + TmqConsumeRsp *pRsp = (TmqConsumeRsp*) pMsg; if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) { //fetch error @@ -204,7 +204,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { return 0; } -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) { +int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) { //calculate size int sz = tqGetGHandleSSize(gHandle); void* ptr = realloc(*ppBytes, sz); @@ -227,8 +227,8 @@ int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) { return 0; } -void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) { - tqListHandle *node = listHandle; +void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) { + TqListHandle *node = listHandle; ASSERT(node != NULL); while(node) { ptr = tqSerializeBufHandle(&node->bufHandle, ptr); @@ -237,7 +237,7 @@ void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) { return ptr; } -void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) { +void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) { *(int64_t*)ptr = bufHandle->nextConsumeOffset; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = bufHandle->topicId; @@ -252,13 +252,13 @@ void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) { return ptr; } -void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr) { +void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) { //TODO: do we need serialize this? //mainly for executor return ptr; } -const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) { +const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) { const void* ptr = pBytes; gHandle->cId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); @@ -268,10 +268,10 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) gHandle->topicNum = *(int32_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); gHandle->head = NULL; - tqListHandle *node = gHandle->head; + TqListHandle *node = gHandle->head; for(int i = 0; i < gHandle->topicNum; i++) { if(gHandle->head == NULL) { - if((node = malloc(sizeof(tqListHandle))) == NULL) { + if((node = malloc(sizeof(TqListHandle))) == NULL) { //TODO: error return NULL; } @@ -279,7 +279,7 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); gHandle->head = node; } else { - node->next = malloc(sizeof(tqListHandle)); + node->next = malloc(sizeof(TqListHandle)); if(node->next == NULL) { //TODO: error return NULL; @@ -292,7 +292,7 @@ const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) return ptr; } -const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle) { +const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) { const void* ptr = pBytes; bufHandle->nextConsumeOffset = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); @@ -308,12 +308,12 @@ const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle return ptr; } -const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem) { +const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) { return pBytes; } //TODO: make this a macro -int tqGetGHandleSSize(const tqGroupHandle *gHandle) { +int tqGetGHandleSSize(const TqGroupHandle *gHandle) { return sizeof(int64_t) * 2 + sizeof(int32_t) + gHandle->topicNum * tqBufHandleSSize();