add memallocator dependency

This commit is contained in:
Liu Jicong 2021-11-16 16:03:39 +08:00
parent 6cb65f4385
commit fed05bb64b
4 changed files with 85 additions and 60 deletions

View File

@ -18,6 +18,7 @@
#include "os.h" #include "os.h"
#include "tutil.h" #include "tutil.h"
#include "mallocator.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -37,11 +38,11 @@ typedef struct TmqOneAck {
typedef struct TmqAcks { typedef struct TmqAcks {
int32_t ackNum; int32_t ackNum;
//should be sorted // should be sorted
TmqOneAck acks[]; TmqOneAck acks[];
} TmqAcks; } TmqAcks;
//TODO: put msgs into common // TODO: put msgs into common
typedef struct TmqConnectReq { typedef struct TmqConnectReq {
TmqMsgHead head; TmqMsgHead head;
TmqAcks acks; TmqAcks acks;
@ -87,47 +88,41 @@ typedef struct TmqSubscribeReq {
typedef struct tmqSubscribeRsp { typedef struct tmqSubscribeRsp {
TmqMsgHead head; TmqMsgHead head;
int64_t vgId; int64_t vgId;
char ep[TSDB_EP_LEN]; //TSDB_EP_LEN char ep[TSDB_EP_LEN]; // TSDB_EP_LEN
} TmqSubscribeRsp; } TmqSubscribeRsp;
typedef struct TmqHeartbeatReq { typedef struct TmqHeartbeatReq {
} TmqHeartbeatReq; } TmqHeartbeatReq;
typedef struct TmqHeartbeatRsp { typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp; } TmqHeartbeatRsp;
typedef struct TqTopicVhandle { typedef struct TqTopicVhandle {
int64_t topicId; int64_t topicId;
//executor for filter // executor for filter
void* filterExec; void* filterExec;
//callback for mnode // callback for mnode
//trigger when vnode list associated topic change // trigger when vnode list associated topic change
void* (*mCallback)(void*, void*); void* (*mCallback)(void*, void*);
} TqTopicVhandle; } TqTopicVhandle;
typedef struct STQ {
//the collection of group handle
//the handle of kvstore
} STQ;
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 8
//TODO: define a serializer and deserializer // TODO: define a serializer and deserializer
typedef struct TqBufferItem { typedef struct TqBufferItem {
int64_t offset; int64_t offset;
//executors are identical but not concurrent // executors are identical but not concurrent
//so it must be a copy in each item // so it must be a copy in each item
void* executor; void* executor;
int64_t size; int64_t size;
void* content; void* content;
} TqBufferItem; } TqBufferItem;
typedef struct TqBufferHandle { typedef struct TqBufferHandle {
//char* topic; //c style, end with '\0' // char* topic; //c style, end with '\0'
//int64_t cgId; // int64_t cgId;
//void* ahandle; // void* ahandle;
int64_t nextConsumeOffset; int64_t nextConsumeOffset;
int64_t topicId; int64_t topicId;
int32_t head; int32_t head;
@ -145,7 +140,7 @@ typedef struct TqGroupHandle {
int64_t cgId; int64_t cgId;
void* ahandle; void* ahandle;
int32_t topicNum; int32_t topicNum;
TqListHandle *head; TqListHandle* head;
} TqGroupHandle; } TqGroupHandle;
typedef struct TqQueryExec { typedef struct TqQueryExec {
@ -155,15 +150,36 @@ typedef struct TqQueryExec {
} TqQueryExec; } TqQueryExec;
typedef struct TqQueryMsg { typedef struct TqQueryMsg {
TqQueryExec *exec; TqQueryExec* exec;
struct TqQueryMsg *next; struct TqQueryMsg* next;
} TqQueryMsg; } TqQueryMsg;
//init in each vnode typedef struct TqLogReader {
STQ* tqInit(void* ref_func(void*), void* unref_func(void*)); void* logHandle;
void tqCleanUp(STQ*); int32_t (*walRead)(void* logHandle, void** data, int64_t ver);
int64_t (*walGetFirstVer)(void* logHandle);
int64_t (*walGetSnapshotVer)(void* logHandle);
int64_t (*walGetLastVer)(void* logHandle);
} TqLogReader;
//void* will be replace by a msg type typedef struct TqConfig {
// TODO
} TqConfig;
typedef struct STQ {
// the collection of group handle
// the handle of kvstore
const char* path;
TqConfig* tqConfig;
TqLogReader* tqLogReader;
SMemAllocatorFactory* allocFac;
} STQ;
// open in each vnode
STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac);
void tqDestroy(STQ*);
// void* will be replace by a msg type
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
@ -179,16 +195,16 @@ int tqRegisterContext(TqGroupHandle*, void* ahandle);
int tqLaunchQuery(TqGroupHandle*); int tqLaunchQuery(TqGroupHandle*);
int tqSendLaunchQuery(TqGroupHandle*); int tqSendLaunchQuery(TqGroupHandle*);
int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes); int tqSerializeGroupHandle(TqGroupHandle* gHandle, void** ppBytes);
void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr); void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr); void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr); void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *ghandle); const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle* ghandle);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle); const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem); const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
int tqGetGHandleSSize(const TqGroupHandle *gHandle); int tqGetGHandleSSize(const TqGroupHandle* gHandle);
int tqBufHandleSSize(); int tqBufHandleSSize();
int tqBufItemSSize(); int tqBufItemSSize();

View File

@ -75,9 +75,9 @@ int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum); int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
// lifecycle check // lifecycle check
int32_t walFirstVer(SWal *); int64_t walGetFirstVer(SWal *);
int32_t walPersistedVer(SWal *); int64_t walGetSnapshotVer(SWal *);
int32_t walLastVer(SWal *); int64_t walGetLastVer(SWal *);
// int32_t walDataCorrupted(SWal*); // int32_t walDataCorrupted(SWal*);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -17,7 +17,6 @@
#define _TQ_META_STORE_H_ #define _TQ_META_STORE_H_
#include "os.h" #include "os.h"
#include "tq.h"
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -27,10 +27,20 @@ void walClose(SWal *pWal) {}
void walFsync(SWal *pWal, bool force) {} void walFsync(SWal *pWal, bool force) {}
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {} int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {
return 0;
}
int32_t walCommit(SWal *pWal, int64_t ver) { return 0; } int32_t walCommit(SWal *pWal, int64_t ver) { return 0; }
int32_t walRollback(SWal *pWal, int64_t ver) { return 0; } int32_t walRollback(SWal *pWal, int64_t ver) { return 0; }
int32_t walPrune(SWal *pWal, int64_t ver) { return 0; } int32_t walPrune(SWal *pWal, int64_t ver) { return 0; }
int32_t walRead(SWal *, SWalHead **, int64_t ver);
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
int64_t walGetFirstVer(SWal *);
int64_t walGetSnapshotVer(SWal *);
int64_t walGetLastVer(SWal *);