diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 573fc78df0..bee443f487 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -20,6 +20,281 @@ extern "C" { #endif +// tqInt.h +#define tqFatal(...) \ + { \ + if (tqDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ + } \ + } + +#define tqError(...) \ + { \ + if (tqDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ + } \ + } + +#define tqWarn(...) \ + { \ + if (tqDebugFlag & DEBUG_WARN) { \ + taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ + } \ + } + +#define tqInfo(...) \ + { \ + if (tqDebugFlag & DEBUG_INFO) { \ + taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \ + } \ + } + +#define tqDebug(...) \ + { \ + if (tqDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \ + } \ + } + +#define tqTrace(...) \ + { \ + if (tqDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \ + } \ + } + +#define TQ_BUFFER_SIZE 4 + +#define TQ_BUCKET_MASK 0xFF +#define TQ_BUCKET_SIZE 256 + +#define TQ_PAGE_SIZE 4096 +// key + offset + size +#define TQ_IDX_SIZE 24 +// 4096 / 24 +#define TQ_MAX_IDX_ONE_PAGE 170 +// 24 * 170 +#define TQ_IDX_PAGE_BODY_SIZE 4080 +// 4096 - 4080 +#define TQ_IDX_PAGE_HEAD_SIZE 16 + +#define TQ_ACTION_CONST 0 +#define TQ_ACTION_INUSE 1 +#define TQ_ACTION_INUSE_CONT 2 +#define TQ_ACTION_INTXN 3 + +#define TQ_SVER 0 + +// TODO: inplace mode is not implemented +#define TQ_UPDATE_INPLACE 0 +#define TQ_UPDATE_APPEND 1 + +#define TQ_DUP_INTXN_REWRITE 0 +#define TQ_DUP_INTXN_REJECT 2 + +static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } + +static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } + +static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; + +#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE + +typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; + +typedef struct STqOffsetCfg STqOffsetCfg; +typedef struct STqOffsetStore STqOffsetStore; + +typedef struct { + int16_t ver; + int16_t action; + int32_t checksum; + int64_t ssize; + char content[]; +} STqSerializedHead; + +typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead); +typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj); +typedef void (*FTqDelete)(void*); + +typedef struct { + int64_t key; + int64_t offset; + int64_t serializedSize; + void* valueInUse; + void* valueInTxn; +} STqMetaHandle; + +typedef struct STqMetaList { + STqMetaHandle handle; + struct STqMetaList* next; + // struct STqMetaList* inTxnPrev; + // struct STqMetaList* inTxnNext; + struct STqMetaList* unpersistPrev; + struct STqMetaList* unpersistNext; +} STqMetaList; + +typedef struct { + STQ* pTq; + STqMetaList* bucket[TQ_BUCKET_SIZE]; + // a table head + STqMetaList* unpersistHead; + // topics that are not connectted + STqMetaList* unconnectTopic; + + TdFilePtr pFile; + TdFilePtr pIdxFile; + + char* dirPath; + int32_t tqConfigFlag; + FTqSerialize pSerializer; + FTqDeserialize pDeserializer; + FTqDelete pDeleter; +} STqMetaStore; + +typedef struct { + SMemAllocatorFactory* pAllocatorFactory; + SMemAllocator* pAllocator; +} STqMemRef; + +struct STQ { + // the collection of groups + // the handle of meta kvstore + bool writeTrigger; + char* path; + STqCfg* tqConfig; + STqMemRef tqMemRef; + STqMetaStore* tqMeta; + // STqPushMgr* tqPushMgr; + SHashObj* pStreamTasks; + SVnode* pVnode; + SWal* pWal; + SMeta* pVnodeMeta; +}; + +typedef struct { + int8_t inited; + tmr_h timer; +} STqMgmt; + +static STqMgmt tqMgmt; + +typedef struct { + int8_t status; + int64_t offset; + qTaskInfo_t task; + STqReadHandle* pReadHandle; +} STqTaskItem; + +// new version +typedef struct { + int64_t firstOffset; + int64_t lastOffset; + STqTaskItem output[TQ_BUFFER_SIZE]; +} STqBuffer; + +typedef struct { + char topicName[TSDB_TOPIC_FNAME_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + char* qmsg; + STqBuffer buffer; + SWalReadHandle* pReadhandle; +} STqTopic; + +typedef struct { + int64_t consumerId; + int32_t epoch; + char cgroup[TSDB_TOPIC_FNAME_LEN]; + SArray* topics; // SArray +} STqConsumer; + +enum { + TQ_PUSHER_TYPE__CLIENT = 1, + TQ_PUSHER_TYPE__STREAM, +}; + +typedef struct { + int8_t type; + int8_t reserved[3]; + int32_t ttl; + int64_t consumerId; + SRpcMsg* pMsg; + // SMqPollRsp* rsp; +} STqClientPusher; + +typedef struct { + int8_t type; + int8_t nodeType; + int8_t reserved[6]; + int64_t streamId; + qTaskInfo_t task; + // TODO sync function +} STqStreamPusher; + +typedef struct { + int8_t type; // mq or stream +} STqPusher; + +typedef struct { + SHashObj* pHash; // +} STqPushMgr; + +typedef struct { + int8_t inited; + tmr_h timer; +} STqPushMgmt; + +static STqPushMgmt tqPushMgmt; + + +int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); +int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); + +static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } + +// tqMetaStore.h +STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer, + FTqDelete pDeleter, int32_t tqConfigFlag); +int32_t tqStoreClose(STqMetaStore*); +// int32_t tqStoreDelete(TqMetaStore*); +// int32_t tqStoreCommitAll(TqMetaStore*); +int32_t tqStorePersist(STqMetaStore*); +// clean deleted idx and data from persistent file +int32_t tqStoreCompact(STqMetaStore*); + +void* tqHandleGet(STqMetaStore*, int64_t key); +// make it unpersist +void* tqHandleTouchGet(STqMetaStore*, int64_t key); +int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value); +int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize); +// delete committed kv pair +// notice that a delete action still needs to be committed +int32_t tqHandleDel(STqMetaStore*, int64_t key); +int32_t tqHandlePurge(STqMetaStore*, int64_t key); +int32_t tqHandleCommit(STqMetaStore*, int64_t key); +int32_t tqHandleAbort(STqMetaStore*, int64_t key); + +// tqOffset +STqOffsetStore* STqOffsetOpen(STqOffsetCfg*); +void STqOffsetClose(STqOffsetStore*); + +int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey); +int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset); +int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey); +int32_t tqOffsetPersistAll(STqOffsetStore* pStore); + +// tqPush +int32_t tqPushMgrInit(); +void tqPushMgrCleanUp(); + +STqPushMgr* tqPushMgrOpen(); +void tqPushMgrClose(STqPushMgr* pushMgr); + +STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl); +STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h deleted file mode 100644 index c71c102b06..0000000000 --- a/source/dnode/vnode/src/inc/tqInt.h +++ /dev/null @@ -1,223 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_TQ_INT_H_ -#define _TD_TQ_INT_H_ - -#include "vnode.h" -#include "tlog.h" -#include "tqPush.h" -#include "vnodeInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define tqFatal(...) \ - { \ - if (tqDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ - } \ - } - -#define tqError(...) \ - { \ - if (tqDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ - } \ - } - -#define tqWarn(...) \ - { \ - if (tqDebugFlag & DEBUG_WARN) { \ - taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ - } \ - } - -#define tqInfo(...) \ - { \ - if (tqDebugFlag & DEBUG_INFO) { \ - taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \ - } \ - } - -#define tqDebug(...) \ - { \ - if (tqDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \ - } \ - } - -#define tqTrace(...) \ - { \ - if (tqDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \ - } \ - } - -#define TQ_BUFFER_SIZE 4 - -#define TQ_BUCKET_MASK 0xFF -#define TQ_BUCKET_SIZE 256 - -#define TQ_PAGE_SIZE 4096 -// key + offset + size -#define TQ_IDX_SIZE 24 -// 4096 / 24 -#define TQ_MAX_IDX_ONE_PAGE 170 -// 24 * 170 -#define TQ_IDX_PAGE_BODY_SIZE 4080 -// 4096 - 4080 -#define TQ_IDX_PAGE_HEAD_SIZE 16 - -#define TQ_ACTION_CONST 0 -#define TQ_ACTION_INUSE 1 -#define TQ_ACTION_INUSE_CONT 2 -#define TQ_ACTION_INTXN 3 - -#define TQ_SVER 0 - -// TODO: inplace mode is not implemented -#define TQ_UPDATE_INPLACE 0 -#define TQ_UPDATE_APPEND 1 - -#define TQ_DUP_INTXN_REWRITE 0 -#define TQ_DUP_INTXN_REJECT 2 - -static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } - -static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } - -static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; - -#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE - -typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; - -typedef struct { - int16_t ver; - int16_t action; - int32_t checksum; - int64_t ssize; - char content[]; -} STqSerializedHead; - -typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead); -typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj); -typedef void (*FTqDelete)(void*); - -typedef struct { - int64_t key; - int64_t offset; - int64_t serializedSize; - void* valueInUse; - void* valueInTxn; -} STqMetaHandle; - -typedef struct STqMetaList { - STqMetaHandle handle; - struct STqMetaList* next; - // struct STqMetaList* inTxnPrev; - // struct STqMetaList* inTxnNext; - struct STqMetaList* unpersistPrev; - struct STqMetaList* unpersistNext; -} STqMetaList; - -typedef struct { - STQ* pTq; - STqMetaList* bucket[TQ_BUCKET_SIZE]; - // a table head - STqMetaList* unpersistHead; - // topics that are not connectted - STqMetaList* unconnectTopic; - - TdFilePtr pFile; - TdFilePtr pIdxFile; - - char* dirPath; - int32_t tqConfigFlag; - FTqSerialize pSerializer; - FTqDeserialize pDeserializer; - FTqDelete pDeleter; -} STqMetaStore; - -typedef struct { - SMemAllocatorFactory* pAllocatorFactory; - SMemAllocator* pAllocator; -} STqMemRef; - -struct STQ { - // the collection of groups - // the handle of meta kvstore - bool writeTrigger; - char* path; - STqCfg* tqConfig; - STqMemRef tqMemRef; - STqMetaStore* tqMeta; - // STqPushMgr* tqPushMgr; - SHashObj* pStreamTasks; - SVnode* pVnode; - SWal* pWal; - SMeta* pVnodeMeta; -}; - -typedef struct { - int8_t inited; - tmr_h timer; -} STqMgmt; - -static STqMgmt tqMgmt; - -typedef struct { - int8_t status; - int64_t offset; - qTaskInfo_t task; - STqReadHandle* pReadHandle; -} STqTaskItem; - -// new version -typedef struct { - int64_t firstOffset; - int64_t lastOffset; - STqTaskItem output[TQ_BUFFER_SIZE]; -} STqBuffer; - -typedef struct { - char topicName[TSDB_TOPIC_FNAME_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - char* qmsg; - STqBuffer buffer; - SWalReadHandle* pReadhandle; -} STqTopic; - -typedef struct { - int64_t consumerId; - int32_t epoch; - char cgroup[TSDB_TOPIC_FNAME_LEN]; - SArray* topics; // SArray -} STqConsumer; - -int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); -int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); - -static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_TQ_INT_H_*/ diff --git a/source/dnode/vnode/src/inc/tqMetaStore.h b/source/dnode/vnode/src/inc/tqMetaStore.h deleted file mode 100644 index eb203b7117..0000000000 --- a/source/dnode/vnode/src/inc/tqMetaStore.h +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TQ_META_STORE_H_ -#define _TQ_META_STORE_H_ - -#include "os.h" -#include "tqInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer, - FTqDelete pDeleter, int32_t tqConfigFlag); -int32_t tqStoreClose(STqMetaStore*); -// int32_t tqStoreDelete(TqMetaStore*); -// int32_t tqStoreCommitAll(TqMetaStore*); -int32_t tqStorePersist(STqMetaStore*); -// clean deleted idx and data from persistent file -int32_t tqStoreCompact(STqMetaStore*); - -void* tqHandleGet(STqMetaStore*, int64_t key); -// make it unpersist -void* tqHandleTouchGet(STqMetaStore*, int64_t key); -int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value); -int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize); -// delete committed kv pair -// notice that a delete action still needs to be committed -int32_t tqHandleDel(STqMetaStore*, int64_t key); -int32_t tqHandlePurge(STqMetaStore*, int64_t key); -int32_t tqHandleCommit(STqMetaStore*, int64_t key); -int32_t tqHandleAbort(STqMetaStore*, int64_t key); - -#ifdef __cplusplus -} -#endif - -#endif /* ifndef _TQ_META_STORE_H_ */ diff --git a/source/dnode/vnode/src/inc/tqOffset.h b/source/dnode/vnode/src/inc/tqOffset.h deleted file mode 100644 index b58de26f68..0000000000 --- a/source/dnode/vnode/src/inc/tqOffset.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_TQ_OFFSET_H_ -#define _TD_TQ_OFFSET_H_ - -#include "tqInt.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct STqOffsetCfg STqOffsetCfg; -typedef struct STqOffsetStore STqOffsetStore; - -STqOffsetStore* STqOffsetOpen(STqOffsetCfg*); -void STqOffsetClose(STqOffsetStore*); - -int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset); -int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey); -int32_t tqOffsetPersistAll(STqOffsetStore* pStore); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_TQ_OFFSET_H_*/ diff --git a/source/dnode/vnode/src/inc/tqPush.h b/source/dnode/vnode/src/inc/tqPush.h deleted file mode 100644 index a6121c5dc1..0000000000 --- a/source/dnode/vnode/src/inc/tqPush.h +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TQ_PUSH_H_ -#define _TQ_PUSH_H_ - -#include "executor.h" -#include "thash.h" -#include "trpc.h" -#include "ttimer.h" -#include "vnode.h" - -#ifdef __cplusplus -extern "C" { -#endif - -enum { - TQ_PUSHER_TYPE__CLIENT = 1, - TQ_PUSHER_TYPE__STREAM, -}; - -typedef struct { - int8_t type; - int8_t reserved[3]; - int32_t ttl; - int64_t consumerId; - SRpcMsg* pMsg; - // SMqPollRsp* rsp; -} STqClientPusher; - -typedef struct { - int8_t type; - int8_t nodeType; - int8_t reserved[6]; - int64_t streamId; - qTaskInfo_t task; - // TODO sync function -} STqStreamPusher; - -typedef struct { - int8_t type; // mq or stream -} STqPusher; - -typedef struct { - SHashObj* pHash; // -} STqPushMgr; - -typedef struct { - int8_t inited; - tmr_h timer; -} STqPushMgmt; - -static STqPushMgmt tqPushMgmt; - -int32_t tqPushMgrInit(); -void tqPushMgrCleanUp(); - -STqPushMgr* tqPushMgrOpen(); -void tqPushMgrClose(STqPushMgr* pushMgr); - -STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl); -STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet); - -#ifdef __cplusplus -} -#endif - -#endif /*_TQ_PUSH_H_*/ diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2242f6803f..6e1f00f931 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -16,21 +16,23 @@ #ifndef _TD_VNODE_DEF_H_ #define _TD_VNODE_DEF_H_ -#include "tmallocator.h" +#include "executor.h" +#include "tchecksum.h" #include "tcoding.h" +#include "tcompression.h" #include "tdatablock.h" #include "tfs.h" +#include "tglobal.h" #include "tlist.h" #include "tlockfree.h" #include "tmacro.h" -#include "vnode.h" -#include "vnodeQuery.h" -#include "wal.h" +#include "tmallocator.h" #include "tskiplist.h" -#include "tchecksum.h" -#include "tglobal.h" #include "ttime.h" -#include "tcompression.h" +#include "ttimer.h" +#include "vnode.h" +#include "wal.h" +#include "qworker.h" #ifdef __cplusplus extern "C" { @@ -38,8 +40,9 @@ extern "C" { typedef struct STQ STQ; -typedef struct SVState SVState; -typedef struct SVBufPool SVBufPool; +typedef struct SVState SVState; +typedef struct SVBufPool SVBufPool; +typedef struct SQWorkerMgmt SQHandle; typedef struct SVnodeTask { TD_DLIST_NODE(SVnodeTask); @@ -99,7 +102,9 @@ struct SVnode { STfs* pTfs; }; -int vnodeScheduleTask(SVnodeTask* task); +int vnodeScheduleTask(SVnodeTask* task); +int vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryClose(SVnode* pVnode); #define vFatal(...) \ do { \ diff --git a/source/dnode/vnode/src/inc/vnodeQuery.h b/source/dnode/vnode/src/inc/vnodeQuery.h deleted file mode 100644 index 7816b4eb46..0000000000 --- a/source/dnode/vnode/src/inc/vnodeQuery.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_VNODE_READ_H_ -#define _TD_VNODE_READ_H_ - -#ifdef __cplusplus -extern "C" { -#endif -#include "qworker.h" -#include "vnode.h" - - -typedef struct SQWorkerMgmt SQHandle; - -int vnodeQueryOpen(SVnode *pVnode); -void vnodeQueryClose(SVnode *pVnode); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_VNODE_READ_H_*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 70d6dba36f..28d71fb842 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -15,9 +15,8 @@ #include "tcompare.h" #include "tdatablock.h" -#include "tqInt.h" -#include "tqMetaStore.h" #include "tstream.h" +#include "vnodeInt.h" int32_t tqInit() { return tqPushMgrInit(); } @@ -272,7 +271,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, + pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); SMqPollRsp rsp = { /*.consumerId = consumerId,*/ @@ -296,10 +296,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } STqTopic* pTopic = NULL; - int sz = taosArrayGetSize(pConsumer->topics); + int sz = taosArrayGetSize(pConsumer->topics); for (int32_t i = 0; i < sz; i++) { STqTopic* topic = taosArrayGet(pConsumer->topics, i); - //TODO race condition + // TODO race condition ASSERT(pConsumer->consumerId == consumerId); if (strcmp(topic->topicName, pReq->topic) == 0) { pTopic = topic; @@ -307,7 +307,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } } if (pTopic == NULL) { - vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, pTq->pVnode->vgId); + vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, + pTq->pVnode->vgId); pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; @@ -322,10 +323,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { while (1) { /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ - //TODO + // TODO consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > pReq->epoch) { - //TODO: return + // TODO: return break; } SWalReadHead* pHead; @@ -333,10 +334,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + pTq->pVnode->vgId, fetchOffset); break; } - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, + pTq->pVnode->vgId, fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ if (pHead->msgType == TDMT_VND_SUBMIT) { @@ -360,7 +363,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (taosArrayGetSize(pRes) == 0) { - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset); + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, + pReq->epoch, pTq->pVnode->vgId, fetchOffset); fetchOffset++; rsp.skipLogNum++; taosArrayDestroy(pRes); @@ -389,7 +393,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch); + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); return 0; @@ -420,7 +425,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch); + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + pReq->epoch); /*}*/ return 0; @@ -431,7 +437,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) { terrno = TSDB_CODE_SUCCESS; tDecodeSMqMVRebReq(msg, &req); - vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId); + vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId); STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId); ASSERT(pConsumer); ASSERT(pConsumer->consumerId == req.oldConsumerId); diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c index 505687755d..beb19f48f1 100644 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ b/source/dnode/vnode/src/tq/tqMetaStore.c @@ -12,7 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "tqMetaStore.h" +#include "vnodeInt.h" // TODO:replace by an abstract file layer #include #include diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 20270950cd..3cff87340d 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "tqOffset.h" +#include "vnodeInt.h" enum ETqOffsetPersist { TQ_OFFSET_PERSIST__LAZY = 1, @@ -39,4 +39,3 @@ STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) { pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK); return pStore; } - diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 7851c071c3..4384255f89 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tqPush.h" +#include "vnodeInt.h" int32_t tqPushMgrInit() { // diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 0590bd4321..f56ded9f15 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "vnodeQuery.h" #include "executor.h" #include "vnodeInt.h"