add walhandle for integration
This commit is contained in:
parent
fed05bb64b
commit
759b077ecf
|
@ -109,11 +109,10 @@ typedef struct TqTopicVhandle {
|
||||||
|
|
||||||
#define TQ_BUFFER_SIZE 8
|
#define TQ_BUFFER_SIZE 8
|
||||||
|
|
||||||
// TODO: define a serializer and deserializer
|
|
||||||
typedef struct TqBufferItem {
|
typedef struct TqBufferItem {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
// executors are identical but not concurrent
|
// executors are identical but not concurrent
|
||||||
// so it must be a copy in each item
|
// so there must be a copy in each item
|
||||||
void* executor;
|
void* executor;
|
||||||
int64_t size;
|
int64_t size;
|
||||||
void* content;
|
void* content;
|
||||||
|
@ -156,23 +155,111 @@ typedef struct TqQueryMsg {
|
||||||
|
|
||||||
typedef struct TqLogReader {
|
typedef struct TqLogReader {
|
||||||
void* logHandle;
|
void* logHandle;
|
||||||
int32_t (*walRead)(void* logHandle, void** data, int64_t ver);
|
int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
|
||||||
int64_t (*walGetFirstVer)(void* logHandle);
|
int64_t (*logGetFirstVer)(void* logHandle);
|
||||||
int64_t (*walGetSnapshotVer)(void* logHandle);
|
int64_t (*logGetSnapshotVer)(void* logHandle);
|
||||||
int64_t (*walGetLastVer)(void* logHandle);
|
int64_t (*logGetLastVer)(void* logHandle);
|
||||||
} TqLogReader;
|
} TqLogReader;
|
||||||
|
|
||||||
typedef struct TqConfig {
|
typedef struct TqConfig {
|
||||||
// TODO
|
// TODO
|
||||||
} TqConfig;
|
} TqConfig;
|
||||||
|
|
||||||
|
typedef struct TqMemRef {
|
||||||
|
SMemAllocatorFactory *pAlloctorFactory;
|
||||||
|
SMemAllocator *pAllocator;
|
||||||
|
} TqMemRef;
|
||||||
|
|
||||||
|
typedef struct TqSerializedHead {
|
||||||
|
int16_t ver;
|
||||||
|
int16_t action;
|
||||||
|
int32_t checksum;
|
||||||
|
int64_t ssize;
|
||||||
|
char content[];
|
||||||
|
} TqSerializedHead;
|
||||||
|
|
||||||
|
typedef int (*TqSerializeFun)(const void* pObj, TqSerializedHead** ppHead);
|
||||||
|
typedef const void* (*TqDeserializeFun)(const TqSerializedHead* pHead, void** ppObj);
|
||||||
|
typedef void (*TqDeleteFun)(void*);
|
||||||
|
|
||||||
|
#define TQ_BUCKET_MASK 0xFF
|
||||||
|
#define TQ_BUCKET_SIZE 256
|
||||||
|
|
||||||
|
#define TQ_PAGE_SIZE 4096
|
||||||
|
//key + offset + size
|
||||||
|
#define TQ_IDX_SIZE 24
|
||||||
|
//4096 / 24
|
||||||
|
#define TQ_MAX_IDX_ONE_PAGE 170
|
||||||
|
//24 * 170
|
||||||
|
#define TQ_IDX_PAGE_BODY_SIZE 4080
|
||||||
|
//4096 - 4080
|
||||||
|
#define TQ_IDX_PAGE_HEAD_SIZE 16
|
||||||
|
|
||||||
|
#define TQ_ACTION_CONST 0
|
||||||
|
#define TQ_ACTION_INUSE 1
|
||||||
|
#define TQ_ACTION_INUSE_CONT 2
|
||||||
|
#define TQ_ACTION_INTXN 3
|
||||||
|
|
||||||
|
#define TQ_SVER 0
|
||||||
|
|
||||||
|
//TODO: inplace mode is not implemented
|
||||||
|
#define TQ_UPDATE_INPLACE 0
|
||||||
|
#define TQ_UPDATE_APPEND 1
|
||||||
|
|
||||||
|
#define TQ_DUP_INTXN_REWRITE 0
|
||||||
|
#define TQ_DUP_INTXN_REJECT 2
|
||||||
|
|
||||||
|
static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
|
||||||
|
return tqConfigFlag & TQ_UPDATE_APPEND;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
|
||||||
|
return tqConfigFlag & TQ_DUP_INTXN_REJECT;
|
||||||
|
}
|
||||||
|
|
||||||
|
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
|
||||||
|
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
|
||||||
|
|
||||||
|
typedef struct TqMetaHandle {
|
||||||
|
int64_t key;
|
||||||
|
int64_t offset;
|
||||||
|
int64_t serializedSize;
|
||||||
|
void* valueInUse;
|
||||||
|
void* valueInTxn;
|
||||||
|
} TqMetaHandle;
|
||||||
|
|
||||||
|
typedef struct TqMetaList {
|
||||||
|
TqMetaHandle handle;
|
||||||
|
struct TqMetaList* next;
|
||||||
|
//struct TqMetaList* inTxnPrev;
|
||||||
|
//struct TqMetaList* inTxnNext;
|
||||||
|
struct TqMetaList* unpersistPrev;
|
||||||
|
struct TqMetaList* unpersistNext;
|
||||||
|
} TqMetaList;
|
||||||
|
|
||||||
|
typedef struct TqMetaStore {
|
||||||
|
TqMetaList* bucket[TQ_BUCKET_SIZE];
|
||||||
|
//a table head
|
||||||
|
TqMetaList* unpersistHead;
|
||||||
|
//TODO:temporaral use, to be replaced by unified tfile
|
||||||
|
int fileFd;
|
||||||
|
//TODO:temporaral use, to be replaced by unified tfile
|
||||||
|
int idxFd;
|
||||||
|
char* dirPath;
|
||||||
|
int32_t tqConfigFlag;
|
||||||
|
TqSerializeFun pSerializer;
|
||||||
|
TqDeserializeFun pDeserializer;
|
||||||
|
TqDeleteFun pDeleter;
|
||||||
|
} TqMetaStore;
|
||||||
|
|
||||||
typedef struct STQ {
|
typedef struct STQ {
|
||||||
// the collection of group handle
|
// the collection of group handle
|
||||||
// the handle of kvstore
|
// the handle of kvstore
|
||||||
const char* path;
|
char* path;
|
||||||
TqConfig* tqConfig;
|
TqConfig* tqConfig;
|
||||||
TqLogReader* tqLogReader;
|
TqLogReader* tqLogReader;
|
||||||
SMemAllocatorFactory* allocFac;
|
TqMemRef tqMemRef;
|
||||||
|
TqMetaStore* tqMeta;
|
||||||
} STQ;
|
} STQ;
|
||||||
|
|
||||||
// open in each vnode
|
// open in each vnode
|
||||||
|
@ -187,7 +274,7 @@ int tqConsume(STQ*, TmqConsumeReq*);
|
||||||
|
|
||||||
TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
|
TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
|
||||||
|
|
||||||
int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
TqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
||||||
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
|
||||||
int tqMoveOffsetToNext(TqGroupHandle*);
|
int tqMoveOffsetToNext(TqGroupHandle*);
|
||||||
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
|
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
|
||||||
|
@ -195,18 +282,9 @@ int tqRegisterContext(TqGroupHandle*, void* ahandle);
|
||||||
int tqLaunchQuery(TqGroupHandle*);
|
int tqLaunchQuery(TqGroupHandle*);
|
||||||
int tqSendLaunchQuery(TqGroupHandle*);
|
int tqSendLaunchQuery(TqGroupHandle*);
|
||||||
|
|
||||||
int tqSerializeGroupHandle(TqGroupHandle* gHandle, void** ppBytes);
|
int tqSerializeGroupHandle(const TqGroupHandle* gHandle, TqSerializedHead** ppHead);
|
||||||
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
|
|
||||||
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
|
|
||||||
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
|
|
||||||
|
|
||||||
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle* ghandle);
|
const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle** gHandle);
|
||||||
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
|
|
||||||
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
|
|
||||||
|
|
||||||
int tqGetGHandleSSize(const TqGroupHandle* gHandle);
|
|
||||||
int tqBufHandleSSize();
|
|
||||||
int tqBufItemSSize();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,8 +44,10 @@ typedef struct {
|
||||||
EWalType walLevel; // wal level
|
EWalType walLevel; // wal level
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
struct SWal;
|
typedef struct SWal {
|
||||||
typedef struct SWal SWal; // WAL HANDLE
|
int8_t unused;
|
||||||
|
} SWal; // WAL HANDLE
|
||||||
|
|
||||||
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
|
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
|
||||||
|
|
||||||
// module initialization
|
// module initialization
|
||||||
|
|
|
@ -17,97 +17,22 @@
|
||||||
#define _TQ_META_STORE_H_
|
#define _TQ_META_STORE_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define TQ_BUCKET_MASK 0xFF
|
|
||||||
#define TQ_BUCKET_SIZE 256
|
|
||||||
|
|
||||||
#define TQ_PAGE_SIZE 4096
|
|
||||||
//key + offset + size
|
|
||||||
#define TQ_IDX_SIZE 24
|
|
||||||
//4096 / 24
|
|
||||||
#define TQ_MAX_IDX_ONE_PAGE 170
|
|
||||||
//24 * 170
|
|
||||||
#define TQ_IDX_PAGE_BODY_SIZE 4080
|
|
||||||
//4096 - 4080
|
|
||||||
#define TQ_IDX_PAGE_HEAD_SIZE 16
|
|
||||||
|
|
||||||
#define TQ_ACTION_CONST 0
|
|
||||||
#define TQ_ACTION_INUSE 1
|
|
||||||
#define TQ_ACTION_INUSE_CONT 2
|
|
||||||
#define TQ_ACTION_INTXN 3
|
|
||||||
|
|
||||||
#define TQ_SVER 0
|
|
||||||
|
|
||||||
//TODO: inplace mode is not implemented
|
|
||||||
#define TQ_UPDATE_INPLACE 0
|
|
||||||
#define TQ_UPDATE_APPEND 1
|
|
||||||
|
|
||||||
#define TQ_DUP_INTXN_REWRITE 0
|
|
||||||
#define TQ_DUP_INTXN_REJECT 2
|
|
||||||
|
|
||||||
static inline bool TqUpdateAppend(int32_t tqConfigFlag) {
|
|
||||||
return tqConfigFlag & TQ_UPDATE_APPEND;
|
|
||||||
}
|
|
||||||
|
|
||||||
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) {
|
|
||||||
return tqConfigFlag & TQ_DUP_INTXN_REJECT;
|
|
||||||
}
|
|
||||||
|
|
||||||
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
|
|
||||||
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
|
|
||||||
|
|
||||||
typedef struct TqSerializedHead {
|
|
||||||
int16_t ver;
|
|
||||||
int16_t action;
|
|
||||||
int32_t checksum;
|
|
||||||
int64_t ssize;
|
|
||||||
char content[];
|
|
||||||
} TqSerializedHead;
|
|
||||||
|
|
||||||
typedef struct TqMetaHandle {
|
|
||||||
int64_t key;
|
|
||||||
int64_t offset;
|
|
||||||
int64_t serializedSize;
|
|
||||||
void* valueInUse;
|
|
||||||
void* valueInTxn;
|
|
||||||
} TqMetaHandle;
|
|
||||||
|
|
||||||
typedef struct TqMetaList {
|
|
||||||
TqMetaHandle handle;
|
|
||||||
struct TqMetaList* next;
|
|
||||||
//struct TqMetaList* inTxnPrev;
|
|
||||||
//struct TqMetaList* inTxnNext;
|
|
||||||
struct TqMetaList* unpersistPrev;
|
|
||||||
struct TqMetaList* unpersistNext;
|
|
||||||
} TqMetaList;
|
|
||||||
|
|
||||||
typedef struct TqMetaStore {
|
|
||||||
TqMetaList* bucket[TQ_BUCKET_SIZE];
|
|
||||||
//a table head
|
|
||||||
TqMetaList* unpersistHead;
|
|
||||||
int fileFd; //TODO:temporaral use, to be replaced by unified tfile
|
|
||||||
int idxFd; //TODO:temporaral use, to be replaced by unified tfile
|
|
||||||
char* dirPath;
|
|
||||||
int32_t tqConfigFlag;
|
|
||||||
int (*serializer)(const void* pObj, TqSerializedHead** ppHead);
|
|
||||||
const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj);
|
|
||||||
void (*deleter)(void*);
|
|
||||||
} TqMetaStore;
|
|
||||||
|
|
||||||
TqMetaStore* tqStoreOpen(const char* path,
|
TqMetaStore* tqStoreOpen(const char* path,
|
||||||
int serializer(const void* pObj, TqSerializedHead** ppHead),
|
TqSerializeFun pSerializer,
|
||||||
const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
|
TqDeserializeFun pDeserializer,
|
||||||
void deleter(void* pObj),
|
TqDeleteFun pDeleter,
|
||||||
int32_t tqConfigFlag
|
int32_t tqConfigFlag
|
||||||
);
|
);
|
||||||
int32_t tqStoreClose(TqMetaStore*);
|
int32_t tqStoreClose(TqMetaStore*);
|
||||||
//int32_t tqStoreDelete(TqMetaStore*);
|
//int32_t tqStoreDelete(TqMetaStore*);
|
||||||
//int32_t TqStoreCommitAll(TqMetaStore*);
|
//int32_t tqStoreCommitAll(TqMetaStore*);
|
||||||
int32_t tqStorePersist(TqMetaStore*);
|
int32_t tqStorePersist(TqMetaStore*);
|
||||||
//clean deleted idx and data from persistent file
|
//clean deleted idx and data from persistent file
|
||||||
int32_t tqStoreCompact(TqMetaStore*);
|
int32_t tqStoreCompact(TqMetaStore*);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tqInt.h"
|
#include "tqInt.h"
|
||||||
|
#include "tqMetaStore.h"
|
||||||
|
|
||||||
//static
|
//static
|
||||||
//read next version data
|
//read next version data
|
||||||
|
@ -24,6 +25,46 @@
|
||||||
//
|
//
|
||||||
|
|
||||||
int tqGetgHandleSSize(const TqGroupHandle *gHandle);
|
int tqGetgHandleSSize(const TqGroupHandle *gHandle);
|
||||||
|
int tqBufHandleSSize();
|
||||||
|
int tqBufItemSSize();
|
||||||
|
|
||||||
|
TqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
|
||||||
|
TqGroupHandle* gHandle;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr);
|
||||||
|
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr);
|
||||||
|
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr);
|
||||||
|
|
||||||
|
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle);
|
||||||
|
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem);
|
||||||
|
|
||||||
|
STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) {
|
||||||
|
STQ* pTq = malloc(sizeof(STQ));
|
||||||
|
if(pTq == NULL) {
|
||||||
|
//TODO: memory error
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
strcpy(pTq->path, path);
|
||||||
|
pTq->tqConfig = tqConfig;
|
||||||
|
pTq->tqLogReader = tqLogReader;
|
||||||
|
pTq->tqMemRef.pAlloctorFactory = allocFac;
|
||||||
|
pTq->tqMemRef.pAllocator = allocFac->create();
|
||||||
|
if(pTq->tqMemRef.pAllocator == NULL) {
|
||||||
|
//TODO
|
||||||
|
}
|
||||||
|
pTq->tqMeta = tqStoreOpen(path,
|
||||||
|
(TqSerializeFun)tqSerializeGroupHandle,
|
||||||
|
(TqDeserializeFun)tqDeserializeGroupHandle,
|
||||||
|
free,
|
||||||
|
0);
|
||||||
|
if(pTq->tqMeta == NULL) {
|
||||||
|
//TODO: free STQ
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pTq;
|
||||||
|
}
|
||||||
|
|
||||||
static int tqProtoCheck(TmqMsgHead *pMsg) {
|
static int tqProtoCheck(TmqMsgHead *pMsg) {
|
||||||
return pMsg->protoVer == 0;
|
return pMsg->protoVer == 0;
|
||||||
|
@ -83,14 +124,29 @@ static int tqCommitTCGroup(TqGroupHandle* handle) {
|
||||||
|
|
||||||
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
|
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) {
|
||||||
//create in disk
|
//create in disk
|
||||||
|
TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle));
|
||||||
|
if(gHandle == NULL) {
|
||||||
|
//TODO
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memset(gHandle, 0, sizeof(TqGroupHandle));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
|
TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
|
||||||
//look up in disk
|
TqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
|
||||||
|
if(gHandle == NULL) {
|
||||||
|
int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle);
|
||||||
|
if(code != 0) {
|
||||||
|
//TODO
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
//create
|
//create
|
||||||
//open
|
//open
|
||||||
return 0;
|
return gHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
|
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
|
||||||
|
@ -207,16 +263,20 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) {
|
int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHead) {
|
||||||
//calculate size
|
//calculate size
|
||||||
int sz = tqGetgHandleSSize(gHandle);
|
int sz = tqGetgHandleSSize(gHandle) + sizeof(TqSerializedHead);
|
||||||
void* ptr = realloc(*ppBytes, sz);
|
if(sz > (*ppHead)->ssize) {
|
||||||
if(ptr == NULL) {
|
void* tmpPtr = realloc(*ppHead, sz);
|
||||||
free(ppBytes);
|
if(tmpPtr == NULL) {
|
||||||
|
free(*ppHead);
|
||||||
//TODO: memory err
|
//TODO: memory err
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
*ppBytes = ptr;
|
*ppHead = tmpPtr;
|
||||||
|
(*ppHead)->ssize = sz;
|
||||||
|
}
|
||||||
|
void* ptr = (*ppHead)->content;
|
||||||
//do serialization
|
//do serialization
|
||||||
*(int64_t*)ptr = gHandle->cId;
|
*(int64_t*)ptr = gHandle->cId;
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
|
@ -261,8 +321,9 @@ void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) {
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) {
|
const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle **ppGHandle) {
|
||||||
const void* ptr = pBytes;
|
TqGroupHandle *gHandle = *ppGHandle;
|
||||||
|
const void* ptr = pHead->content;
|
||||||
gHandle->cId = *(int64_t*)ptr;
|
gHandle->cId = *(int64_t*)ptr;
|
||||||
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
|
||||||
gHandle->cgId = *(int64_t*)ptr;
|
gHandle->cgId = *(int64_t*)ptr;
|
||||||
|
@ -317,15 +378,15 @@ const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) {
|
||||||
|
|
||||||
//TODO: make this a macro
|
//TODO: make this a macro
|
||||||
int tqGetgHandleSSize(const TqGroupHandle *gHandle) {
|
int tqGetgHandleSSize(const TqGroupHandle *gHandle) {
|
||||||
return sizeof(int64_t) * 2
|
return sizeof(int64_t) * 2 //cId + cgId
|
||||||
+ sizeof(int32_t)
|
+ sizeof(int32_t) //topicNum
|
||||||
+ gHandle->topicNum * tqBufHandleSSize();
|
+ gHandle->topicNum * tqBufHandleSSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
//TODO: make this a macro
|
//TODO: make this a macro
|
||||||
int tqBufHandleSSize() {
|
int tqBufHandleSSize() {
|
||||||
return sizeof(int64_t) * 2
|
return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
|
||||||
+ sizeof(int32_t) * 2
|
+ sizeof(int32_t) * 2 // head + tail
|
||||||
+ TQ_BUFFER_SIZE * tqBufItemSSize();
|
+ TQ_BUFFER_SIZE * tqBufItemSSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,9 +69,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TqMetaStore* tqStoreOpen(const char* path,
|
TqMetaStore* tqStoreOpen(const char* path,
|
||||||
int serializer(const void* pObj, TqSerializedHead** ppHead),
|
TqSerializeFun serializer,
|
||||||
const void* deserializer(const TqSerializedHead* pHead, void** ppObj),
|
TqDeserializeFun deserializer,
|
||||||
void deleter(void* pObj),
|
TqDeleteFun deleter,
|
||||||
int32_t tqConfigFlag
|
int32_t tqConfigFlag
|
||||||
) {
|
) {
|
||||||
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore));
|
TqMetaStore* pMeta = malloc(sizeof(TqMetaStore));
|
||||||
|
@ -127,9 +127,9 @@ TqMetaStore* tqStoreOpen(const char* path,
|
||||||
|
|
||||||
pMeta->fileFd = fileFd;
|
pMeta->fileFd = fileFd;
|
||||||
|
|
||||||
pMeta->serializer = serializer;
|
pMeta->pSerializer = serializer;
|
||||||
pMeta->deserializer = deserializer;
|
pMeta->pDeserializer = deserializer;
|
||||||
pMeta->deleter = deleter;
|
pMeta->pDeleter = deleter;
|
||||||
pMeta->tqConfigFlag = tqConfigFlag;
|
pMeta->tqConfigFlag = tqConfigFlag;
|
||||||
|
|
||||||
//read idx file and load into memory
|
//read idx file and load into memory
|
||||||
|
@ -171,25 +171,25 @@ TqMetaStore* tqStoreOpen(const char* path,
|
||||||
}
|
}
|
||||||
if(serializedObj->action == TQ_ACTION_INUSE) {
|
if(serializedObj->action == TQ_ACTION_INUSE) {
|
||||||
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
|
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
|
||||||
pMeta->deserializer(serializedObj, &pNode->handle.valueInUse);
|
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
|
||||||
} else {
|
} else {
|
||||||
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
|
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
|
||||||
}
|
}
|
||||||
} else if(serializedObj->action == TQ_ACTION_INTXN) {
|
} else if(serializedObj->action == TQ_ACTION_INTXN) {
|
||||||
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
|
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
|
||||||
pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn);
|
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn);
|
||||||
} else {
|
} else {
|
||||||
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
|
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
|
||||||
}
|
}
|
||||||
} else if(serializedObj->action == TQ_ACTION_INUSE_CONT) {
|
} else if(serializedObj->action == TQ_ACTION_INUSE_CONT) {
|
||||||
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
|
if(serializedObj->ssize != sizeof(TqSerializedHead)) {
|
||||||
pMeta->deserializer(serializedObj, &pNode->handle.valueInUse);
|
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
|
||||||
} else {
|
} else {
|
||||||
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
|
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
|
||||||
}
|
}
|
||||||
TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
|
TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
|
||||||
if(ptr->ssize != sizeof(TqSerializedHead)) {
|
if(ptr->ssize != sizeof(TqSerializedHead)) {
|
||||||
pMeta->deserializer(ptr, &pNode->handle.valueInTxn);
|
pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn);
|
||||||
} else {
|
} else {
|
||||||
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
|
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
|
||||||
}
|
}
|
||||||
|
@ -225,11 +225,11 @@ TqMetaStore* tqStoreOpen(const char* path,
|
||||||
if(pBucketNode) {
|
if(pBucketNode) {
|
||||||
if(pBucketNode->handle.valueInUse
|
if(pBucketNode->handle.valueInUse
|
||||||
&& pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
&& pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pBucketNode->handle.valueInUse);
|
pMeta->pDeleter(pBucketNode->handle.valueInUse);
|
||||||
}
|
}
|
||||||
if(pBucketNode->handle.valueInTxn
|
if(pBucketNode->handle.valueInTxn
|
||||||
&& pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
&& pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pBucketNode->handle.valueInTxn);
|
pMeta->pDeleter(pBucketNode->handle.valueInTxn);
|
||||||
}
|
}
|
||||||
free(pBucketNode);
|
free(pBucketNode);
|
||||||
}
|
}
|
||||||
|
@ -253,11 +253,11 @@ int32_t tqStoreClose(TqMetaStore* pMeta) {
|
||||||
ASSERT(pNode->unpersistPrev == NULL);
|
ASSERT(pNode->unpersistPrev == NULL);
|
||||||
if(pNode->handle.valueInTxn
|
if(pNode->handle.valueInTxn
|
||||||
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInTxn);
|
pMeta->pDeleter(pNode->handle.valueInTxn);
|
||||||
}
|
}
|
||||||
if(pNode->handle.valueInUse
|
if(pNode->handle.valueInUse
|
||||||
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInUse);
|
pMeta->pDeleter(pNode->handle.valueInUse);
|
||||||
}
|
}
|
||||||
TqMetaList* next = pNode->next;
|
TqMetaList* next = pNode->next;
|
||||||
free(pNode);
|
free(pNode);
|
||||||
|
@ -280,11 +280,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
|
||||||
while(pNode) {
|
while(pNode) {
|
||||||
if(pNode->handle.valueInTxn
|
if(pNode->handle.valueInTxn
|
||||||
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInTxn);
|
pMeta->pDeleter(pNode->handle.valueInTxn);
|
||||||
}
|
}
|
||||||
if(pNode->handle.valueInUse
|
if(pNode->handle.valueInUse
|
||||||
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInUse);
|
pMeta->pDeleter(pNode->handle.valueInUse);
|
||||||
}
|
}
|
||||||
TqMetaList* next = pNode->next;
|
TqMetaList* next = pNode->next;
|
||||||
free(pNode);
|
free(pNode);
|
||||||
|
@ -338,7 +338,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
|
||||||
if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
|
if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
|
||||||
pSHead->ssize = sizeof(TqSerializedHead);
|
pSHead->ssize = sizeof(TqSerializedHead);
|
||||||
} else {
|
} else {
|
||||||
pMeta->serializer(pNode->handle.valueInUse, &pSHead);
|
pMeta->pSerializer(pNode->handle.valueInUse, &pSHead);
|
||||||
}
|
}
|
||||||
nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize);
|
nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize);
|
||||||
ASSERT(nBytes == pSHead->ssize);
|
ASSERT(nBytes == pSHead->ssize);
|
||||||
|
@ -349,7 +349,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
|
||||||
if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
|
if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
|
||||||
pSHead->ssize = sizeof(TqSerializedHead);
|
pSHead->ssize = sizeof(TqSerializedHead);
|
||||||
} else {
|
} else {
|
||||||
pMeta->serializer(pNode->handle.valueInTxn, &pSHead);
|
pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead);
|
||||||
}
|
}
|
||||||
int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize);
|
int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize);
|
||||||
ASSERT(nBytesTxn == pSHead->ssize);
|
ASSERT(nBytesTxn == pSHead->ssize);
|
||||||
|
@ -423,7 +423,7 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
|
||||||
//TODO: think about thread safety
|
//TODO: think about thread safety
|
||||||
if(pNode->handle.valueInUse
|
if(pNode->handle.valueInUse
|
||||||
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInUse);
|
pMeta->pDeleter(pNode->handle.valueInUse);
|
||||||
}
|
}
|
||||||
//change pointer ownership
|
//change pointer ownership
|
||||||
pNode->handle.valueInUse = value;
|
pNode->handle.valueInUse = value;
|
||||||
|
@ -496,7 +496,7 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val
|
||||||
return -2;
|
return -2;
|
||||||
}
|
}
|
||||||
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInTxn);
|
pMeta->pDeleter(pNode->handle.valueInTxn);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pNode->handle.valueInTxn = value;
|
pNode->handle.valueInTxn = value;
|
||||||
|
@ -562,7 +562,7 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
|
||||||
}
|
}
|
||||||
if(pNode->handle.valueInUse
|
if(pNode->handle.valueInUse
|
||||||
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInUse);
|
pMeta->pDeleter(pNode->handle.valueInUse);
|
||||||
}
|
}
|
||||||
pNode->handle.valueInUse = pNode->handle.valueInTxn;
|
pNode->handle.valueInUse = pNode->handle.valueInTxn;
|
||||||
pNode->handle.valueInTxn = NULL;
|
pNode->handle.valueInTxn = NULL;
|
||||||
|
@ -582,7 +582,7 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
|
||||||
if(pNode->handle.key == key) {
|
if(pNode->handle.key == key) {
|
||||||
if(pNode->handle.valueInTxn) {
|
if(pNode->handle.valueInTxn) {
|
||||||
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
||||||
pMeta->deleter(pNode->handle.valueInTxn);
|
pMeta->pDeleter(pNode->handle.valueInTxn);
|
||||||
}
|
}
|
||||||
pNode->handle.valueInTxn = NULL;
|
pNode->handle.valueInTxn = NULL;
|
||||||
tqLinkUnpersist(pMeta, pNode);
|
tqLinkUnpersist(pMeta, pNode);
|
||||||
|
@ -602,7 +602,7 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
|
||||||
while(pNode) {
|
while(pNode) {
|
||||||
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
|
||||||
if(pNode->handle.valueInTxn) {
|
if(pNode->handle.valueInTxn) {
|
||||||
pMeta->deleter(pNode->handle.valueInTxn);
|
pMeta->pDeleter(pNode->handle.valueInTxn);
|
||||||
}
|
}
|
||||||
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
|
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
|
||||||
tqLinkUnpersist(pMeta, pNode);
|
tqLinkUnpersist(pMeta, pNode);
|
||||||
|
|
|
@ -0,0 +1,13 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <cstring>
|
||||||
|
#include <iostream>
|
||||||
|
#include <queue>
|
||||||
|
|
||||||
|
#include "tq.h"
|
||||||
|
|
||||||
|
using namespace std;
|
||||||
|
|
||||||
|
TEST(TqSerializerTest, basicTest) {
|
||||||
|
TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle));
|
||||||
|
|
||||||
|
}
|
|
@ -19,11 +19,19 @@ int32_t walInit() { return 0; }
|
||||||
|
|
||||||
void walCleanUp() {}
|
void walCleanUp() {}
|
||||||
|
|
||||||
SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; }
|
SWal *walOpen(char *path, SWalCfg *pCfg) {
|
||||||
|
SWal* pWal = malloc(sizeof(SWal));
|
||||||
|
if(pWal == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pWal;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
|
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
|
||||||
|
|
||||||
void walClose(SWal *pWal) {}
|
void walClose(SWal *pWal) {
|
||||||
|
if(pWal) free(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
void walFsync(SWal *pWal, bool force) {}
|
void walFsync(SWal *pWal, bool force) {}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue