diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h new file mode 100644 index 0000000000..344ad992f0 --- /dev/null +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -0,0 +1,216 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_TQ_INT_H_ +#define _TD_TQ_INT_H_ + +#include "meta.h" +#include "tlog.h" +#include "tq.h" +#include "trpc.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern int32_t tqDebugFlag; + +#define tqFatal(...) \ + { \ + if (tqDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("TQ FATAL ", 255, __VA_ARGS__); \ + } \ + } +#define tqError(...) \ + { \ + if (tqDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("TQ ERROR ", 255, __VA_ARGS__); \ + } \ + } +#define tqWarn(...) \ + { \ + if (tqDebugFlag & DEBUG_WARN) { \ + taosPrintLog("TQ WARN ", 255, __VA_ARGS__); \ + } \ + } +#define tqInfo(...) \ + { \ + if (tqDebugFlag & DEBUG_INFO) { \ + taosPrintLog("TQ ", 255, __VA_ARGS__); \ + } \ + } +#define tqDebug(...) \ + { \ + if (tqDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \ + } \ + } +#define tqTrace(...) \ + { \ + if (tqDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("TQ ", tqDebugFlag, __VA_ARGS__); \ + } \ + } + +#define TQ_BUFFER_SIZE 8 + +#define TQ_BUCKET_MASK 0xFF +#define TQ_BUCKET_SIZE 256 + +#define TQ_PAGE_SIZE 4096 +// key + offset + size +#define TQ_IDX_SIZE 24 +// 4096 / 24 +#define TQ_MAX_IDX_ONE_PAGE 170 +// 24 * 170 +#define TQ_IDX_PAGE_BODY_SIZE 4080 +// 4096 - 4080 +#define TQ_IDX_PAGE_HEAD_SIZE 16 + +#define TQ_ACTION_CONST 0 +#define TQ_ACTION_INUSE 1 +#define TQ_ACTION_INUSE_CONT 2 +#define TQ_ACTION_INTXN 3 + +#define TQ_SVER 0 + +// TODO: inplace mode is not implemented +#define TQ_UPDATE_INPLACE 0 +#define TQ_UPDATE_APPEND 1 + +#define TQ_DUP_INTXN_REWRITE 0 +#define TQ_DUP_INTXN_REJECT 2 + +static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; } + +static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; } + +static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; + +#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE + +typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; + +typedef struct { + int16_t ver; + int16_t action; + int32_t checksum; + int64_t ssize; + char content[]; +} STqSerializedHead; + +typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead); +typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj); +typedef void (*FTqDelete)(void*); + +typedef struct { + int64_t key; + int64_t offset; + int64_t serializedSize; + void* valueInUse; + void* valueInTxn; +} STqMetaHandle; + +typedef struct STqMetaList { + STqMetaHandle handle; + struct STqMetaList* next; + // struct STqMetaList* inTxnPrev; + // struct STqMetaList* inTxnNext; + struct STqMetaList* unpersistPrev; + struct STqMetaList* unpersistNext; +} STqMetaList; + +typedef struct { + STQ* pTq; + STqMetaList* bucket[TQ_BUCKET_SIZE]; + // a table head + STqMetaList* unpersistHead; + // topics that are not connectted + STqMetaList* unconnectTopic; + + // TODO:temporaral use, to be replaced by unified tfile + int fileFd; + // TODO:temporaral use, to be replaced by unified tfile + int idxFd; + + char* dirPath; + int32_t tqConfigFlag; + FTqSerialize pSerializer; + FTqDeserialize pDeserializer; + FTqDelete pDeleter; +} STqMetaStore; + +struct STQ { + // the collection of groups + // the handle of meta kvstore + char* path; + STqCfg* tqConfig; + STqMemRef tqMemRef; + STqMetaStore* tqMeta; + SWal* pWal; + SMeta* pMeta; +}; + +typedef struct { + int8_t inited; + tmr_h timer; +} STqMgmt; + +static STqMgmt tqMgmt; + +typedef struct { + int8_t status; + int64_t offset; + qTaskInfo_t task; + STqReadHandle* pReadHandle; +} STqTaskItem; + +// new version +typedef struct { + int64_t firstOffset; + int64_t lastOffset; + STqTaskItem output[TQ_BUFFER_SIZE]; +} STqBuffer; + +typedef struct { + char topicName[TSDB_TOPIC_FNAME_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + char* qmsg; + int64_t persistedOffset; + int64_t committedOffset; + int64_t currentOffset; + STqBuffer buffer; + SWalReadHandle* pReadhandle; +} STqTopic; + +typedef struct { + int64_t consumerId; + int64_t epoch; + char cgroup[TSDB_TOPIC_FNAME_LEN]; + SArray* topics; // SArray +} STqConsumer; + +int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); +int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); + +static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_TQ_INT_H_*/