refactor tq
This commit is contained in:
parent
4ad2604107
commit
29bc056abf
|
@ -27,7 +27,7 @@ if(${BUILD_TEST})
|
|||
endif(${BUILD_TEST})
|
||||
add_subdirectory(source)
|
||||
add_subdirectory(tools)
|
||||
add_subdirectory(tests)
|
||||
#add_subdirectory(tests)
|
||||
add_subdirectory(example)
|
||||
|
||||
# docs
|
||||
|
|
|
@ -113,10 +113,10 @@ typedef struct SMqConsumeCbParam {
|
|||
} SMqConsumeCbParam;
|
||||
|
||||
typedef struct SMqCommitCbParam {
|
||||
tmq_t* tmq;
|
||||
SMqClientVg* pVg;
|
||||
int32_t async;
|
||||
tsem_t rspSem;
|
||||
tmq_t* tmq;
|
||||
SMqClientVg* pVg;
|
||||
int32_t async;
|
||||
tsem_t rspSem;
|
||||
} SMqCommitCbParam;
|
||||
|
||||
tmq_conf_t* tmq_conf_new() {
|
||||
|
@ -163,8 +163,8 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
}
|
||||
|
||||
int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
SMqCommitCbParam* pParam = (SMqCommitCbParam*) param;
|
||||
tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
|
||||
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
|
||||
tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL;
|
||||
if (pParam->tmq->commit_cb) {
|
||||
pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL);
|
||||
}
|
||||
|
@ -221,13 +221,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
|||
tNameExtractFullName(&name, topicFname);
|
||||
tscDebug("subscribe topic: %s", topicFname);
|
||||
SMqClientTopic topic = {
|
||||
.nextVgIdx = 0,
|
||||
.sql = NULL,
|
||||
.sqlLen = 0,
|
||||
.topicId = 0,
|
||||
.topicName = topicFname,
|
||||
.vgs = NULL
|
||||
};
|
||||
.nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL};
|
||||
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
|
||||
taosArrayPush(tmq->clientTopics, &topic);
|
||||
/*SMqClientTopic topic = {*/
|
||||
|
@ -461,10 +455,10 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
void tmqShowMsg(tmq_message_t* tmq_message) {
|
||||
if (tmq_message == NULL) return;
|
||||
|
||||
static bool noPrintSchema;
|
||||
char pBuf[128];
|
||||
static bool noPrintSchema;
|
||||
char pBuf[128];
|
||||
SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message;
|
||||
int32_t colNum = pRsp->schemas->nCols;
|
||||
int32_t colNum = pRsp->schemas->nCols;
|
||||
if (!noPrintSchema) {
|
||||
printf("|");
|
||||
for (int32_t i = 0; i < colNum; i++) {
|
||||
|
@ -584,7 +578,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
|
||||
int32_t tmqAskEp(tmq_t* tmq, bool wait) {
|
||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||
SMqCMGetSubEpReq* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
|
@ -603,7 +597,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) {
|
|||
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen};
|
||||
|
||||
SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam));
|
||||
SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam));
|
||||
if (pParam == NULL) {
|
||||
tscError("failed to malloc subscribe param");
|
||||
goto END;
|
||||
|
@ -653,7 +647,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
tmq_message_t* tmq_message = NULL;
|
||||
|
||||
int64_t status = atomic_load_64(&tmq->status);
|
||||
tmqAsyncAskEp(tmq, status == 0);
|
||||
tmqAskEp(tmq, status == 0);
|
||||
|
||||
if (blocking_time < 0) blocking_time = 1;
|
||||
if (blocking_time > 1000) blocking_time = 1000;
|
||||
|
@ -672,7 +666,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
|
||||
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
|
||||
int32_t beginVgIdx = pTopic->nextVgIdx;
|
||||
while(1) {
|
||||
while (1) {
|
||||
pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx);
|
||||
/*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||
|
@ -737,21 +731,20 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
}
|
||||
|
||||
tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) {
|
||||
|
||||
if (tmq_topic_vgroup_list != NULL) {
|
||||
//TODO
|
||||
// TODO
|
||||
}
|
||||
|
||||
//TODO: change semaphore to gate
|
||||
// TODO: change semaphore to gate
|
||||
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg);
|
||||
|
||||
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
|
||||
pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)};
|
||||
SMqCommitCbParam *pParam = malloc(sizeof(SMqCommitCbParam));
|
||||
SMqCommitCbParam* pParam = malloc(sizeof(SMqCommitCbParam));
|
||||
if (pParam == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
@ -782,9 +775,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) {
|
|||
free(tmq_message);
|
||||
}
|
||||
|
||||
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) {
|
||||
return TMQ_RESP_ERR__SUCCESS;
|
||||
}
|
||||
tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; }
|
||||
|
||||
const char* tmq_err2str(tmq_resp_err_t err) {
|
||||
if (err == TMQ_RESP_ERR__SUCCESS) {
|
||||
|
|
|
@ -35,155 +35,14 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define TQ_BUFFER_SIZE 8
|
||||
|
||||
typedef struct STqRspHandle {
|
||||
void* handle;
|
||||
void* ahandle;
|
||||
} STqRspHandle;
|
||||
|
||||
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
|
||||
|
||||
typedef struct STqTaskItem {
|
||||
int8_t status;
|
||||
int64_t offset;
|
||||
void* dst;
|
||||
qTaskInfo_t task;
|
||||
STqReadHandle* pReadHandle;
|
||||
SSubQueryMsg* pQueryMsg;
|
||||
} STqTaskItem;
|
||||
|
||||
// new version
|
||||
typedef struct STqBuffer {
|
||||
int64_t firstOffset;
|
||||
int64_t lastOffset;
|
||||
STqTaskItem output[TQ_BUFFER_SIZE];
|
||||
} STqBuffer;
|
||||
|
||||
typedef struct STqTopicHandle {
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
int64_t committedOffset;
|
||||
int64_t currentOffset;
|
||||
STqBuffer buffer;
|
||||
SWalReadHandle* pReadhandle;
|
||||
} STqTopicHandle;
|
||||
|
||||
typedef struct STqConsumerHandle {
|
||||
int64_t consumerId;
|
||||
int64_t epoch;
|
||||
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* topics; // SArray<STqClientTopic>
|
||||
} STqConsumerHandle;
|
||||
typedef struct STQ STQ;
|
||||
|
||||
// memory allocator supported by vnode
|
||||
typedef struct STqMemRef {
|
||||
SMemAllocatorFactory* pAllocatorFactory;
|
||||
SMemAllocator* pAllocator;
|
||||
} STqMemRef;
|
||||
|
||||
typedef struct STqSerializedHead {
|
||||
int16_t ver;
|
||||
int16_t action;
|
||||
int32_t checksum;
|
||||
int64_t ssize;
|
||||
char content[];
|
||||
} STqSerializedHead;
|
||||
|
||||
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
|
||||
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
|
||||
typedef void (*FTqDelete)(void*);
|
||||
|
||||
#define TQ_BUCKET_MASK 0xFF
|
||||
#define TQ_BUCKET_SIZE 256
|
||||
|
||||
#define TQ_PAGE_SIZE 4096
|
||||
// key + offset + size
|
||||
#define TQ_IDX_SIZE 24
|
||||
// 4096 / 24
|
||||
#define TQ_MAX_IDX_ONE_PAGE 170
|
||||
// 24 * 170
|
||||
#define TQ_IDX_PAGE_BODY_SIZE 4080
|
||||
// 4096 - 4080
|
||||
#define TQ_IDX_PAGE_HEAD_SIZE 16
|
||||
|
||||
#define TQ_ACTION_CONST 0
|
||||
#define TQ_ACTION_INUSE 1
|
||||
#define TQ_ACTION_INUSE_CONT 2
|
||||
#define TQ_ACTION_INTXN 3
|
||||
|
||||
#define TQ_SVER 0
|
||||
|
||||
// TODO: inplace mode is not implemented
|
||||
#define TQ_UPDATE_INPLACE 0
|
||||
#define TQ_UPDATE_APPEND 1
|
||||
|
||||
#define TQ_DUP_INTXN_REWRITE 0
|
||||
#define TQ_DUP_INTXN_REJECT 2
|
||||
|
||||
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
|
||||
|
||||
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
|
||||
|
||||
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
|
||||
|
||||
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
|
||||
|
||||
typedef struct STqMetaHandle {
|
||||
int64_t key;
|
||||
int64_t offset;
|
||||
int64_t serializedSize;
|
||||
void* valueInUse;
|
||||
void* valueInTxn;
|
||||
} STqMetaHandle;
|
||||
|
||||
typedef struct STqMetaList {
|
||||
STqMetaHandle handle;
|
||||
struct STqMetaList* next;
|
||||
// struct STqMetaList* inTxnPrev;
|
||||
// struct STqMetaList* inTxnNext;
|
||||
struct STqMetaList* unpersistPrev;
|
||||
struct STqMetaList* unpersistNext;
|
||||
} STqMetaList;
|
||||
|
||||
typedef struct STqMetaStore {
|
||||
STqMetaList* bucket[TQ_BUCKET_SIZE];
|
||||
// a table head
|
||||
STqMetaList* unpersistHead;
|
||||
// topics that are not connectted
|
||||
STqMetaList* unconnectTopic;
|
||||
|
||||
// TODO:temporaral use, to be replaced by unified tfile
|
||||
int fileFd;
|
||||
// TODO:temporaral use, to be replaced by unified tfile
|
||||
int idxFd;
|
||||
|
||||
char* dirPath;
|
||||
int32_t tqConfigFlag;
|
||||
FTqSerialize pSerializer;
|
||||
FTqDeserialize pDeserializer;
|
||||
FTqDelete pDeleter;
|
||||
} STqMetaStore;
|
||||
|
||||
typedef struct STQ {
|
||||
// the collection of groups
|
||||
// the handle of meta kvstore
|
||||
char* path;
|
||||
STqCfg* tqConfig;
|
||||
STqMemRef tqMemRef;
|
||||
STqMetaStore* tqMeta;
|
||||
SWal* pWal;
|
||||
SMeta* pMeta;
|
||||
} STQ;
|
||||
|
||||
typedef struct STqMgmt {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
} STqMgmt;
|
||||
|
||||
static STqMgmt tqMgmt;
|
||||
|
||||
// init once
|
||||
int tqInit();
|
||||
void tqCleanUp();
|
||||
|
|
|
@ -63,8 +63,145 @@ extern int32_t tqDebugFlag;
|
|||
} \
|
||||
}
|
||||
|
||||
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
|
||||
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
|
||||
#define TQ_BUFFER_SIZE 8
|
||||
|
||||
#define TQ_BUCKET_MASK 0xFF
|
||||
#define TQ_BUCKET_SIZE 256
|
||||
|
||||
#define TQ_PAGE_SIZE 4096
|
||||
// key + offset + size
|
||||
#define TQ_IDX_SIZE 24
|
||||
// 4096 / 24
|
||||
#define TQ_MAX_IDX_ONE_PAGE 170
|
||||
// 24 * 170
|
||||
#define TQ_IDX_PAGE_BODY_SIZE 4080
|
||||
// 4096 - 4080
|
||||
#define TQ_IDX_PAGE_HEAD_SIZE 16
|
||||
|
||||
#define TQ_ACTION_CONST 0
|
||||
#define TQ_ACTION_INUSE 1
|
||||
#define TQ_ACTION_INUSE_CONT 2
|
||||
#define TQ_ACTION_INTXN 3
|
||||
|
||||
#define TQ_SVER 0
|
||||
|
||||
// TODO: inplace mode is not implemented
|
||||
#define TQ_UPDATE_INPLACE 0
|
||||
#define TQ_UPDATE_APPEND 1
|
||||
|
||||
#define TQ_DUP_INTXN_REWRITE 0
|
||||
#define TQ_DUP_INTXN_REJECT 2
|
||||
|
||||
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
|
||||
|
||||
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
|
||||
|
||||
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
|
||||
|
||||
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
|
||||
|
||||
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
|
||||
|
||||
typedef struct {
|
||||
int16_t ver;
|
||||
int16_t action;
|
||||
int32_t checksum;
|
||||
int64_t ssize;
|
||||
char content[];
|
||||
} STqSerializedHead;
|
||||
|
||||
typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
|
||||
typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
|
||||
typedef void (*FTqDelete)(void*);
|
||||
typedef struct STqMetaHandle {
|
||||
int64_t key;
|
||||
int64_t offset;
|
||||
int64_t serializedSize;
|
||||
void* valueInUse;
|
||||
void* valueInTxn;
|
||||
} STqMetaHandle;
|
||||
|
||||
typedef struct STqMetaList {
|
||||
STqMetaHandle handle;
|
||||
struct STqMetaList* next;
|
||||
// struct STqMetaList* inTxnPrev;
|
||||
// struct STqMetaList* inTxnNext;
|
||||
struct STqMetaList* unpersistPrev;
|
||||
struct STqMetaList* unpersistNext;
|
||||
} STqMetaList;
|
||||
|
||||
typedef struct {
|
||||
STqMetaList* bucket[TQ_BUCKET_SIZE];
|
||||
// a table head
|
||||
STqMetaList* unpersistHead;
|
||||
// topics that are not connectted
|
||||
STqMetaList* unconnectTopic;
|
||||
|
||||
// TODO:temporaral use, to be replaced by unified tfile
|
||||
int fileFd;
|
||||
// TODO:temporaral use, to be replaced by unified tfile
|
||||
int idxFd;
|
||||
|
||||
char* dirPath;
|
||||
int32_t tqConfigFlag;
|
||||
FTqSerialize pSerializer;
|
||||
FTqDeserialize pDeserializer;
|
||||
FTqDelete pDeleter;
|
||||
} STqMetaStore;
|
||||
|
||||
struct STQ {
|
||||
// the collection of groups
|
||||
// the handle of meta kvstore
|
||||
char* path;
|
||||
STqCfg* tqConfig;
|
||||
STqMemRef tqMemRef;
|
||||
STqMetaStore* tqMeta;
|
||||
SWal* pWal;
|
||||
SMeta* pMeta;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
} STqMgmt;
|
||||
|
||||
static STqMgmt tqMgmt;
|
||||
|
||||
typedef struct {
|
||||
int8_t status;
|
||||
int64_t offset;
|
||||
qTaskInfo_t task;
|
||||
STqReadHandle* pReadHandle;
|
||||
} STqTaskItem;
|
||||
|
||||
// new version
|
||||
typedef struct {
|
||||
int64_t firstOffset;
|
||||
int64_t lastOffset;
|
||||
STqTaskItem output[TQ_BUFFER_SIZE];
|
||||
} STqBuffer;
|
||||
|
||||
typedef struct {
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
char* qmsg;
|
||||
int64_t committedOffset;
|
||||
int64_t currentOffset;
|
||||
STqBuffer buffer;
|
||||
SWalReadHandle* pReadhandle;
|
||||
} STqTopicHandle;
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
int64_t epoch;
|
||||
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* topics; // SArray<STqClientTopic>
|
||||
} STqConsumerHandle;
|
||||
|
||||
int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**);
|
||||
const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**);
|
||||
|
||||
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
||||
|
||||
|
|
|
@ -322,6 +322,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
|||
pTopic->sql = req.sql;
|
||||
pTopic->logicalPlan = req.logicalPlan;
|
||||
pTopic->physicalPlan = req.physicalPlan;
|
||||
pTopic->qmsg = req.qmsg;
|
||||
pTopic->committedOffset = -1;
|
||||
pTopic->currentOffset = -1;
|
||||
|
||||
|
|
|
@ -289,7 +289,6 @@ int32_t tqStoreDelete(STqMetaStore* pMeta) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// TODO: wrap in tfile
|
||||
int32_t tqStorePersist(STqMetaStore* pMeta) {
|
||||
STqIdxPageBuf idxBuf;
|
||||
int64_t* bufPtr = (int64_t*)idxBuf.buffer;
|
||||
|
|
Loading…
Reference in New Issue