commit
6180823d52
|
@ -22,6 +22,8 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -54,6 +56,7 @@ typedef struct STqSetCurReq {
|
||||||
|
|
||||||
typedef struct STqConsumeReq {
|
typedef struct STqConsumeReq {
|
||||||
STqMsgHead head;
|
STqMsgHead head;
|
||||||
|
int64_t blockingTime; // milisec
|
||||||
STqAcks acks;
|
STqAcks acks;
|
||||||
} STqConsumeReq;
|
} STqConsumeReq;
|
||||||
|
|
||||||
|
@ -101,12 +104,21 @@ typedef struct STqTopicVhandle {
|
||||||
typedef struct STqExec {
|
typedef struct STqExec {
|
||||||
void* runtimeEnv;
|
void* runtimeEnv;
|
||||||
SSDataBlock* (*exec)(void* runtimeEnv);
|
SSDataBlock* (*exec)(void* runtimeEnv);
|
||||||
void* (*assign)(void* runtimeEnv, SSubmitBlk* inputData);
|
void* (*assign)(void* runtimeEnv, void* inputData);
|
||||||
void (*clear)(void* runtimeEnv);
|
void (*clear)(void* runtimeEnv);
|
||||||
char* (*serialize)(struct STqExec*);
|
char* (*serialize)(struct STqExec*);
|
||||||
struct STqExec* (*deserialize)(char*);
|
struct STqExec* (*deserialize)(char*);
|
||||||
} STqExec;
|
} STqExec;
|
||||||
|
|
||||||
|
typedef struct STqRspHandle {
|
||||||
|
void* handle;
|
||||||
|
void* ahandle;
|
||||||
|
} STqRspHandle;
|
||||||
|
|
||||||
|
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
|
||||||
|
|
||||||
|
typedef struct STqTopic STqTopic;
|
||||||
|
|
||||||
typedef struct STqBufferItem {
|
typedef struct STqBufferItem {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
// executors are identical but not concurrent
|
// executors are identical but not concurrent
|
||||||
|
@ -115,19 +127,21 @@ typedef struct STqBufferItem {
|
||||||
int32_t status;
|
int32_t status;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
void* content;
|
void* content;
|
||||||
|
STqTopic* pTopic;
|
||||||
} STqMsgItem;
|
} STqMsgItem;
|
||||||
|
|
||||||
typedef struct STqTopic {
|
struct STqTopic {
|
||||||
// char* topic; //c style, end with '\0'
|
// char* topic; //c style, end with '\0'
|
||||||
// int64_t cgId;
|
// int64_t cgId;
|
||||||
// void* ahandle;
|
// void* ahandle;
|
||||||
|
// int32_t head;
|
||||||
|
// int32_t tail;
|
||||||
int64_t nextConsumeOffset;
|
int64_t nextConsumeOffset;
|
||||||
int64_t floatingCursor;
|
int64_t floatingCursor;
|
||||||
int64_t topicId;
|
int64_t topicId;
|
||||||
int32_t head;
|
void* logReader;
|
||||||
int32_t tail;
|
|
||||||
STqMsgItem buffer[TQ_BUFFER_SIZE];
|
STqMsgItem buffer[TQ_BUFFER_SIZE];
|
||||||
} STqTopic;
|
};
|
||||||
|
|
||||||
typedef struct STqListHandle {
|
typedef struct STqListHandle {
|
||||||
STqTopic topic;
|
STqTopic topic;
|
||||||
|
@ -141,7 +155,7 @@ typedef struct STqGroup {
|
||||||
int32_t topicNum;
|
int32_t topicNum;
|
||||||
STqList* head;
|
STqList* head;
|
||||||
SList* topicList; // SList<STqTopic>
|
SList* topicList; // SList<STqTopic>
|
||||||
void* returnMsg; // SVReadMsg
|
STqRspHandle rspHandle;
|
||||||
} STqGroup;
|
} STqGroup;
|
||||||
|
|
||||||
typedef struct STqQueryMsg {
|
typedef struct STqQueryMsg {
|
||||||
|
@ -149,20 +163,23 @@ typedef struct STqQueryMsg {
|
||||||
struct STqQueryMsg* next;
|
struct STqQueryMsg* next;
|
||||||
} STqQueryMsg;
|
} STqQueryMsg;
|
||||||
|
|
||||||
typedef struct STqLogReader {
|
typedef struct STqLogHandle {
|
||||||
void* logHandle;
|
void* logHandle;
|
||||||
int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
|
void* (*openLogReader)(void* logHandle);
|
||||||
|
void (*closeLogReader)(void* logReader);
|
||||||
|
int32_t (*logRead)(void* logReader, void** data, int64_t ver);
|
||||||
|
|
||||||
int64_t (*logGetFirstVer)(void* logHandle);
|
int64_t (*logGetFirstVer)(void* logHandle);
|
||||||
int64_t (*logGetSnapshotVer)(void* logHandle);
|
int64_t (*logGetSnapshotVer)(void* logHandle);
|
||||||
int64_t (*logGetLastVer)(void* logHandle);
|
int64_t (*logGetLastVer)(void* logHandle);
|
||||||
} STqLogReader;
|
} STqLogHandle;
|
||||||
|
|
||||||
typedef struct STqCfg {
|
typedef struct STqCfg {
|
||||||
// TODO
|
// TODO
|
||||||
} STqCfg;
|
} STqCfg;
|
||||||
|
|
||||||
typedef struct STqMemRef {
|
typedef struct STqMemRef {
|
||||||
SMemAllocatorFactory* pAlloctorFactory;
|
SMemAllocatorFactory* pAllocatorFactory;
|
||||||
SMemAllocator* pAllocator;
|
SMemAllocator* pAllocator;
|
||||||
} STqMemRef;
|
} STqMemRef;
|
||||||
|
|
||||||
|
@ -252,19 +269,30 @@ typedef struct STQ {
|
||||||
// the handle of meta kvstore
|
// the handle of meta kvstore
|
||||||
char* path;
|
char* path;
|
||||||
STqCfg* tqConfig;
|
STqCfg* tqConfig;
|
||||||
STqLogReader* tqLogReader;
|
STqLogHandle* tqLogHandle;
|
||||||
STqMemRef tqMemRef;
|
STqMemRef tqMemRef;
|
||||||
STqMetaStore* tqMeta;
|
STqMetaStore* tqMeta;
|
||||||
} STQ;
|
} STQ;
|
||||||
|
|
||||||
|
typedef struct STqMgmt {
|
||||||
|
int8_t inited;
|
||||||
|
tmr_h timer;
|
||||||
|
} STqMgmt;
|
||||||
|
|
||||||
|
static STqMgmt tqMgmt;
|
||||||
|
|
||||||
|
// init once
|
||||||
|
int tqInit();
|
||||||
|
void tqCleanUp();
|
||||||
|
|
||||||
// open in each vnode
|
// open in each vnode
|
||||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
|
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac);
|
||||||
void tqClose(STQ*);
|
void tqClose(STQ*);
|
||||||
|
|
||||||
// void* will be replace by a msg type
|
// void* will be replace by a msg type
|
||||||
int tqPushMsg(STQ*, void* msg, int64_t version);
|
int tqPushMsg(STQ*, void* msg, int64_t version);
|
||||||
int tqCommit(STQ*);
|
int tqCommit(STQ*);
|
||||||
int tqConsume(STQ*, STqConsumeReq*);
|
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
|
||||||
|
|
||||||
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
|
int tqSetCursor(STQ*, STqSetCurReq* pMsg);
|
||||||
int tqBufferSetOffset(STqTopic*, int64_t offset);
|
int tqBufferSetOffset(STqTopic*, int64_t offset);
|
||||||
|
|
|
@ -122,6 +122,16 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
|
||||||
*/
|
*/
|
||||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Process a consume message.
|
||||||
|
*
|
||||||
|
* @param pVnode The vnode object.
|
||||||
|
* @param pMsg The request message
|
||||||
|
* @param pRsp The response message
|
||||||
|
* @return int 0 for success, -1 for failure
|
||||||
|
*/
|
||||||
|
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Process the sync request
|
* @brief Process the sync request
|
||||||
*
|
*
|
||||||
|
|
|
@ -174,8 +174,11 @@ SWalReadHandle *walOpenReadHandle(SWal *);
|
||||||
void walCloseReadHandle(SWalReadHandle *);
|
void walCloseReadHandle(SWalReadHandle *);
|
||||||
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
|
int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver);
|
||||||
|
|
||||||
|
// deprecated
|
||||||
|
#if 0
|
||||||
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
||||||
// int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
|
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
|
||||||
|
#endif
|
||||||
|
|
||||||
// lifecycle check
|
// lifecycle check
|
||||||
int64_t walGetFirstVer(SWal *);
|
int64_t walGetFirstVer(SWal *);
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_UTIL_TIMER_H
|
#ifndef _TD_UTIL_TIMER_H
|
||||||
#define _TD_UTIL_TIMER_H
|
#define _TD_UTIL_TIMER_H
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -12,6 +12,7 @@ target_link_libraries(
|
||||||
PUBLIC os
|
PUBLIC os
|
||||||
PUBLIC util
|
PUBLIC util
|
||||||
PUBLIC common
|
PUBLIC common
|
||||||
|
PUBLIC transport
|
||||||
)
|
)
|
||||||
|
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
#include "trpc.h"
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -35,7 +35,22 @@ void* tqSerializeItem(STqMsgItem* pItem, void* ptr);
|
||||||
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
|
const void* tqDeserializeTopic(const void* pBytes, STqTopic* pTopic);
|
||||||
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
|
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem);
|
||||||
|
|
||||||
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) {
|
int tqInit() {
|
||||||
|
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1);
|
||||||
|
if(old == 1) return 0;
|
||||||
|
|
||||||
|
tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tqCleanUp() {
|
||||||
|
int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0);
|
||||||
|
if(old == 0) return;
|
||||||
|
taosTmrStop(tqMgmt.timer);
|
||||||
|
taosTmrCleanUp(tqMgmt.timer);
|
||||||
|
}
|
||||||
|
|
||||||
|
STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) {
|
||||||
STQ* pTq = malloc(sizeof(STQ));
|
STQ* pTq = malloc(sizeof(STQ));
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
|
@ -43,9 +58,9 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
|
||||||
}
|
}
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->tqConfig = tqConfig;
|
pTq->tqConfig = tqConfig;
|
||||||
pTq->tqLogReader = tqLogReader;
|
pTq->tqLogHandle = tqLogHandle;
|
||||||
#if 0
|
#if 0
|
||||||
pTq->tqMemRef.pAlloctorFactory = allocFac;
|
pTq->tqMemRef.pAllocatorFactory = allocFac;
|
||||||
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
||||||
if (pTq->tqMemRef.pAllocator == NULL) {
|
if (pTq->tqMemRef.pAllocator == NULL) {
|
||||||
// TODO: error code of buffer pool
|
// TODO: error code of buffer pool
|
||||||
|
@ -53,16 +68,24 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
|
||||||
#endif
|
#endif
|
||||||
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
|
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
|
||||||
if (pTq->tqMeta == NULL) {
|
if (pTq->tqMeta == NULL) {
|
||||||
// TODO: free STQ
|
free(pTq);
|
||||||
|
#if 0
|
||||||
|
allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator);
|
||||||
|
#endif
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return pTq;
|
return pTq;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqClose(STQ* pTq) {
|
void tqClose(STQ* pTq) {
|
||||||
// TODO
|
// TODO
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tqProtoCheck(STqMsgHead* pMsg) { return pMsg->protoVer == 0; }
|
static int tqProtoCheck(STqMsgHead* pMsg) {
|
||||||
|
// TODO
|
||||||
|
return pMsg->protoVer == 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
|
static int tqAckOneTopic(STqTopic* pTopic, STqOneAck* pAck, STqQueryMsg** ppQuery) {
|
||||||
// clean old item and move forward
|
// clean old item and move forward
|
||||||
|
@ -126,6 +149,13 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup
|
||||||
*ppGroup = pGroup;
|
*ppGroup = pGroup;
|
||||||
memset(pGroup, 0, sizeof(STqGroup));
|
memset(pGroup, 0, sizeof(STqGroup));
|
||||||
|
|
||||||
|
pGroup->topicList = tdListNew(sizeof(STqTopic));
|
||||||
|
if(pGroup->topicList == NULL) {
|
||||||
|
free(pGroup);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*ppGroup = pGroup;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,46 +184,55 @@ int tqDropGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tqFetch(STqGroup* pGroup, void** msg) {
|
static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) {
|
||||||
STqList* head = pGroup->head;
|
STqList* pHead = pGroup->head;
|
||||||
STqList* node = head;
|
STqList* pNode = pHead;
|
||||||
int totSize = 0;
|
int totSize = 0;
|
||||||
|
int numOfMsgs = 0;
|
||||||
// TODO: make it a macro
|
// TODO: make it a macro
|
||||||
int sizeLimit = 4 * 1024;
|
int sizeLimit = 4 * 1024;
|
||||||
STqMsgContent* buffer = malloc(sizeLimit);
|
|
||||||
if (buffer == NULL) {
|
void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit);
|
||||||
// TODO:memory insufficient
|
if (ptr == NULL) {
|
||||||
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
*pRsp = ptr;
|
||||||
|
STqMsgContent* buffer = (*pRsp)->msgs;
|
||||||
|
|
||||||
// iterate the list to get msgs of all topics
|
// iterate the list to get msgs of all topics
|
||||||
// until all topic iterated or msgs over sizeLimit
|
// until all topic iterated or msgs over sizeLimit
|
||||||
while (node->next) {
|
while (pNode->next) {
|
||||||
node = node->next;
|
pNode = pNode->next;
|
||||||
STqTopic* topicHandle = &node->topic;
|
STqTopic* pTopic = &pNode->topic;
|
||||||
int idx = topicHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
|
int idx = pTopic->nextConsumeOffset % TQ_BUFFER_SIZE;
|
||||||
if (topicHandle->buffer[idx].content != NULL && topicHandle->buffer[idx].offset == topicHandle->nextConsumeOffset) {
|
if (pTopic->buffer[idx].content != NULL && pTopic->buffer[idx].offset == pTopic->nextConsumeOffset) {
|
||||||
totSize += topicHandle->buffer[idx].size;
|
totSize += pTopic->buffer[idx].size;
|
||||||
if (totSize > sizeLimit) {
|
if (totSize > sizeLimit) {
|
||||||
void* ptr = realloc(buffer, totSize);
|
void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + totSize);
|
||||||
if (ptr == NULL) {
|
if (ptr == NULL) {
|
||||||
totSize -= topicHandle->buffer[idx].size;
|
totSize -= pTopic->buffer[idx].size;
|
||||||
// TODO:memory insufficient
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
// return msgs already copied
|
// return msgs already copied
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
*pRsp = ptr;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
*((int64_t*)buffer) = topicHandle->topicId;
|
*((int64_t*)buffer) = pTopic->topicId;
|
||||||
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
||||||
*((int64_t*)buffer) = topicHandle->buffer[idx].size;
|
*((int64_t*)buffer) = pTopic->buffer[idx].size;
|
||||||
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
||||||
memcpy(buffer, topicHandle->buffer[idx].content, topicHandle->buffer[idx].size);
|
memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
|
||||||
buffer = POINTER_SHIFT(buffer, topicHandle->buffer[idx].size);
|
buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
|
||||||
|
numOfMsgs++;
|
||||||
if (totSize > sizeLimit) {
|
if (totSize > sizeLimit) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return totSize;
|
(*pRsp)->bodySize = totSize;
|
||||||
|
return numOfMsgs;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); }
|
STqGroup* tqGetGroup(STQ* pTq, int64_t clientId) { return tqHandleGet(pTq->tqMeta, clientId); }
|
||||||
|
@ -275,7 +314,22 @@ int tqSetCursor(STQ* pTq, STqSetCurReq* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
|
// temporary
|
||||||
|
int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) {
|
||||||
|
int64_t clientId = pMsg->head.clientId;
|
||||||
|
STqGroup* pGroup = tqGetGroup(pTq, clientId);
|
||||||
|
if (pGroup == NULL) {
|
||||||
|
terrno = TSDB_CODE_TQ_GROUP_NOT_SET;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
pGroup->rspHandle.handle = pRsp->handle;
|
||||||
|
pGroup->rspHandle.ahandle = pRsp->ahandle;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) {
|
||||||
|
STqConsumeReq *pMsg = pReq->pCont;
|
||||||
int64_t clientId = pMsg->head.clientId;
|
int64_t clientId = pMsg->head.clientId;
|
||||||
STqGroup* pGroup = tqGetGroup(pTq, clientId);
|
STqGroup* pGroup = tqGetGroup(pTq, clientId);
|
||||||
if (pGroup == NULL) {
|
if (pGroup == NULL) {
|
||||||
|
@ -283,20 +337,107 @@ int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
|
SList* topicList = pGroup->topicList;
|
||||||
int numOfMsgs = tqFetch(pGroup, (void**)&pRsp->msgs);
|
|
||||||
if (numOfMsgs < 0) {
|
int totSize = 0;
|
||||||
|
int numOfMsgs = 0;
|
||||||
|
int sizeLimit = 4096;
|
||||||
|
|
||||||
|
|
||||||
|
STqConsumeRsp *pCsmRsp = (*pRsp)->pCont;
|
||||||
|
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (numOfMsgs == 0) {
|
(*pRsp)->pCont = ptr;
|
||||||
|
|
||||||
|
SListIter iter;
|
||||||
|
tdListInitIter(topicList, &iter, TD_LIST_FORWARD);
|
||||||
|
|
||||||
|
STqMsgContent* buffer = NULL;
|
||||||
|
SArray* pArray = taosArrayInit(0, sizeof(void*));
|
||||||
|
|
||||||
|
SListNode *pn;
|
||||||
|
while((pn = tdListNext(&iter)) != NULL) {
|
||||||
|
STqTopic* pTopic = *(STqTopic**)pn->data;
|
||||||
|
int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE;
|
||||||
|
STqMsgItem* pItem = &pTopic->buffer[idx];
|
||||||
|
if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) {
|
||||||
|
if(pItem->status == TQ_ITEM_READY) {
|
||||||
|
//if has data
|
||||||
|
totSize += pTopic->buffer[idx].size;
|
||||||
|
if (totSize > sizeLimit) {
|
||||||
|
void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
totSize -= pTopic->buffer[idx].size;
|
||||||
|
terrno = TSDB_CODE_TQ_OUT_OF_MEMORY;
|
||||||
|
// return msgs already copied
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
(*pRsp)->pCont = ptr;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
*((int64_t*)buffer) = htobe64(pTopic->topicId);
|
||||||
|
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
||||||
|
*((int64_t*)buffer) = htobe64(pTopic->buffer[idx].size);
|
||||||
|
buffer = POINTER_SHIFT(buffer, sizeof(int64_t));
|
||||||
|
memcpy(buffer, pTopic->buffer[idx].content, pTopic->buffer[idx].size);
|
||||||
|
buffer = POINTER_SHIFT(buffer, pTopic->buffer[idx].size);
|
||||||
|
numOfMsgs++;
|
||||||
|
if (totSize > sizeLimit) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
} else if(pItem->status == TQ_ITEM_PROCESS) {
|
||||||
|
//if not have data but in process
|
||||||
|
|
||||||
|
} else if(pItem->status == TQ_ITEM_EMPTY){
|
||||||
|
//if not have data and not in process
|
||||||
|
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS);
|
||||||
|
if(old != TQ_ITEM_EMPTY) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pItem->offset = pTopic->floatingCursor;
|
||||||
|
taosArrayPush(pArray, &pItem);
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfMsgs > 0) {
|
||||||
|
// set code and other msg
|
||||||
|
rpcSendResponse(*pRsp);
|
||||||
|
} else {
|
||||||
// most recent data has been fetched
|
// most recent data has been fetched
|
||||||
|
|
||||||
// enable timer for blocking wait
|
// enable timer for blocking wait
|
||||||
// once new data written during wait time
|
// once new data written when waiting, launch query and rsp
|
||||||
// launch query and response
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// fetched a num of msgs, rpc response
|
// fetched a num of msgs, rpc response
|
||||||
|
for(int i = 0; i < pArray->size; i++) {
|
||||||
|
STqMsgItem* pItem = taosArrayGet(pArray, i);
|
||||||
|
|
||||||
|
//read from wal
|
||||||
|
void* raw = NULL;
|
||||||
|
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
|
||||||
|
int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);
|
||||||
|
if(code < 0) {
|
||||||
|
//TODO: error
|
||||||
|
}
|
||||||
|
//get msgType
|
||||||
|
//if submitblk
|
||||||
|
pItem->executor->assign(pItem->executor->runtimeEnv, raw);
|
||||||
|
SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv);
|
||||||
|
pItem->content = content;
|
||||||
|
//if other type, send just put into buffer
|
||||||
|
/*pItem->content = raw;*/
|
||||||
|
|
||||||
|
int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY);
|
||||||
|
ASSERT(old == TQ_ITEM_PROCESS);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -378,10 +519,10 @@ void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
*(int64_t*)ptr = pTopic->topicId;
|
*(int64_t*)ptr = pTopic->topicId;
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
*(int32_t*)ptr = pTopic->head;
|
/**(int32_t*)ptr = pTopic->head;*/
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||||
*(int32_t*)ptr = pTopic->tail;
|
/**(int32_t*)ptr = pTopic->tail;*/
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||||
ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
|
ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
|
||||||
}
|
}
|
||||||
|
@ -435,10 +576,10 @@ const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
topic->topicId = *(int64_t*)ptr;
|
topic->topicId = *(int64_t*)ptr;
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
topic->head = *(int32_t*)ptr;
|
/*topic->head = *(int32_t*)ptr;*/
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||||
topic->tail = *(int32_t*)ptr;
|
/*topic->tail = *(int32_t*)ptr;*/
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
|
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
|
||||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||||
ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
|
ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,6 +170,7 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
||||||
int code;
|
int code;
|
||||||
code = walSeekVer(pWal, ver);
|
code = walSeekVer(pWal, ver);
|
||||||
|
@ -207,6 +208,7 @@ int32_t walRead(SWal *pWal, SWalHead **ppHead, int64_t ver) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {*/
|
int32_t walReadWithFp(SWal *pWal, FWalWrite writeFp, int64_t verStart, int32_t readNum) {
|
||||||
/*return 0;*/
|
return 0;
|
||||||
/*}*/
|
}
|
||||||
|
#endif
|
||||||
|
|
Loading…
Reference in New Issue