diff --git a/CMakeLists.txt b/CMakeLists.txt index fd542966cc..f1be916b3b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,7 +27,7 @@ if(${BUILD_TEST}) endif(${BUILD_TEST}) add_subdirectory(source) add_subdirectory(tools) -add_subdirectory(tests) +#add_subdirectory(tests) add_subdirectory(example) # docs diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 4adfa9eaf9..8b1faaef4e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -113,10 +113,10 @@ typedef struct SMqConsumeCbParam { } SMqConsumeCbParam; typedef struct SMqCommitCbParam { - tmq_t* tmq; - SMqClientVg* pVg; - int32_t async; - tsem_t rspSem; + tmq_t* tmq; + SMqClientVg* pVg; + int32_t async; + tsem_t rspSem; } SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { @@ -163,8 +163,8 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { } int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { - SMqCommitCbParam* pParam = (SMqCommitCbParam*) param; - tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; + SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; + tmq_resp_err_t rspErr = code == 0 ? TMQ_RESP_ERR__SUCCESS : TMQ_RESP_ERR__FAIL; if (pParam->tmq->commit_cb) { pParam->tmq->commit_cb(pParam->tmq, rspErr, NULL, NULL); } @@ -221,13 +221,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tNameExtractFullName(&name, topicFname); tscDebug("subscribe topic: %s", topicFname); SMqClientTopic topic = { - .nextVgIdx = 0, - .sql = NULL, - .sqlLen = 0, - .topicId = 0, - .topicName = topicFname, - .vgs = NULL - }; + .nextVgIdx = 0, .sql = NULL, .sqlLen = 0, .topicId = 0, .topicName = topicFname, .vgs = NULL}; topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); taosArrayPush(tmq->clientTopics, &topic); /*SMqClientTopic topic = {*/ @@ -461,10 +455,10 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { void tmqShowMsg(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; - static bool noPrintSchema; - char pBuf[128]; + static bool noPrintSchema; + char pBuf[128]; SMqConsumeRsp* pRsp = (SMqConsumeRsp*)tmq_message; - int32_t colNum = pRsp->schemas->nCols; + int32_t colNum = pRsp->schemas->nCols; if (!noPrintSchema) { printf("|"); for (int32_t i = 0; i < colNum; i++) { @@ -584,7 +578,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { +int32_t tmqAskEp(tmq_t* tmq, bool wait) { int32_t tlen = sizeof(SMqCMGetSubEpReq); SMqCMGetSubEpReq* buf = malloc(tlen); if (buf == NULL) { @@ -603,7 +597,7 @@ int32_t tmqAsyncAskEp(tmq_t* tmq, bool wait) { pRequest->body.requestMsg = (SDataBuf){.pData = buf, .len = tlen}; - SMqAskEpCbParam *pParam = malloc(sizeof(SMqAskEpCbParam)); + SMqAskEpCbParam* pParam = malloc(sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("failed to malloc subscribe param"); goto END; @@ -653,7 +647,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_message = NULL; int64_t status = atomic_load_64(&tmq->status); - tmqAsyncAskEp(tmq, status == 0); + tmqAskEp(tmq, status == 0); if (blocking_time < 0) blocking_time = 1; if (blocking_time > 1000) blocking_time = 1000; @@ -672,7 +666,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); int32_t beginVgIdx = pTopic->nextVgIdx; - while(1) { + while (1) { pTopic->nextVgIdx = (pTopic->nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, pTopic->nextVgIdx); /*printf("consume vg %d, offset %ld\n", pVg->vgId, pVg->currentOffset);*/ @@ -737,21 +731,20 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_vgroup_list, int32_t async) { - if (tmq_topic_vgroup_list != NULL) { - //TODO + // TODO } - //TODO: change semaphore to gate + // TODO: change semaphore to gate for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, 0, TMQ_REQ_TYPE_COMMIT_ONLY, pTopic, pVg); - + SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME); pRequest->body.requestMsg = (SDataBuf){.pData = pReq, .len = sizeof(SMqConsumeReq)}; - SMqCommitCbParam *pParam = malloc(sizeof(SMqCommitCbParam)); + SMqCommitCbParam* pParam = malloc(sizeof(SMqCommitCbParam)); if (pParam == NULL) { continue; } @@ -782,9 +775,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { free(tmq_message); } -tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { - return TMQ_RESP_ERR__SUCCESS; -} +tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } const char* tmq_err2str(tmq_resp_err_t err) { if (err == TMQ_RESP_ERR__SUCCESS) { diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index bd626154e6..df755574bf 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -35,155 +35,14 @@ extern "C" { #endif -#define TQ_BUFFER_SIZE 8 - -typedef struct STqRspHandle { - void* handle; - void* ahandle; -} STqRspHandle; - -typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; - -typedef struct STqTaskItem { - int8_t status; - int64_t offset; - void* dst; - qTaskInfo_t task; - STqReadHandle* pReadHandle; - SSubQueryMsg* pQueryMsg; -} STqTaskItem; - -// new version -typedef struct STqBuffer { - int64_t firstOffset; - int64_t lastOffset; - STqTaskItem output[TQ_BUFFER_SIZE]; -} STqBuffer; - -typedef struct STqTopicHandle { - char topicName[TSDB_TOPIC_FNAME_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - int64_t committedOffset; - int64_t currentOffset; - STqBuffer buffer; - SWalReadHandle* pReadhandle; -} STqTopicHandle; - -typedef struct STqConsumerHandle { - int64_t consumerId; - int64_t epoch; - char cgroup[TSDB_TOPIC_FNAME_LEN]; - SArray* topics; // SArray -} STqConsumerHandle; +typedef struct STQ STQ; +// memory allocator supported by vnode typedef struct STqMemRef { SMemAllocatorFactory* pAllocatorFactory; SMemAllocator* pAllocator; } STqMemRef; -typedef struct STqSerializedHead { - int16_t ver; - int16_t action; - int32_t checksum; - int64_t ssize; - char content[]; -} STqSerializedHead; - -typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead); -typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj); -typedef void (*FTqDelete)(void*); - -#define TQ_BUCKET_MASK 0xFF -#define TQ_BUCKET_SIZE 256 - -#define TQ_PAGE_SIZE 4096 -// key + offset + size -#define TQ_IDX_SIZE 24 -// 4096 / 24 -#define TQ_MAX_IDX_ONE_PAGE 170 -// 24 * 170 -#define TQ_IDX_PAGE_BODY_SIZE 4080 -// 4096 - 4080 -#define TQ_IDX_PAGE_HEAD_SIZE 16 - -#define TQ_ACTION_CONST 0 -#define TQ_ACTION_INUSE 1 -#define TQ_ACTION_INUSE_CONT 2 -#define TQ_ACTION_INTXN 3 - -#define TQ_SVER 0 - -// TODO: inplace mode is not implemented -#define TQ_UPDATE_INPLACE 0 -#define TQ_UPDATE_APPEND 1 - -#define TQ_DUP_INTXN_REWRITE 0 -#define TQ_DUP_INTXN_REJECT 2 - -static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } - -static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } - -static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; - -#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE - -typedef struct STqMetaHandle { - int64_t key; - int64_t offset; - int64_t serializedSize; - void* valueInUse; - void* valueInTxn; -} STqMetaHandle; - -typedef struct STqMetaList { - STqMetaHandle handle; - struct STqMetaList* next; - // struct STqMetaList* inTxnPrev; - // struct STqMetaList* inTxnNext; - struct STqMetaList* unpersistPrev; - struct STqMetaList* unpersistNext; -} STqMetaList; - -typedef struct STqMetaStore { - STqMetaList* bucket[TQ_BUCKET_SIZE]; - // a table head - STqMetaList* unpersistHead; - // topics that are not connectted - STqMetaList* unconnectTopic; - - // TODO:temporaral use, to be replaced by unified tfile - int fileFd; - // TODO:temporaral use, to be replaced by unified tfile - int idxFd; - - char* dirPath; - int32_t tqConfigFlag; - FTqSerialize pSerializer; - FTqDeserialize pDeserializer; - FTqDelete pDeleter; -} STqMetaStore; - -typedef struct STQ { - // the collection of groups - // the handle of meta kvstore - char* path; - STqCfg* tqConfig; - STqMemRef tqMemRef; - STqMetaStore* tqMeta; - SWal* pWal; - SMeta* pMeta; -} STQ; - -typedef struct STqMgmt { - int8_t inited; - tmr_h timer; -} STqMgmt; - -static STqMgmt tqMgmt; - // init once int tqInit(); void tqCleanUp(); diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index cefe9eff72..13c0150cd2 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -63,8 +63,145 @@ extern int32_t tqDebugFlag; } \ } -int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**); -const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**); +#define TQ_BUFFER_SIZE 8 + +#define TQ_BUCKET_MASK 0xFF +#define TQ_BUCKET_SIZE 256 + +#define TQ_PAGE_SIZE 4096 +// key + offset + size +#define TQ_IDX_SIZE 24 +// 4096 / 24 +#define TQ_MAX_IDX_ONE_PAGE 170 +// 24 * 170 +#define TQ_IDX_PAGE_BODY_SIZE 4080 +// 4096 - 4080 +#define TQ_IDX_PAGE_HEAD_SIZE 16 + +#define TQ_ACTION_CONST 0 +#define TQ_ACTION_INUSE 1 +#define TQ_ACTION_INUSE_CONT 2 +#define TQ_ACTION_INTXN 3 + +#define TQ_SVER 0 + +// TODO: inplace mode is not implemented +#define TQ_UPDATE_INPLACE 0 +#define TQ_UPDATE_APPEND 1 + +#define TQ_DUP_INTXN_REWRITE 0 +#define TQ_DUP_INTXN_REJECT 2 + +static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } + +static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } + +static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; + +#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE + +typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; + +typedef struct { + int16_t ver; + int16_t action; + int32_t checksum; + int64_t ssize; + char content[]; +} STqSerializedHead; + +typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead); +typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj); +typedef void (*FTqDelete)(void*); +typedef struct STqMetaHandle { + int64_t key; + int64_t offset; + int64_t serializedSize; + void* valueInUse; + void* valueInTxn; +} STqMetaHandle; + +typedef struct STqMetaList { + STqMetaHandle handle; + struct STqMetaList* next; + // struct STqMetaList* inTxnPrev; + // struct STqMetaList* inTxnNext; + struct STqMetaList* unpersistPrev; + struct STqMetaList* unpersistNext; +} STqMetaList; + +typedef struct { + STqMetaList* bucket[TQ_BUCKET_SIZE]; + // a table head + STqMetaList* unpersistHead; + // topics that are not connectted + STqMetaList* unconnectTopic; + + // TODO:temporaral use, to be replaced by unified tfile + int fileFd; + // TODO:temporaral use, to be replaced by unified tfile + int idxFd; + + char* dirPath; + int32_t tqConfigFlag; + FTqSerialize pSerializer; + FTqDeserialize pDeserializer; + FTqDelete pDeleter; +} STqMetaStore; + +struct STQ { + // the collection of groups + // the handle of meta kvstore + char* path; + STqCfg* tqConfig; + STqMemRef tqMemRef; + STqMetaStore* tqMeta; + SWal* pWal; + SMeta* pMeta; +}; + +typedef struct { + int8_t inited; + tmr_h timer; +} STqMgmt; + +static STqMgmt tqMgmt; + +typedef struct { + int8_t status; + int64_t offset; + qTaskInfo_t task; + STqReadHandle* pReadHandle; +} STqTaskItem; + +// new version +typedef struct { + int64_t firstOffset; + int64_t lastOffset; + STqTaskItem output[TQ_BUFFER_SIZE]; +} STqBuffer; + +typedef struct { + char topicName[TSDB_TOPIC_FNAME_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + char* qmsg; + int64_t committedOffset; + int64_t currentOffset; + STqBuffer buffer; + SWalReadHandle* pReadhandle; +} STqTopicHandle; + +typedef struct { + int64_t consumerId; + int64_t epoch; + char cgroup[TSDB_TOPIC_FNAME_LEN]; + SArray* topics; // SArray +} STqConsumerHandle; + +int tqSerializeConsumer(const STqConsumerHandle*, STqSerializedHead**); +const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHandle**); static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6af1973901..8ffedfebe4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -322,6 +322,7 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) { pTopic->sql = req.sql; pTopic->logicalPlan = req.logicalPlan; pTopic->physicalPlan = req.physicalPlan; + pTopic->qmsg = req.qmsg; pTopic->committedOffset = -1; pTopic->currentOffset = -1; diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c index d220966ba6..121be98572 100644 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ b/source/dnode/vnode/src/tq/tqMetaStore.c @@ -289,7 +289,6 @@ int32_t tqStoreDelete(STqMetaStore* pMeta) { return 0; } -// TODO: wrap in tfile int32_t tqStorePersist(STqMetaStore* pMeta) { STqIdxPageBuf idxBuf; int64_t* bufPtr = (int64_t*)idxBuf.buffer;