refact more
This commit is contained in:
parent
94bcbf6bac
commit
742b2918ff
|
@ -20,6 +20,281 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
// tqInt.h
|
||||
#define tqFatal(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_FATAL) { \
|
||||
taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqError(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_ERROR) { \
|
||||
taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqWarn(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_WARN) { \
|
||||
taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqInfo(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_INFO) { \
|
||||
taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqDebug(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqTrace(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define TQ_BUFFER_SIZE 4
|
||||
|
||||
#define TQ_BUCKET_MASK 0xFF
|
||||
#define TQ_BUCKET_SIZE 256
|
||||
|
||||
#define TQ_PAGE_SIZE 4096
|
||||
// key + offset + size
|
||||
#define TQ_IDX_SIZE 24
|
||||
// 4096 / 24
|
||||
#define TQ_MAX_IDX_ONE_PAGE 170
|
||||
// 24 * 170
|
||||
#define TQ_IDX_PAGE_BODY_SIZE 4080
|
||||
// 4096 - 4080
|
||||
#define TQ_IDX_PAGE_HEAD_SIZE 16
|
||||
|
||||
#define TQ_ACTION_CONST 0
|
||||
#define TQ_ACTION_INUSE 1
|
||||
#define TQ_ACTION_INUSE_CONT 2
|
||||
#define TQ_ACTION_INTXN 3
|
||||
|
||||
#define TQ_SVER 0
|
||||
|
||||
// TODO: inplace mode is not implemented
|
||||
#define TQ_UPDATE_INPLACE 0
|
||||
#define TQ_UPDATE_APPEND 1
|
||||
|
||||
#define TQ_DUP_INTXN_REWRITE 0
|
||||
#define TQ_DUP_INTXN_REJECT 2
|
||||
|
||||
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
|
||||
|
||||
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
|
||||
|
||||
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
|
||||
|
||||
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
|
||||
|
||||
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
|
||||
|
||||
typedef struct STqOffsetCfg STqOffsetCfg;
|
||||
typedef struct STqOffsetStore STqOffsetStore;
|
||||
|
||||
typedef struct {
|
||||
int16_t ver;
|
||||
int16_t action;
|
||||
int32_t checksum;
|
||||
int64_t ssize;
|
||||
char content[];
|
||||
} STqSerializedHead;
|
||||
|
||||
typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
|
||||
typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj);
|
||||
typedef void (*FTqDelete)(void*);
|
||||
|
||||
typedef struct {
|
||||
int64_t key;
|
||||
int64_t offset;
|
||||
int64_t serializedSize;
|
||||
void* valueInUse;
|
||||
void* valueInTxn;
|
||||
} STqMetaHandle;
|
||||
|
||||
typedef struct STqMetaList {
|
||||
STqMetaHandle handle;
|
||||
struct STqMetaList* next;
|
||||
// struct STqMetaList* inTxnPrev;
|
||||
// struct STqMetaList* inTxnNext;
|
||||
struct STqMetaList* unpersistPrev;
|
||||
struct STqMetaList* unpersistNext;
|
||||
} STqMetaList;
|
||||
|
||||
typedef struct {
|
||||
STQ* pTq;
|
||||
STqMetaList* bucket[TQ_BUCKET_SIZE];
|
||||
// a table head
|
||||
STqMetaList* unpersistHead;
|
||||
// topics that are not connectted
|
||||
STqMetaList* unconnectTopic;
|
||||
|
||||
TdFilePtr pFile;
|
||||
TdFilePtr pIdxFile;
|
||||
|
||||
char* dirPath;
|
||||
int32_t tqConfigFlag;
|
||||
FTqSerialize pSerializer;
|
||||
FTqDeserialize pDeserializer;
|
||||
FTqDelete pDeleter;
|
||||
} STqMetaStore;
|
||||
|
||||
typedef struct {
|
||||
SMemAllocatorFactory* pAllocatorFactory;
|
||||
SMemAllocator* pAllocator;
|
||||
} STqMemRef;
|
||||
|
||||
struct STQ {
|
||||
// the collection of groups
|
||||
// the handle of meta kvstore
|
||||
bool writeTrigger;
|
||||
char* path;
|
||||
STqCfg* tqConfig;
|
||||
STqMemRef tqMemRef;
|
||||
STqMetaStore* tqMeta;
|
||||
// STqPushMgr* tqPushMgr;
|
||||
SHashObj* pStreamTasks;
|
||||
SVnode* pVnode;
|
||||
SWal* pWal;
|
||||
SMeta* pVnodeMeta;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
} STqMgmt;
|
||||
|
||||
static STqMgmt tqMgmt;
|
||||
|
||||
typedef struct {
|
||||
int8_t status;
|
||||
int64_t offset;
|
||||
qTaskInfo_t task;
|
||||
STqReadHandle* pReadHandle;
|
||||
} STqTaskItem;
|
||||
|
||||
// new version
|
||||
typedef struct {
|
||||
int64_t firstOffset;
|
||||
int64_t lastOffset;
|
||||
STqTaskItem output[TQ_BUFFER_SIZE];
|
||||
} STqBuffer;
|
||||
|
||||
typedef struct {
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
char* qmsg;
|
||||
STqBuffer buffer;
|
||||
SWalReadHandle* pReadhandle;
|
||||
} STqTopic;
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* topics; // SArray<STqTopic>
|
||||
} STqConsumer;
|
||||
|
||||
enum {
|
||||
TQ_PUSHER_TYPE__CLIENT = 1,
|
||||
TQ_PUSHER_TYPE__STREAM,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
int8_t reserved[3];
|
||||
int32_t ttl;
|
||||
int64_t consumerId;
|
||||
SRpcMsg* pMsg;
|
||||
// SMqPollRsp* rsp;
|
||||
} STqClientPusher;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
int8_t nodeType;
|
||||
int8_t reserved[6];
|
||||
int64_t streamId;
|
||||
qTaskInfo_t task;
|
||||
// TODO sync function
|
||||
} STqStreamPusher;
|
||||
|
||||
typedef struct {
|
||||
int8_t type; // mq or stream
|
||||
} STqPusher;
|
||||
|
||||
typedef struct {
|
||||
SHashObj* pHash; // <id, STqPush*>
|
||||
} STqPushMgr;
|
||||
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
} STqPushMgmt;
|
||||
|
||||
static STqPushMgmt tqPushMgmt;
|
||||
|
||||
|
||||
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
|
||||
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
|
||||
|
||||
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
||||
|
||||
// tqMetaStore.h
|
||||
STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer,
|
||||
FTqDelete pDeleter, int32_t tqConfigFlag);
|
||||
int32_t tqStoreClose(STqMetaStore*);
|
||||
// int32_t tqStoreDelete(TqMetaStore*);
|
||||
// int32_t tqStoreCommitAll(TqMetaStore*);
|
||||
int32_t tqStorePersist(STqMetaStore*);
|
||||
// clean deleted idx and data from persistent file
|
||||
int32_t tqStoreCompact(STqMetaStore*);
|
||||
|
||||
void* tqHandleGet(STqMetaStore*, int64_t key);
|
||||
// make it unpersist
|
||||
void* tqHandleTouchGet(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value);
|
||||
int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize);
|
||||
// delete committed kv pair
|
||||
// notice that a delete action still needs to be committed
|
||||
int32_t tqHandleDel(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandlePurge(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandleCommit(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandleAbort(STqMetaStore*, int64_t key);
|
||||
|
||||
// tqOffset
|
||||
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
|
||||
void STqOffsetClose(STqOffsetStore*);
|
||||
|
||||
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
|
||||
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
|
||||
|
||||
// tqPush
|
||||
int32_t tqPushMgrInit();
|
||||
void tqPushMgrCleanUp();
|
||||
|
||||
STqPushMgr* tqPushMgrOpen();
|
||||
void tqPushMgrClose(STqPushMgr* pushMgr);
|
||||
|
||||
STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl);
|
||||
STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1,223 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TQ_INT_H_
|
||||
#define _TD_TQ_INT_H_
|
||||
|
||||
#include "vnode.h"
|
||||
#include "tlog.h"
|
||||
#include "tqPush.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define tqFatal(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_FATAL) { \
|
||||
taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqError(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_ERROR) { \
|
||||
taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqWarn(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_WARN) { \
|
||||
taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqInfo(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_INFO) { \
|
||||
taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqDebug(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_DEBUG) { \
|
||||
taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define tqTrace(...) \
|
||||
{ \
|
||||
if (tqDebugFlag & DEBUG_TRACE) { \
|
||||
taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \
|
||||
} \
|
||||
}
|
||||
|
||||
#define TQ_BUFFER_SIZE 4
|
||||
|
||||
#define TQ_BUCKET_MASK 0xFF
|
||||
#define TQ_BUCKET_SIZE 256
|
||||
|
||||
#define TQ_PAGE_SIZE 4096
|
||||
// key + offset + size
|
||||
#define TQ_IDX_SIZE 24
|
||||
// 4096 / 24
|
||||
#define TQ_MAX_IDX_ONE_PAGE 170
|
||||
// 24 * 170
|
||||
#define TQ_IDX_PAGE_BODY_SIZE 4080
|
||||
// 4096 - 4080
|
||||
#define TQ_IDX_PAGE_HEAD_SIZE 16
|
||||
|
||||
#define TQ_ACTION_CONST 0
|
||||
#define TQ_ACTION_INUSE 1
|
||||
#define TQ_ACTION_INUSE_CONT 2
|
||||
#define TQ_ACTION_INTXN 3
|
||||
|
||||
#define TQ_SVER 0
|
||||
|
||||
// TODO: inplace mode is not implemented
|
||||
#define TQ_UPDATE_INPLACE 0
|
||||
#define TQ_UPDATE_APPEND 1
|
||||
|
||||
#define TQ_DUP_INTXN_REWRITE 0
|
||||
#define TQ_DUP_INTXN_REJECT 2
|
||||
|
||||
static inline bool tqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
|
||||
|
||||
static inline bool tqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
|
||||
|
||||
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
|
||||
|
||||
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
|
||||
|
||||
typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
|
||||
|
||||
typedef struct {
|
||||
int16_t ver;
|
||||
int16_t action;
|
||||
int32_t checksum;
|
||||
int64_t ssize;
|
||||
char content[];
|
||||
} STqSerializedHead;
|
||||
|
||||
typedef int32_t (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
|
||||
typedef int32_t (*FTqDeserialize)(void* self, const STqSerializedHead* pHead, void** ppObj);
|
||||
typedef void (*FTqDelete)(void*);
|
||||
|
||||
typedef struct {
|
||||
int64_t key;
|
||||
int64_t offset;
|
||||
int64_t serializedSize;
|
||||
void* valueInUse;
|
||||
void* valueInTxn;
|
||||
} STqMetaHandle;
|
||||
|
||||
typedef struct STqMetaList {
|
||||
STqMetaHandle handle;
|
||||
struct STqMetaList* next;
|
||||
// struct STqMetaList* inTxnPrev;
|
||||
// struct STqMetaList* inTxnNext;
|
||||
struct STqMetaList* unpersistPrev;
|
||||
struct STqMetaList* unpersistNext;
|
||||
} STqMetaList;
|
||||
|
||||
typedef struct {
|
||||
STQ* pTq;
|
||||
STqMetaList* bucket[TQ_BUCKET_SIZE];
|
||||
// a table head
|
||||
STqMetaList* unpersistHead;
|
||||
// topics that are not connectted
|
||||
STqMetaList* unconnectTopic;
|
||||
|
||||
TdFilePtr pFile;
|
||||
TdFilePtr pIdxFile;
|
||||
|
||||
char* dirPath;
|
||||
int32_t tqConfigFlag;
|
||||
FTqSerialize pSerializer;
|
||||
FTqDeserialize pDeserializer;
|
||||
FTqDelete pDeleter;
|
||||
} STqMetaStore;
|
||||
|
||||
typedef struct {
|
||||
SMemAllocatorFactory* pAllocatorFactory;
|
||||
SMemAllocator* pAllocator;
|
||||
} STqMemRef;
|
||||
|
||||
struct STQ {
|
||||
// the collection of groups
|
||||
// the handle of meta kvstore
|
||||
bool writeTrigger;
|
||||
char* path;
|
||||
STqCfg* tqConfig;
|
||||
STqMemRef tqMemRef;
|
||||
STqMetaStore* tqMeta;
|
||||
// STqPushMgr* tqPushMgr;
|
||||
SHashObj* pStreamTasks;
|
||||
SVnode* pVnode;
|
||||
SWal* pWal;
|
||||
SMeta* pVnodeMeta;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
} STqMgmt;
|
||||
|
||||
static STqMgmt tqMgmt;
|
||||
|
||||
typedef struct {
|
||||
int8_t status;
|
||||
int64_t offset;
|
||||
qTaskInfo_t task;
|
||||
STqReadHandle* pReadHandle;
|
||||
} STqTaskItem;
|
||||
|
||||
// new version
|
||||
typedef struct {
|
||||
int64_t firstOffset;
|
||||
int64_t lastOffset;
|
||||
STqTaskItem output[TQ_BUFFER_SIZE];
|
||||
} STqBuffer;
|
||||
|
||||
typedef struct {
|
||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||
char* sql;
|
||||
char* logicalPlan;
|
||||
char* physicalPlan;
|
||||
char* qmsg;
|
||||
STqBuffer buffer;
|
||||
SWalReadHandle* pReadhandle;
|
||||
} STqTopic;
|
||||
|
||||
typedef struct {
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
char cgroup[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* topics; // SArray<STqTopic>
|
||||
} STqConsumer;
|
||||
|
||||
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
|
||||
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
|
||||
|
||||
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_TQ_INT_H_*/
|
|
@ -1,51 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TQ_META_STORE_H_
|
||||
#define _TQ_META_STORE_H_
|
||||
|
||||
#include "os.h"
|
||||
#include "tqInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize pSerializer, FTqDeserialize pDeserializer,
|
||||
FTqDelete pDeleter, int32_t tqConfigFlag);
|
||||
int32_t tqStoreClose(STqMetaStore*);
|
||||
// int32_t tqStoreDelete(TqMetaStore*);
|
||||
// int32_t tqStoreCommitAll(TqMetaStore*);
|
||||
int32_t tqStorePersist(STqMetaStore*);
|
||||
// clean deleted idx and data from persistent file
|
||||
int32_t tqStoreCompact(STqMetaStore*);
|
||||
|
||||
void* tqHandleGet(STqMetaStore*, int64_t key);
|
||||
// make it unpersist
|
||||
void* tqHandleTouchGet(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandleMovePut(STqMetaStore*, int64_t key, void* value);
|
||||
int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize);
|
||||
// delete committed kv pair
|
||||
// notice that a delete action still needs to be committed
|
||||
int32_t tqHandleDel(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandlePurge(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandleCommit(STqMetaStore*, int64_t key);
|
||||
int32_t tqHandleAbort(STqMetaStore*, int64_t key);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* ifndef _TQ_META_STORE_H_ */
|
|
@ -1,40 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TQ_OFFSET_H_
|
||||
#define _TD_TQ_OFFSET_H_
|
||||
|
||||
#include "tqInt.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
typedef struct STqOffsetCfg STqOffsetCfg;
|
||||
typedef struct STqOffsetStore STqOffsetStore;
|
||||
|
||||
STqOffsetStore* STqOffsetOpen(STqOffsetCfg*);
|
||||
void STqOffsetClose(STqOffsetStore*);
|
||||
|
||||
int64_t tqOffsetFetch(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetCommit(STqOffsetStore* pStore, const char* subscribeKey, int64_t offset);
|
||||
int32_t tqOffsetPersist(STqOffsetStore* pStore, const char* subscribeKey);
|
||||
int32_t tqOffsetPersistAll(STqOffsetStore* pStore);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_TQ_OFFSET_H_*/
|
|
@ -1,80 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TQ_PUSH_H_
|
||||
#define _TQ_PUSH_H_
|
||||
|
||||
#include "executor.h"
|
||||
#include "thash.h"
|
||||
#include "trpc.h"
|
||||
#include "ttimer.h"
|
||||
#include "vnode.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
enum {
|
||||
TQ_PUSHER_TYPE__CLIENT = 1,
|
||||
TQ_PUSHER_TYPE__STREAM,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
int8_t reserved[3];
|
||||
int32_t ttl;
|
||||
int64_t consumerId;
|
||||
SRpcMsg* pMsg;
|
||||
// SMqPollRsp* rsp;
|
||||
} STqClientPusher;
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
int8_t nodeType;
|
||||
int8_t reserved[6];
|
||||
int64_t streamId;
|
||||
qTaskInfo_t task;
|
||||
// TODO sync function
|
||||
} STqStreamPusher;
|
||||
|
||||
typedef struct {
|
||||
int8_t type; // mq or stream
|
||||
} STqPusher;
|
||||
|
||||
typedef struct {
|
||||
SHashObj* pHash; // <id, STqPush*>
|
||||
} STqPushMgr;
|
||||
|
||||
typedef struct {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
} STqPushMgmt;
|
||||
|
||||
static STqPushMgmt tqPushMgmt;
|
||||
|
||||
int32_t tqPushMgrInit();
|
||||
void tqPushMgrCleanUp();
|
||||
|
||||
STqPushMgr* tqPushMgrOpen();
|
||||
void tqPushMgrClose(STqPushMgr* pushMgr);
|
||||
|
||||
STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl);
|
||||
STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TQ_PUSH_H_*/
|
|
@ -16,21 +16,23 @@
|
|||
#ifndef _TD_VNODE_DEF_H_
|
||||
#define _TD_VNODE_DEF_H_
|
||||
|
||||
#include "tmallocator.h"
|
||||
#include "executor.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcompression.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tfs.h"
|
||||
#include "tglobal.h"
|
||||
#include "tlist.h"
|
||||
#include "tlockfree.h"
|
||||
#include "tmacro.h"
|
||||
#include "vnode.h"
|
||||
#include "vnodeQuery.h"
|
||||
#include "wal.h"
|
||||
#include "tmallocator.h"
|
||||
#include "tskiplist.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tglobal.h"
|
||||
#include "ttime.h"
|
||||
#include "tcompression.h"
|
||||
#include "ttimer.h"
|
||||
#include "vnode.h"
|
||||
#include "wal.h"
|
||||
#include "qworker.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -40,6 +42,7 @@ typedef struct STQ STQ;
|
|||
|
||||
typedef struct SVState SVState;
|
||||
typedef struct SVBufPool SVBufPool;
|
||||
typedef struct SQWorkerMgmt SQHandle;
|
||||
|
||||
typedef struct SVnodeTask {
|
||||
TD_DLIST_NODE(SVnodeTask);
|
||||
|
@ -100,6 +103,8 @@ struct SVnode {
|
|||
};
|
||||
|
||||
int vnodeScheduleTask(SVnodeTask* task);
|
||||
int vnodeQueryOpen(SVnode* pVnode);
|
||||
void vnodeQueryClose(SVnode* pVnode);
|
||||
|
||||
#define vFatal(...) \
|
||||
do { \
|
||||
|
|
|
@ -1,35 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_VNODE_READ_H_
|
||||
#define _TD_VNODE_READ_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
#include "qworker.h"
|
||||
#include "vnode.h"
|
||||
|
||||
|
||||
typedef struct SQWorkerMgmt SQHandle;
|
||||
|
||||
int vnodeQueryOpen(SVnode *pVnode);
|
||||
void vnodeQueryClose(SVnode *pVnode);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_VNODE_READ_H_*/
|
|
@ -15,9 +15,8 @@
|
|||
|
||||
#include "tcompare.h"
|
||||
#include "tdatablock.h"
|
||||
#include "tqInt.h"
|
||||
#include "tqMetaStore.h"
|
||||
#include "tstream.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
int32_t tqInit() { return tqPushMgrInit(); }
|
||||
|
||||
|
@ -272,7 +271,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
fetchOffset = pReq->currentOffset + 1;
|
||||
}
|
||||
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, pReq->currentOffset, fetchOffset);
|
||||
|
||||
SMqPollRsp rsp = {
|
||||
/*.consumerId = consumerId,*/
|
||||
|
@ -299,7 +299,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
int sz = taosArrayGetSize(pConsumer->topics);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
STqTopic* topic = taosArrayGet(pConsumer->topics, i);
|
||||
//TODO race condition
|
||||
// TODO race condition
|
||||
ASSERT(pConsumer->consumerId == consumerId);
|
||||
if (strcmp(topic->topicName, pReq->topic) == 0) {
|
||||
pTopic = topic;
|
||||
|
@ -307,7 +307,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
}
|
||||
if (pTopic == NULL) {
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, pTq->pVnode->vgId);
|
||||
vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic,
|
||||
pTq->pVnode->vgId);
|
||||
pMsg->pCont = NULL;
|
||||
pMsg->contLen = 0;
|
||||
pMsg->code = -1;
|
||||
|
@ -322,10 +323,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
|
||||
while (1) {
|
||||
/*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/
|
||||
//TODO
|
||||
// TODO
|
||||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||
if (consumerEpoch > pReq->epoch) {
|
||||
//TODO: return
|
||||
// TODO: return
|
||||
break;
|
||||
}
|
||||
SWalReadHead* pHead;
|
||||
|
@ -333,10 +334,12 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
// TODO: no more log, set timer to wait blocking time
|
||||
// if data inserted during waiting, launch query and
|
||||
// response to user
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, fetchOffset);
|
||||
break;
|
||||
}
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, pHead->msgType);
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
||||
pTq->pVnode->vgId, fetchOffset, pHead->msgType);
|
||||
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
||||
/*pHead = pTopic->pReadhandle->pHead;*/
|
||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||
|
@ -360,7 +363,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
}
|
||||
|
||||
if (taosArrayGetSize(pRes) == 0) {
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
|
||||
pReq->epoch, pTq->pVnode->vgId, fetchOffset);
|
||||
fetchOffset++;
|
||||
rsp.skipLogNum++;
|
||||
taosArrayDestroy(pRes);
|
||||
|
@ -389,7 +393,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, pHead->msgType, consumerId, pReq->epoch);
|
||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset,
|
||||
pHead->msgType, consumerId, pReq->epoch);
|
||||
tmsgSendRsp(pMsg);
|
||||
taosMemoryFree(pHead);
|
||||
return 0;
|
||||
|
@ -420,7 +425,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
|||
pMsg->contLen = tlen;
|
||||
pMsg->code = 0;
|
||||
tmsgSendRsp(pMsg);
|
||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, pReq->epoch);
|
||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId,
|
||||
pReq->epoch);
|
||||
/*}*/
|
||||
|
||||
return 0;
|
||||
|
@ -431,7 +437,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
|
|||
terrno = TSDB_CODE_SUCCESS;
|
||||
tDecodeSMqMVRebReq(msg, &req);
|
||||
|
||||
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId ,req.newConsumerId);
|
||||
vDebug("vg %d set from consumer %ld to consumer %ld", req.vgId, req.oldConsumerId, req.newConsumerId);
|
||||
STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, req.oldConsumerId);
|
||||
ASSERT(pConsumer);
|
||||
ASSERT(pConsumer->consumerId == req.oldConsumerId);
|
||||
|
|
|
@ -12,7 +12,7 @@
|
|||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
#include "tqMetaStore.h"
|
||||
#include "vnodeInt.h"
|
||||
// TODO:replace by an abstract file layer
|
||||
#include <fcntl.h>
|
||||
#include <string.h>
|
||||
|
|
|
@ -14,7 +14,7 @@
|
|||
*/
|
||||
#define _DEFAULT_SOURCE
|
||||
|
||||
#include "tqOffset.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
enum ETqOffsetPersist {
|
||||
TQ_OFFSET_PERSIST__LAZY = 1,
|
||||
|
@ -39,4 +39,3 @@ STqOffsetStore* STqOffsetOpen(STqOffsetCfg* pCfg) {
|
|||
pStore->pHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
||||
return pStore;
|
||||
}
|
||||
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tqPush.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
int32_t tqPushMgrInit() {
|
||||
//
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnodeQuery.h"
|
||||
#include "executor.h"
|
||||
#include "vnodeInt.h"
|
||||
|
||||
|
|
Loading…
Reference in New Issue