From 5e14c85609bb947434dafa4c4e5cd4bc59dd9f93 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 27 Oct 2021 17:49:40 +0800 Subject: [PATCH] add tqconsume and tqserialize --- include/server/vnode/tq/tq.h | 122 ++++++++++---- source/server/vnode/tq/inc/tqInt.h | 4 +- source/server/vnode/tq/src/tq.c | 252 ++++++++++++++++++++++++----- 3 files changed, 300 insertions(+), 78 deletions(-) diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index f30ba75c42..c90991ae10 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -23,64 +23,79 @@ extern "C" { #endif typedef struct tmqMsgHead { - int32_t headLen; int32_t protoVer; - int64_t cgId; - int64_t topicId; - int64_t clientId; - int32_t checksum; int32_t msgType; + int64_t cgId; + int64_t clientId; } tmqMsgHead; +typedef struct tmqOneAck { + int64_t topicId; + int64_t consumeOffset; +} tmqOneAck; + +typedef struct tmqAcks { + int32_t ackNum; + //should be sorted + tmqOneAck acks[]; +} tmqAcks; + //TODO: put msgs into common typedef struct tmqConnectReq { tmqMsgHead head; + tmqAcks acks; } tmqConnectReq; -typedef struct tmqConnectResp { +typedef struct tmqConnectRsp { tmqMsgHead head; int8_t status; -} tmqConnectResp; +} tmqConnectRsp; typedef struct tmqDisconnectReq { tmqMsgHead head; } tmqDisconnectReq; -typedef struct tmqDisconnectResp { +typedef struct tmqDisconnectRsp { tmqMsgHead head; int8_t status; -} tmqDiconnectResp; +} tmqDiconnectRsp; typedef struct tmqConsumeReq { tmqMsgHead head; - int64_t commitOffset; + tmqAcks acks; } tmqConsumeReq; -typedef struct tmqConsumeResp { - tmqMsgHead head; - char content[]; -} tmqConsumeResp; +typedef struct tmqMsgContent { + int64_t topicId; + int64_t msgLen; + char msg[]; +} tmqMsgContent; + +typedef struct tmqConsumeRsp { + tmqMsgHead head; + int64_t bodySize; + tmqMsgContent msgs[]; +} tmqConsumeRsp; -// typedef struct tmqMnodeSubscribeReq { tmqMsgHead head; int64_t topicLen; char topic[]; } tmqSubscribeReq; -typedef struct tmqMnodeSubscribeResp { +typedef struct tmqMnodeSubscribeRsp { tmqMsgHead head; int64_t vgId; char ep[]; //TSDB_EP_LEN -} tmqSubscribeResp; +} tmqSubscribeRsp; typedef struct tmqHeartbeatReq { } tmqHeartbeatReq; -typedef struct tmqHeartbeatResp { +typedef struct tmqHeartbeatRsp { -} tmqHeartbeatResp; +} tmqHeartbeatRsp; typedef struct tqTopicVhandle { //name @@ -92,33 +107,57 @@ typedef struct tqTopicVhandle { } tqTopicVhandle; typedef struct STQ { - //the set for topics - //key=topicName: str - //value=tqTopicVhandle + //the collection of group handle - //a map - //key= - //value=consumeOffset: int64_t } STQ; #define TQ_BUFFER_SIZE 8 +//TODO: define a serializer and deserializer typedef struct tqBufferItem { int64_t offset; + //executors are identical but not concurrent + //so it must be a copy in each item void* executor; + int64_t size; void* content; } tqBufferItem; -typedef struct tqGroupHandle { - char* topic; //c style, end with '\0' - int64_t cgId; - void* ahandle; - int64_t consumeOffset; +typedef struct tqBufferHandle { + //char* topic; //c style, end with '\0' + //int64_t cgId; + //void* ahandle; + int64_t nextConsumeOffset; + int64_t topicId; int32_t head; int32_t tail; tqBufferItem buffer[TQ_BUFFER_SIZE]; +} tqBufferHandle; + +typedef struct tqListHandle { + tqBufferHandle* bufHandle; + struct tqListHandle* next; +} tqListHandle; + +typedef struct tqGroupHandle { + int64_t cId; + int64_t cgId; + void* ahandle; + int32_t topicNum; + tqListHandle *head; } tqGroupHandle; +typedef struct tqQueryExec { + void* src; + tqBufferItem* dest; + void* executor; +} tqQueryExec; + +typedef struct tqQueryMsg { + tqQueryExec *exec; + struct tqQueryMsg *next; +} tqQueryMsg; + //init in each vnode STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void tqCleanUp(STQ*); @@ -127,20 +166,33 @@ void tqCleanUp(STQ*); int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -//void* will be replace by a msg type -int tqHandleConsumeMsg(STQ*, tmqConsumeReq* msg); +int tqConsume(STQ*, tmqConsumeReq*); -tqGroupHandle* tqFindGHandleBycId(STQ*, int64_t cId); +tqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqMoveOffsetToNext(tqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); -int tqFetchMsg(tqGroupHandle*, void*); int tqRegisterContext(tqGroupHandle*, void*); -int tqLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqLaunchQuery(tqGroupHandle*); int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset); +int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset); +int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset); +int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset); + +int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle); +int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle); +int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle); +int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem); + +int tqGetGHandleSSize(const tqGroupHandle *gHandle); +int tqListHandleSSize(const tqListHandle *listHandle); +int tqBufHandleSSize(const tqBufferHandle *bufHandle); +int tqBufItemSSize(const tqBufferItem *bufItem); + #ifdef __cplusplus } #endif diff --git a/source/server/vnode/tq/inc/tqInt.h b/source/server/vnode/tq/inc/tqInt.h index 0896e7afab..d19e9ec81e 100644 --- a/source/server/vnode/tq/inc/tqInt.h +++ b/source/server/vnode/tq/inc/tqInt.h @@ -26,13 +26,13 @@ extern "C" { //create persistent storage for meta info such as consuming offset //return value > 0: cgId //return value <= 0: error code -int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqGroupHandle** handle); +//int tqCreateTCGroup(STQ*, const char* topic, int cgId, tqBufferHandle** handle); //create ring buffer in memory and load consuming offset //int tqOpenTCGroup(STQ*, const char* topic, int cgId); //destroy ring buffer and persist consuming offset //int tqCloseTCGroup(STQ*, const char* topic, int cgId); //delete persistent storage for meta info -int tqDropTCGroup(STQ*, const char* topic, int cgId); +//int tqDropTCGroup(STQ*, const char* topic, int cgId); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index e0f2fc545e..b0a1c08673 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -21,64 +21,136 @@ //send to fetch queue // //handle management message +// +static int tqProtoCheck(tmqMsgHead *pMsg) { + return pMsg->protoVer == 0; +} -tqGroupHandle* tqLookupGroupHandle(STQ *pTq, const char* topic, int cgId) { - //look in memory - // - //not found, try to restore from disk - // - //still not found - return NULL; +static int tqAckOneTopic(tqBufferHandle *bhandle, tmqOneAck *pAck, tqQueryMsg** ppQuery) { + //clean old item and move forward + int32_t consumeOffset = pAck->consumeOffset; + int idx = consumeOffset % TQ_BUFFER_SIZE; + ASSERT(bhandle->buffer[idx].content && bhandle->buffer[idx].executor); + tfree(bhandle->buffer[idx].content); + if( 1 /* TODO: need to launch new query */) { + tqQueryMsg* pNewQuery = malloc(sizeof(tqQueryMsg)); + if(pNewQuery == NULL) { + //TODO: memory insufficient + return -1; + } + //TODO: lock executor + pNewQuery->exec->executor = bhandle->buffer[idx].executor; + //TODO: read from wal and assign to src + pNewQuery->exec->src = 0; + pNewQuery->exec->dest = &bhandle->buffer[idx]; + pNewQuery->next = *ppQuery; + *ppQuery = pNewQuery; + } + return 0; +} + +static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { + int32_t ackNum = pAcks->ackNum; + tmqOneAck *acks = pAcks->acks; + //double ptr for acks and list + int i = 0; + tqListHandle* node = ghandle->head; + int ackCnt = 0; + tqQueryMsg *pQuery = NULL; + while(i < ackNum && node->next) { + if(acks[i].topicId == node->next->bufHandle->topicId) { + ackCnt++; + tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery); + } else if(acks[i].topicId < node->next->bufHandle->topicId) { + i++; + } else { + node = node->next; + } + } + if(pQuery) { + //post message + } + return ackCnt; } static int tqCommitTCGroup(tqGroupHandle* handle) { - //persist into disk + //persist modification into disk return 0; } -int tqCreateTCGroup(STQ *pTq, const char* topic, int cgId, tqGroupHandle** handle) { +int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, tqGroupHandle** handle) { + //create in disk return 0; } -int tqOpenTGroup(STQ* pTq, const char* topic, int cgId) { - int code; - tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId); - if(handle == NULL) { - code = tqCreateTCGroup(pTq, topic, cgId, &handle); - if(code != 0) { - return code; - } - } - ASSERT(handle != NULL); - - //put into STQ - +int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + //look up in disk + //create + //open return 0; } -/*int tqCloseTCGroup(STQ* pTq, const char* topic, int cgId) {*/ - /*tqGroupHandle* handle = tqLookupGroupHandle(pTq, topic, cgId);*/ - /*return tqCommitTCGroup(handle);*/ -/*}*/ +int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + return 0; +} -int tqDropTCGroup(STQ* pTq, const char* topic, int cgId) { +int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { //delete from disk return 0; } +static int tqFetch(tqGroupHandle* ghandle, void** msg) { + tqListHandle* head = ghandle->head; + tqListHandle* node = head; + int totSize = 0; + //TODO: make it a macro + int sizeLimit = 4 * 1024; + tmqMsgContent* buffer = malloc(sizeLimit); + if(buffer == NULL) { + //TODO:memory insufficient + return -1; + } + //iterate the list to get msgs of all topics + //until all topic iterated or msgs over sizeLimit + while(node->next) { + node = node->next; + tqBufferHandle* bufHandle = node->bufHandle; + int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; + if(bufHandle->buffer[idx].content != NULL && + bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset + ) { + totSize += bufHandle->buffer[idx].size; + if(totSize > sizeLimit) { + void *ptr = realloc(buffer, totSize); + if(ptr == NULL) { + totSize -= bufHandle->buffer[idx].size; + //TODO:memory insufficient + //return msgs already copied + break; + } + } + *((int64_t*)buffer) = bufHandle->topicId; + buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); + *((int64_t*)buffer) = bufHandle->buffer[idx].size; + buffer = POINTER_SHIFT(buffer, sizeof(int64_t)); + memcpy(buffer, bufHandle->buffer[idx].content, bufHandle->buffer[idx].size); + buffer = POINTER_SHIFT(buffer, bufHandle->buffer[idx].size); + if(totSize > sizeLimit) { + break; + } + } + } + if(totSize == 0) { + //no msg + return -1; + } -int tqFetchMsg(tqGroupHandle* handle, void* msg) { - return 0; + return totSize; } -int tqMoveOffsetToNext(tqGroupHandle* handle) { - return 0; -} - - -tqGroupHandle* tqFindGHandleBycId(STQ* pTq, int64_t cId) { - return NULL; -} +/*int tqMoveOffsetToNext(tqGroupHandle* ghandle) {*/ + /*return 0;*/ +/*}*/ int tqPushMsg(STQ* pTq , void* p, int64_t version) { //add reference @@ -91,10 +163,108 @@ int tqCommit(STQ* pTq) { return 0; } -int tqHandleConsumeMsg(STQ* pTq, tmqConsumeReq* msg) { - //parse msg and extract topic and cgId - //lookup handle - //confirm message and send to consumer +int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { + if(!tqProtoCheck((tmqMsgHead *)pMsg)) { + //proto version invalid + return -1; + } + int64_t clientId = pMsg->head.clientId; + tqGroupHandle *ghandle = tqGetGroupHandle(pTq, clientId); + if(ghandle == NULL) { + //client not connect + return -1; + } + if(pMsg->acks.ackNum != 0) { + if(tqAck(ghandle, &pMsg->acks) != 0) { + //ack not success + return -1; + } + } + + tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg; + + if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) { + //fetch error + return -1; + } + //judge and launch new query + if(tqLaunchQuery(ghandle)) { + //launch query error + return -1; + } return 0; } + + +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) { + //calculate size + int sz = tqGetGHandleSSize(gHandle); + if(sz <= 0) { + //TODO: err + return -1; + } + void* ptr = realloc(*ppBytes, sz); + if(ptr == NULL) { + free(ppBytes); + //TODO: memory err + return -1; + } + *ppBytes = ptr; + //do serialize + *(int64_t*)ptr = gHandle->cId; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int64_t*)ptr = gHandle->cgId; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int32_t*)ptr = gHandle->topicNum; + if(gHandle->topicNum > 0) { + tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes); + } + return 0; +} + +int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) { + void* ptr = POINTER_SHIFT(*ppBytes, offset); + tqListHandle *node = listHandle; + while(node->next) { + node = node->next; + offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset); + } + return offset; +} +int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) { + void *ptr = POINTER_SHIFT(*ppBytes, offset); + *(int64_t*)ptr = bufHandle->nextConsumeOffset; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int64_t*)ptr = bufHandle->topicId; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + *(int32_t*)ptr = bufHandle->head; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + *(int32_t*)ptr = bufHandle->tail; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + for(int i = 0; i < TQ_BUFFER_SIZE; i++) { + int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes); + ptr = POINTER_SHIFT(ptr, sz); + } + return ptr - *ppBytes; +} + +int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) { + void *ptr = POINTER_SHIFT(*ppBytes, offset); + //TODO: do we need serialize this? + return 0; +} + +int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) { + return 0; +} +int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) { + return 0; +} +int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) { + return 0; +} +int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) { + return 0; +} +