diff --git a/include/server/vnode/tq/tq.h b/include/server/vnode/tq/tq.h index c90991ae10..1e219e0983 100644 --- a/include/server/vnode/tq/tq.h +++ b/include/server/vnode/tq/tq.h @@ -135,7 +135,7 @@ typedef struct tqBufferHandle { } tqBufferHandle; typedef struct tqListHandle { - tqBufferHandle* bufHandle; + tqBufferHandle bufHandle; struct tqListHandle* next; } tqListHandle; @@ -176,22 +176,20 @@ int tqMoveOffsetToNext(tqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); int tqRegisterContext(tqGroupHandle*, void*); int tqLaunchQuery(tqGroupHandle*); -int tqSendLaunchQuery(STQ*, int64_t topicId, int64_t cgId, void* query); +int tqSendLaunchQuery(tqGroupHandle*); -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset); -int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset); -int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset); -int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset); +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes); +void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr); +void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr); +void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr); -int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle); -int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle); -int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle); -int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem); +const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *ghandle); +const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle); +const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem); int tqGetGHandleSSize(const tqGroupHandle *gHandle); -int tqListHandleSSize(const tqListHandle *listHandle); -int tqBufHandleSSize(const tqBufferHandle *bufHandle); -int tqBufItemSSize(const tqBufferItem *bufItem); +int tqBufHandleSSize(); +int tqBufItemSSize(); #ifdef __cplusplus } diff --git a/source/server/vnode/tq/src/tq.c b/source/server/vnode/tq/src/tq.c index 52702057d6..fc15a676d8 100644 --- a/source/server/vnode/tq/src/tq.c +++ b/source/server/vnode/tq/src/tq.c @@ -58,10 +58,10 @@ static int tqAck(tqGroupHandle* ghandle, tmqAcks* pAcks) { int ackCnt = 0; tqQueryMsg *pQuery = NULL; while(i < ackNum && node->next) { - if(acks[i].topicId == node->next->bufHandle->topicId) { + if(acks[i].topicId == node->next->bufHandle.topicId) { ackCnt++; - tqAckOneTopic(node->next->bufHandle, &acks[i], &pQuery); - } else if(acks[i].topicId < node->next->bufHandle->topicId) { + tqAckOneTopic(&node->next->bufHandle, &acks[i], &pQuery); + } else if(acks[i].topicId < node->next->bufHandle.topicId) { i++; } else { node = node->next; @@ -114,7 +114,7 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { //until all topic iterated or msgs over sizeLimit while(node->next) { node = node->next; - tqBufferHandle* bufHandle = node->bufHandle; + tqBufferHandle* bufHandle = &node->bufHandle; int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; if(bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset @@ -140,11 +140,6 @@ static int tqFetch(tqGroupHandle* ghandle, void** msg) { } } } - if(totSize == 0) { - //no msg - return -1; - } - return totSize; } @@ -157,7 +152,7 @@ int tqLaunchQuery(tqGroupHandle* ghandle) { return 0; } -int tqSendLaunchQuery(STQ* pTq, int64_t topicId, int64_t cgId, void* query) { +int tqSendLaunchQuery(tqGroupHandle* gHandle) { return 0; } @@ -196,7 +191,7 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { tmqConsumeRsp *pRsp = (tmqConsumeRsp*) pMsg; - if(tqFetch(ghandle, (void**)&pRsp->msgs) < 0) { + if(tqFetch(ghandle, (void**)&pRsp->msgs) <= 0) { //fetch error return -1; } @@ -209,14 +204,9 @@ int tqConsume(STQ* pTq, tmqConsumeReq* pMsg) { return 0; } - -int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offset) { +int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes) { //calculate size int sz = tqGetGHandleSSize(gHandle); - if(sz <= 0) { - //TODO: err - return -1; - } void* ptr = realloc(*ppBytes, sz); if(ptr == NULL) { free(ppBytes); @@ -224,29 +214,30 @@ int tqSerializeGroupHandle(tqGroupHandle *gHandle, void** ppBytes, int32_t offse return -1; } *ppBytes = ptr; - //do serialize + //do serialization *(int64_t*)ptr = gHandle->cId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = gHandle->cgId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int32_t*)ptr = gHandle->topicNum; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); if(gHandle->topicNum > 0) { - tqSerializeListHandle(gHandle->head, ppBytes, ptr - *ppBytes); + tqSerializeListHandle(gHandle->head, ptr); } return 0; } -int tqSerializeListHandle(tqListHandle *listHandle, void** ppBytes, int32_t offset) { - void* ptr = POINTER_SHIFT(*ppBytes, offset); +void* tqSerializeListHandle(tqListHandle *listHandle, void* ptr) { tqListHandle *node = listHandle; - while(node->next) { + ASSERT(node != NULL); + while(node) { + ptr = tqSerializeBufHandle(&node->bufHandle, ptr); node = node->next; - offset = tqSerializeBufHandle(node->bufHandle, ppBytes, offset); } - return offset; + return ptr; } -int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offset) { - void *ptr = POINTER_SHIFT(*ppBytes, offset); + +void* tqSerializeBufHandle(tqBufferHandle *bufHandle, void* ptr) { *(int64_t*)ptr = bufHandle->nextConsumeOffset; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); *(int64_t*)ptr = bufHandle->topicId; @@ -256,41 +247,87 @@ int tqSerializeBufHandle(tqBufferHandle *bufHandle, void** ppBytes, int32_t offs *(int32_t*)ptr = bufHandle->tail; ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); for(int i = 0; i < TQ_BUFFER_SIZE; i++) { - int sz = tqSerializeBufItem(&bufHandle->buffer[i], ppBytes, ptr - *ppBytes); - ptr = POINTER_SHIFT(ptr, sz); + ptr = tqSerializeBufItem(&bufHandle->buffer[i], ptr); } - return ptr - *ppBytes; + return ptr; } -int tqSerializeBufItem(tqBufferItem *bufItem, void** ppBytes, int32_t offset) { - void *ptr = POINTER_SHIFT(*ppBytes, offset); +void* tqSerializeBufItem(tqBufferItem *bufItem, void* ptr) { //TODO: do we need serialize this? - return 0; + //mainly for executor + return ptr; } -int tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle **pGhandle) { - return 0; -} -int tqDeserializeListHandle(const void* pBytes, tqListHandle **pListHandle) { - return 0; -} -int tqDeserializeBufHandle(const void* pBytes, tqBufferHandle **pBufHandle) { - return 0; -} -int tqDeserializeBufItem(const void* pBytes, tqBufferItem **pBufItem) { - return 0; +const void* tqDeserializeGroupHandle(const void* pBytes, tqGroupHandle *gHandle) { + const void* ptr = pBytes; + gHandle->cId = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + gHandle->cgId = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + gHandle->ahandle = NULL; + gHandle->topicNum = *(int32_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + gHandle->head = NULL; + tqListHandle *node = gHandle->head; + for(int i = 0; i < gHandle->topicNum; i++) { + if(gHandle->head == NULL) { + if((node = malloc(sizeof(tqListHandle))) == NULL) { + //TODO: error + return NULL; + } + node->next= NULL; + ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); + gHandle->head = node; + } else { + node->next = malloc(sizeof(tqListHandle)); + if(node->next == NULL) { + //TODO: error + return NULL; + } + node->next->next = NULL; + ptr = tqDeserializeBufHandle(ptr, &node->next->bufHandle); + node = node->next; + } + } + return ptr; } +const void* tqDeserializeBufHandle(const void* pBytes, tqBufferHandle *bufHandle) { + const void* ptr = pBytes; + bufHandle->nextConsumeOffset = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + bufHandle->topicId = *(int64_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); + bufHandle->head = *(int32_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + bufHandle->tail = *(int32_t*)ptr; + ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); + for(int i = 0; i < TQ_BUFFER_SIZE; i++) { + ptr = tqDeserializeBufItem(ptr, &bufHandle->buffer[i]); + } + return ptr; +} +const void* tqDeserializeBufItem(const void* pBytes, tqBufferItem *bufItem) { + return pBytes; +} + +//TODO: make this a macro int tqGetGHandleSSize(const tqGroupHandle *gHandle) { - return 0; -} -int tqListHandleSSize(const tqListHandle *listHandle) { - return 0; -} -int tqBufHandleSSize(const tqBufferHandle *bufHandle) { - return 0; -} -int tqBufItemSSize(const tqBufferItem *bufItem) { + return sizeof(int64_t) * 2 + + sizeof(int32_t) + + gHandle->topicNum * tqBufHandleSSize(); +} + +//TODO: make this a macro +int tqBufHandleSSize() { + return sizeof(int64_t) * 2 + + sizeof(int32_t) * 2 + + TQ_BUFFER_SIZE * tqBufItemSSize(); +} + +int tqBufItemSSize() { + //TODO: do this need serialization? + //mainly for executor return 0; }