Merge pull request #9130 from taosdata/feature/tq

Feature/tq
This commit is contained in:
Liu Jicong 2021-12-16 15:05:51 +08:00 committed by GitHub
commit ce321beb6c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 876 additions and 908 deletions

View File

@ -16,9 +16,9 @@
#ifndef _TD_TQ_H_ #ifndef _TD_TQ_H_
#define _TD_TQ_H_ #define _TD_TQ_H_
#include "mallocator.h"
#include "os.h" #include "os.h"
#include "tutil.h" #include "tutil.h"
#include "mallocator.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -97,90 +97,90 @@ typedef struct TmqHeartbeatReq {
typedef struct TmqHeartbeatRsp { typedef struct TmqHeartbeatRsp {
} TmqHeartbeatRsp; } TmqHeartbeatRsp;
typedef struct TqTopicVhandle { typedef struct STqTopicVhandle {
int64_t topicId; int64_t topicId;
// executor for filter // executor for filter
void* filterExec; void* filterExec;
// callback for mnode // callback for mnode
// trigger when vnode list associated topic change // trigger when vnode list associated topic change
void* (*mCallback)(void*, void*); void* (*mCallback)(void*, void*);
} TqTopicVhandle; } STqTopicVhandle;
#define TQ_BUFFER_SIZE 8 #define TQ_BUFFER_SIZE 8
typedef struct TqBufferItem { typedef struct STqBufferItem {
int64_t offset; int64_t offset;
// executors are identical but not concurrent // executors are identical but not concurrent
// so there must be a copy in each item // so there must be a copy in each item
void* executor; void* executor;
int64_t size; int64_t size;
void* content; void* content;
} TqBufferItem; } STqBufferItem;
typedef struct TqBufferHandle { typedef struct STqBufferHandle {
// char* topic; //c style, end with '\0' // char* topic; //c style, end with '\0'
// int64_t cgId; // int64_t cgId;
// void* ahandle; // void* ahandle;
int64_t nextConsumeOffset; int64_t nextConsumeOffset;
int64_t floatingCursor;
int64_t topicId; int64_t topicId;
int32_t head; int32_t head;
int32_t tail; int32_t tail;
TqBufferItem buffer[TQ_BUFFER_SIZE]; STqBufferItem buffer[TQ_BUFFER_SIZE];
} TqBufferHandle; } STqBufferHandle;
typedef struct TqListHandle { typedef struct STqListHandle {
TqBufferHandle bufHandle; STqBufferHandle bufHandle;
struct TqListHandle* next; struct STqListHandle* next;
} TqListHandle; } STqListHandle;
typedef struct TqGroupHandle { typedef struct STqGroupHandle {
int64_t cId; int64_t cId;
int64_t cgId; int64_t cgId;
void* ahandle; void* ahandle;
int32_t topicNum; int32_t topicNum;
TqListHandle* head; STqListHandle* head;
} TqGroupHandle; } STqGroupHandle;
typedef struct TqQueryExec { typedef struct STqQueryExec {
void* src; void* src;
TqBufferItem* dest; STqBufferItem* dest;
void* executor; void* executor;
} TqQueryExec; } STqQueryExec;
typedef struct TqQueryMsg { typedef struct STqQueryMsg {
TqQueryExec* exec; STqQueryExec* exec;
struct TqQueryMsg* next; struct STqQueryMsg* next;
} TqQueryMsg; } STqQueryMsg;
typedef struct TqLogReader { typedef struct STqLogReader {
void* logHandle; void* logHandle;
int32_t (*logRead)(void* logHandle, void** data, int64_t ver); int32_t (*logRead)(void* logHandle, void** data, int64_t ver);
int64_t (*logGetFirstVer)(void* logHandle); int64_t (*logGetFirstVer)(void* logHandle);
int64_t (*logGetSnapshotVer)(void* logHandle); int64_t (*logGetSnapshotVer)(void* logHandle);
int64_t (*logGetLastVer)(void* logHandle); int64_t (*logGetLastVer)(void* logHandle);
} TqLogReader; } STqLogReader;
typedef struct STqCfg { typedef struct STqCfg {
// TODO // TODO
} STqCfg; } STqCfg;
typedef struct TqMemRef { typedef struct STqMemRef {
SMemAllocatorFactory* pAlloctorFactory; SMemAllocatorFactory* pAlloctorFactory;
SMemAllocator* pAllocator; SMemAllocator* pAllocator;
} TqMemRef; } STqMemRef;
typedef struct TqSerializedHead { typedef struct STqSerializedHead {
int16_t ver; int16_t ver;
int16_t action; int16_t action;
int32_t checksum; int32_t checksum;
int64_t ssize; int64_t ssize;
char content[]; char content[];
} TqSerializedHead; } STqSerializedHead;
typedef int (*TqSerializeFun)(const void* pObj, TqSerializedHead** ppHead); typedef int (*FTqSerialize)(const void* pObj, STqSerializedHead** ppHead);
typedef const void* (*TqDeserializeFun)(const TqSerializedHead* pHead, void** ppObj); typedef const void* (*FTqDeserialize)(const STqSerializedHead* pHead, void** ppObj);
typedef void (*TqDeleteFun)(void*); typedef void (*FTqDelete)(void*);
#define TQ_BUCKET_MASK 0xFF #define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256 #define TQ_BUCKET_SIZE 256
@ -209,15 +209,12 @@ typedef void (*TqDeleteFun)(void*);
#define TQ_DUP_INTXN_REWRITE 0 #define TQ_DUP_INTXN_REWRITE 0
#define TQ_DUP_INTXN_REJECT 2 #define TQ_DUP_INTXN_REJECT 2
static inline bool TqUpdateAppend(int32_t tqConfigFlag) { static inline bool TqUpdateAppend(int32_t tqConfigFlag) { return tqConfigFlag & TQ_UPDATE_APPEND; }
return tqConfigFlag & TQ_UPDATE_APPEND;
}
static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { return tqConfigFlag & TQ_DUP_INTXN_REJECT; }
return tqConfigFlag & TQ_DUP_INTXN_REJECT;
}
static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST;
#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE #define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE
typedef struct TqMetaHandle { typedef struct TqMetaHandle {
@ -247,9 +244,9 @@ typedef struct TqMetaStore {
int idxFd; int idxFd;
char* dirPath; char* dirPath;
int32_t tqConfigFlag; int32_t tqConfigFlag;
TqSerializeFun pSerializer; FTqSerialize pSerializer;
TqDeserializeFun pDeserializer; FTqDeserialize pDeserializer;
TqDeleteFun pDeleter; FTqDelete pDeleter;
} STqMetaStore; } STqMetaStore;
typedef struct STQ { typedef struct STQ {
@ -257,13 +254,13 @@ typedef struct STQ {
// the handle of kvstore // the handle of kvstore
char* path; char* path;
STqCfg* tqConfig; STqCfg* tqConfig;
TqLogReader* tqLogReader; STqLogReader* tqLogReader;
TqMemRef tqMemRef; STqMemRef tqMemRef;
STqMetaStore* tqMeta; STqMetaStore* tqMeta;
} STQ; } STQ;
// open in each vnode // open in each vnode
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac); STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac);
void tqDestroy(STQ*); void tqDestroy(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
@ -272,19 +269,19 @@ int tqCommit(STQ*);
int tqConsume(STQ*, TmqConsumeReq*); int tqConsume(STQ*, TmqConsumeReq*);
TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); STqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId);
TqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); STqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqMoveOffsetToNext(TqGroupHandle*); int tqMoveOffsetToNext(STqGroupHandle*);
int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset);
int tqRegisterContext(TqGroupHandle*, void* ahandle); int tqRegisterContext(STqGroupHandle*, void* ahandle);
int tqLaunchQuery(TqGroupHandle*); int tqLaunchQuery(STqGroupHandle*);
int tqSendLaunchQuery(TqGroupHandle*); int tqSendLaunchQuery(STqGroupHandle*);
int tqSerializeGroupHandle(const TqGroupHandle* gHandle, TqSerializedHead** ppHead); int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead);
const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle** gHandle); const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** gHandle);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -16,21 +16,51 @@
#define _TD_WAL_H_ #define _TD_WAL_H_
#include "os.h" #include "os.h"
#include "tarray.h"
#include "tdef.h" #include "tdef.h"
#include "tlog.h" #include "tlog.h"
#include "tarray.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
extern int32_t wDebugFlag; extern int32_t wDebugFlag;
#define wFatal(...) { if (wDebugFlag & DEBUG_FATAL) { taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); }} #define wFatal(...) \
#define wError(...) { if (wDebugFlag & DEBUG_ERROR) { taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); }} { \
#define wWarn(...) { if (wDebugFlag & DEBUG_WARN) { taosPrintLog("WAL WARN ", 255, __VA_ARGS__); }} if (wDebugFlag & DEBUG_FATAL) { \
#define wInfo(...) { if (wDebugFlag & DEBUG_INFO) { taosPrintLog("WAL ", 255, __VA_ARGS__); }} taosPrintLog("WAL FATAL ", 255, __VA_ARGS__); \
#define wDebug(...) { if (wDebugFlag & DEBUG_DEBUG) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} } \
#define wTrace(...) { if (wDebugFlag & DEBUG_TRACE) { taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); }} }
#define wError(...) \
{ \
if (wDebugFlag & DEBUG_ERROR) { \
taosPrintLog("WAL ERROR ", 255, __VA_ARGS__); \
} \
}
#define wWarn(...) \
{ \
if (wDebugFlag & DEBUG_WARN) { \
taosPrintLog("WAL WARN ", 255, __VA_ARGS__); \
} \
}
#define wInfo(...) \
{ \
if (wDebugFlag & DEBUG_INFO) { \
taosPrintLog("WAL ", 255, __VA_ARGS__); \
} \
}
#define wDebug(...) \
{ \
if (wDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); \
} \
}
#define wTrace(...) \
{ \
if (wDebugFlag & DEBUG_TRACE) { \
taosPrintLog("WAL ", wDebugFlag, __VA_ARGS__); \
} \
}
#define WAL_HEAD_VER 0 #define WAL_HEAD_VER 0
#define WAL_NOSUFFIX_LEN 20 #define WAL_NOSUFFIX_LEN 20
@ -45,11 +75,7 @@ extern int32_t wDebugFlag;
#define WAL_CUR_FAILED 1 #define WAL_CUR_FAILED 1
#pragma pack(push, 1) #pragma pack(push, 1)
typedef enum { typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWalType;
TAOS_WAL_NOLOG = 0,
TAOS_WAL_WRITE = 1,
TAOS_WAL_FSYNC = 2
} EWalType;
typedef struct SWalReadHead { typedef struct SWalReadHead {
int8_t headVer; int8_t headVer;

View File

@ -25,9 +25,9 @@ extern "C" {
STqMetaStore* tqStoreOpen(const char* path, STqMetaStore* tqStoreOpen(const char* path,
TqSerializeFun pSerializer, FTqSerialize pSerializer,
TqDeserializeFun pDeserializer, FTqDeserialize pDeserializer,
TqDeleteFun pDeleter, FTqDelete pDeleter,
int32_t tqConfigFlag int32_t tqConfigFlag
); );
int32_t tqStoreClose(STqMetaStore*); int32_t tqStoreClose(STqMetaStore*);

View File

@ -24,23 +24,23 @@
// handle management message // handle management message
// //
int tqGetgHandleSSize(const TqGroupHandle *gHandle); int tqGetgHandleSSize(const STqGroupHandle* gHandle);
int tqBufHandleSSize(); int tqBufHandleSSize();
int tqBufItemSSize(); int tqBufItemSSize();
TqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { STqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
TqGroupHandle* gHandle; STqGroupHandle* gHandle;
return NULL; return NULL;
} }
void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr); void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr);
void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr); void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr);
void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr); void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr);
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle); const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle);
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem); const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem);
STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) { STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemAllocatorFactory* allocFac) {
STQ* pTq = malloc(sizeof(STQ)); STQ* pTq = malloc(sizeof(STQ));
if (pTq == NULL) { if (pTq == NULL) {
// TODO: memory error // TODO: memory error
@ -54,11 +54,8 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl
if (pTq->tqMemRef.pAllocator == NULL) { if (pTq->tqMemRef.pAllocator == NULL) {
// TODO // TODO
} }
pTq->tqMeta = tqStoreOpen(path, pTq->tqMeta =
(TqSerializeFun)tqSerializeGroupHandle, tqStoreOpen(path, (FTqSerialize)tqSerializeGroupHandle, (FTqDeserialize)tqDeserializeGroupHandle, free, 0);
(TqDeserializeFun)tqDeserializeGroupHandle,
free,
0);
if (pTq->tqMeta == NULL) { if (pTq->tqMeta == NULL) {
// TODO: free STQ // TODO: free STQ
return NULL; return NULL;
@ -66,18 +63,16 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, TqLogReader* tqLogReader, SMemAl
return pTq; return pTq;
} }
static int tqProtoCheck(TmqMsgHead *pMsg) { static int tqProtoCheck(TmqMsgHead* pMsg) { return pMsg->protoVer == 0; }
return pMsg->protoVer == 0;
}
static int tqAckOneTopic(TqBufferHandle *bHandle, TmqOneAck *pAck, TqQueryMsg** ppQuery) { static int tqAckOneTopic(STqBufferHandle* bHandle, TmqOneAck* pAck, STqQueryMsg** ppQuery) {
// clean old item and move forward // clean old item and move forward
int32_t consumeOffset = pAck->consumeOffset; int32_t consumeOffset = pAck->consumeOffset;
int idx = consumeOffset % TQ_BUFFER_SIZE; int idx = consumeOffset % TQ_BUFFER_SIZE;
ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor); ASSERT(bHandle->buffer[idx].content && bHandle->buffer[idx].executor);
tfree(bHandle->buffer[idx].content); tfree(bHandle->buffer[idx].content);
if (1 /* TODO: need to launch new query */) { if (1 /* TODO: need to launch new query */) {
TqQueryMsg* pNewQuery = malloc(sizeof(TqQueryMsg)); STqQueryMsg* pNewQuery = malloc(sizeof(STqQueryMsg));
if (pNewQuery == NULL) { if (pNewQuery == NULL) {
// TODO: memory insufficient // TODO: memory insufficient
return -1; return -1;
@ -93,14 +88,14 @@ static int tqAckOneTopic(TqBufferHandle *bHandle, TmqOneAck *pAck, TqQueryMsg**
return 0; return 0;
} }
static int tqAck(TqGroupHandle* gHandle, TmqAcks* pAcks) { static int tqAck(STqGroupHandle* gHandle, TmqAcks* pAcks) {
int32_t ackNum = pAcks->ackNum; int32_t ackNum = pAcks->ackNum;
TmqOneAck* acks = pAcks->acks; TmqOneAck* acks = pAcks->acks;
// double ptr for acks and list // double ptr for acks and list
int i = 0; int i = 0;
TqListHandle* node = gHandle->head; STqListHandle* node = gHandle->head;
int ackCnt = 0; int ackCnt = 0;
TqQueryMsg *pQuery = NULL; STqQueryMsg* pQuery = NULL;
while (i < ackNum && node->next) { while (i < ackNum && node->next) {
if (acks[i].topicId == node->next->bufHandle.topicId) { if (acks[i].topicId == node->next->bufHandle.topicId) {
ackCnt++; ackCnt++;
@ -117,25 +112,25 @@ static int tqAck(TqGroupHandle* gHandle, TmqAcks* pAcks) {
return ackCnt; return ackCnt;
} }
static int tqCommitTCGroup(TqGroupHandle* handle) { static int tqCommitTCGroup(STqGroupHandle* handle) {
// persist modification into disk // persist modification into disk
return 0; return 0;
} }
int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) { int tqCreateTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroupHandle** handle) {
// create in disk // create in disk
TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle)); STqGroupHandle* gHandle = (STqGroupHandle*)malloc(sizeof(STqGroupHandle));
if (gHandle == NULL) { if (gHandle == NULL) {
// TODO // TODO
return -1; return -1;
} }
memset(gHandle, 0, sizeof(TqGroupHandle)); memset(gHandle, 0, sizeof(STqGroupHandle));
return 0; return 0;
} }
TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { STqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
TqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId); STqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId);
if (gHandle == NULL) { if (gHandle == NULL) {
int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle); int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle);
if (code != 0) { if (code != 0) {
@ -149,18 +144,16 @@ TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cI
return gHandle; return gHandle;
} }
int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { return 0; }
return 0;
}
int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { int tqDropTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) {
// delete from disk // delete from disk
return 0; return 0;
} }
static int tqFetch(TqGroupHandle* gHandle, void** msg) { static int tqFetch(STqGroupHandle* gHandle, void** msg) {
TqListHandle* head = gHandle->head; STqListHandle* head = gHandle->head;
TqListHandle* node = head; STqListHandle* node = head;
int totSize = 0; int totSize = 0;
// TODO: make it a macro // TODO: make it a macro
int sizeLimit = 4 * 1024; int sizeLimit = 4 * 1024;
@ -173,11 +166,9 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) {
// until all topic iterated or msgs over sizeLimit // until all topic iterated or msgs over sizeLimit
while (node->next) { while (node->next) {
node = node->next; node = node->next;
TqBufferHandle* bufHandle = &node->bufHandle; STqBufferHandle* bufHandle = &node->bufHandle;
int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE; int idx = bufHandle->nextConsumeOffset % TQ_BUFFER_SIZE;
if(bufHandle->buffer[idx].content != NULL && if (bufHandle->buffer[idx].content != NULL && bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset) {
bufHandle->buffer[idx].offset == bufHandle->nextConsumeOffset
) {
totSize += bufHandle->buffer[idx].size; totSize += bufHandle->buffer[idx].size;
if (totSize > sizeLimit) { if (totSize > sizeLimit) {
void* ptr = realloc(buffer, totSize); void* ptr = realloc(buffer, totSize);
@ -202,17 +193,11 @@ static int tqFetch(TqGroupHandle* gHandle, void** msg) {
return totSize; return totSize;
} }
TqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { STqGroupHandle* tqGetGroupHandle(STQ* pTq, int64_t cId) { return NULL; }
return NULL;
}
int tqLaunchQuery(TqGroupHandle* gHandle) { int tqLaunchQuery(STqGroupHandle* gHandle) { return 0; }
return 0;
}
int tqSendLaunchQuery(TqGroupHandle* gHandle) { int tqSendLaunchQuery(STqGroupHandle* gHandle) { return 0; }
return 0;
}
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/ /*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
/*return 0;*/ /*return 0;*/
@ -235,7 +220,7 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
return -1; return -1;
} }
int64_t clientId = pMsg->head.clientId; int64_t clientId = pMsg->head.clientId;
TqGroupHandle *gHandle = tqGetGroupHandle(pTq, clientId); STqGroupHandle* gHandle = tqGetGroupHandle(pTq, clientId);
if (gHandle == NULL) { if (gHandle == NULL) {
// client not connect // client not connect
return -1; return -1;
@ -262,9 +247,9 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) {
return 0; return 0;
} }
int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHead) { int tqSerializeGroupHandle(const STqGroupHandle* gHandle, STqSerializedHead** ppHead) {
// calculate size // calculate size
int sz = tqGetgHandleSSize(gHandle) + sizeof(TqSerializedHead); int sz = tqGetgHandleSSize(gHandle) + sizeof(STqSerializedHead);
if (sz > (*ppHead)->ssize) { if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz); void* tmpPtr = realloc(*ppHead, sz);
if (tmpPtr == NULL) { if (tmpPtr == NULL) {
@ -289,8 +274,8 @@ int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHe
return 0; return 0;
} }
void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) { void* tqSerializeListHandle(STqListHandle* listHandle, void* ptr) {
TqListHandle *node = listHandle; STqListHandle* node = listHandle;
ASSERT(node != NULL); ASSERT(node != NULL);
while (node) { while (node) {
ptr = tqSerializeBufHandle(&node->bufHandle, ptr); ptr = tqSerializeBufHandle(&node->bufHandle, ptr);
@ -299,7 +284,7 @@ void* tqSerializeListHandle(TqListHandle *listHandle, void* ptr) {
return ptr; return ptr;
} }
void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) { void* tqSerializeBufHandle(STqBufferHandle* bufHandle, void* ptr) {
*(int64_t*)ptr = bufHandle->nextConsumeOffset; *(int64_t*)ptr = bufHandle->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = bufHandle->topicId; *(int64_t*)ptr = bufHandle->topicId;
@ -314,14 +299,14 @@ void* tqSerializeBufHandle(TqBufferHandle *bufHandle, void* ptr) {
return ptr; return ptr;
} }
void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) { void* tqSerializeBufItem(STqBufferItem* bufItem, void* ptr) {
// TODO: do we need serialize this? // TODO: do we need serialize this?
// mainly for executor // mainly for executor
return ptr; return ptr;
} }
const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle **ppGHandle) { const void* tqDeserializeGroupHandle(const STqSerializedHead* pHead, STqGroupHandle** ppGHandle) {
TqGroupHandle *gHandle = *ppGHandle; STqGroupHandle* gHandle = *ppGHandle;
const void* ptr = pHead->content; const void* ptr = pHead->content;
gHandle->cId = *(int64_t*)ptr; gHandle->cId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
@ -331,10 +316,10 @@ const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandl
gHandle->topicNum = *(int32_t*)ptr; gHandle->topicNum = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t)); ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
gHandle->head = NULL; gHandle->head = NULL;
TqListHandle *node = gHandle->head; STqListHandle* node = gHandle->head;
for (int i = 0; i < gHandle->topicNum; i++) { for (int i = 0; i < gHandle->topicNum; i++) {
if (gHandle->head == NULL) { if (gHandle->head == NULL) {
if((node = malloc(sizeof(TqListHandle))) == NULL) { if ((node = malloc(sizeof(STqListHandle))) == NULL) {
// TODO: error // TODO: error
return NULL; return NULL;
} }
@ -342,7 +327,7 @@ const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandl
ptr = tqDeserializeBufHandle(ptr, &node->bufHandle); ptr = tqDeserializeBufHandle(ptr, &node->bufHandle);
gHandle->head = node; gHandle->head = node;
} else { } else {
node->next = malloc(sizeof(TqListHandle)); node->next = malloc(sizeof(STqListHandle));
if (node->next == NULL) { if (node->next == NULL) {
// TODO: error // TODO: error
return NULL; return NULL;
@ -355,7 +340,7 @@ const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandl
return ptr; return ptr;
} }
const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle) { const void* tqDeserializeBufHandle(const void* pBytes, STqBufferHandle* bufHandle) {
const void* ptr = pBytes; const void* ptr = pBytes;
bufHandle->nextConsumeOffset = *(int64_t*)ptr; bufHandle->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
@ -371,12 +356,10 @@ const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle *bufHandle
return ptr; return ptr;
} }
const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) { const void* tqDeserializeBufItem(const void* pBytes, STqBufferItem* bufItem) { return pBytes; }
return pBytes;
}
// TODO: make this a macro // TODO: make this a macro
int tqGetgHandleSSize(const TqGroupHandle *gHandle) { int tqGetgHandleSSize(const STqGroupHandle* gHandle) {
return sizeof(int64_t) * 2 // cId + cgId return sizeof(int64_t) * 2 // cId + cgId
+ sizeof(int32_t) // topicNum + sizeof(int32_t) // topicNum
+ gHandle->topicNum * tqBufHandleSSize(); + gHandle->topicNum * tqBufHandleSSize();

View File

@ -14,10 +14,10 @@
*/ */
#include "tqMetaStore.h" #include "tqMetaStore.h"
// TODO:replace by an abstract file layer // TODO:replace by an abstract file layer
#include "osDir.h"
#include <fcntl.h> #include <fcntl.h>
#include <string.h> #include <string.h>
#include <unistd.h> #include <unistd.h>
#include "osDir.h"
#define TQ_META_NAME "tq.meta" #define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx" #define TQ_IDX_NAME "tq.idx"
@ -42,17 +42,17 @@ static inline int tqSeekLastPage(int fd) {
} }
// TODO: the struct is tightly coupled with index entry // TODO: the struct is tightly coupled with index entry
typedef struct TqIdxPageHead { typedef struct STqIdxPageHead {
int16_t writeOffset; int16_t writeOffset;
int8_t unused[14]; int8_t unused[14];
} TqIdxPageHead; } STqIdxPageHead;
typedef struct TqIdxPageBuf { typedef struct STqIdxPageBuf {
TqIdxPageHead head; STqIdxPageHead head;
char buffer[TQ_IDX_PAGE_BODY_SIZE]; char buffer[TQ_IDX_PAGE_BODY_SIZE];
} TqIdxPageBuf; } STqIdxPageBuf;
static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { static inline int tqReadLastPage(int fd, STqIdxPageBuf* pBuf) {
int offset = tqSeekLastPage(fd); int offset = tqSeekLastPage(fd);
int nBytes; int nBytes;
if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) { if ((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
@ -67,12 +67,8 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
return lseek(fd, offset, SEEK_SET); return lseek(fd, offset, SEEK_SET);
} }
STqMetaStore* tqStoreOpen(const char* path, STqMetaStore* tqStoreOpen(const char* path, FTqSerialize serializer, FTqDeserialize deserializer, FTqDelete deleter,
TqSerializeFun serializer, int32_t tqConfigFlag) {
TqDeserializeFun deserializer,
TqDeleteFun deleter,
int32_t tqConfigFlag
) {
STqMetaStore* pMeta = malloc(sizeof(STqMetaStore)); STqMetaStore* pMeta = malloc(sizeof(STqMetaStore));
if (pMeta == NULL) { if (pMeta == NULL) {
// close // close
@ -112,9 +108,7 @@ STqMetaStore* tqStoreOpen(const char* path,
return NULL; return NULL;
} }
memset(pMeta->unpersistHead, 0, sizeof(STqMetaList)); memset(pMeta->unpersistHead, 0, sizeof(STqMetaList));
pMeta->unpersistHead->unpersistNext pMeta->unpersistHead->unpersistNext = pMeta->unpersistHead->unpersistPrev = pMeta->unpersistHead;
= pMeta->unpersistHead->unpersistPrev
= pMeta->unpersistHead;
strcpy(name, path); strcpy(name, path);
strcat(name, "/" TQ_META_NAME); strcat(name, "/" TQ_META_NAME);
@ -132,8 +126,8 @@ STqMetaStore* tqStoreOpen(const char* path,
pMeta->tqConfigFlag = tqConfigFlag; pMeta->tqConfigFlag = tqConfigFlag;
// read idx file and load into memory // read idx file and load into memory
TqIdxPageBuf idxBuf; STqIdxPageBuf idxBuf;
TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); STqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
if (serializedObj == NULL) { if (serializedObj == NULL) {
// TODO:memory insufficient // TODO:memory insufficient
} }
@ -169,25 +163,25 @@ STqMetaStore* tqStoreOpen(const char* path,
// TODO: read error // TODO: read error
} }
if (serializedObj->action == TQ_ACTION_INUSE) { if (serializedObj->action == TQ_ACTION_INUSE) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) { if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse); pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
} else { } else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN; pNode->handle.valueInUse = TQ_DELETE_TOKEN;
} }
} else if (serializedObj->action == TQ_ACTION_INTXN) { } else if (serializedObj->action == TQ_ACTION_INTXN) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) { if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn); pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn);
} else { } else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
} }
} else if (serializedObj->action == TQ_ACTION_INUSE_CONT) { } else if (serializedObj->action == TQ_ACTION_INUSE_CONT) {
if(serializedObj->ssize != sizeof(TqSerializedHead)) { if (serializedObj->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse); pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse);
} else { } else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN; pNode->handle.valueInUse = TQ_DELETE_TOKEN;
} }
TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize); STqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
if(ptr->ssize != sizeof(TqSerializedHead)) { if (ptr->ssize != sizeof(STqSerializedHead)) {
pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn); pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn);
} else { } else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
@ -205,8 +199,7 @@ STqMetaStore* tqStoreOpen(const char* path,
pNode->next = pBucketNode->next; pNode->next = pBucketNode->next;
pMeta->bucket[bucketKey] = pNode; pMeta->bucket[bucketKey] = pNode;
} else { } else {
while(pBucketNode->next && while (pBucketNode->next && pBucketNode->next->handle.key != pNode->handle.key) {
pBucketNode->next->handle.key != pNode->handle.key) {
pBucketNode = pBucketNode->next; pBucketNode = pBucketNode->next;
} }
if (pBucketNode->next) { if (pBucketNode->next) {
@ -222,12 +215,10 @@ STqMetaStore* tqStoreOpen(const char* path,
} }
} }
if (pBucketNode) { if (pBucketNode) {
if(pBucketNode->handle.valueInUse if (pBucketNode->handle.valueInUse && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pBucketNode->handle.valueInUse); pMeta->pDeleter(pBucketNode->handle.valueInUse);
} }
if(pBucketNode->handle.valueInTxn if (pBucketNode->handle.valueInTxn && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
&& pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pBucketNode->handle.valueInTxn); pMeta->pDeleter(pBucketNode->handle.valueInTxn);
} }
free(pBucketNode); free(pBucketNode);
@ -250,12 +241,10 @@ int32_t tqStoreClose(STqMetaStore* pMeta) {
while (pNode) { while (pNode) {
ASSERT(pNode->unpersistNext == NULL); ASSERT(pNode->unpersistNext == NULL);
ASSERT(pNode->unpersistPrev == NULL); ASSERT(pNode->unpersistPrev == NULL);
if(pNode->handle.valueInTxn if (pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
if(pNode->handle.valueInUse if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
STqMetaList* next = pNode->next; STqMetaList* next = pNode->next;
@ -277,12 +266,10 @@ int32_t tqStoreDelete(STqMetaStore* pMeta) {
STqMetaList* pNode = pMeta->bucket[i]; STqMetaList* pNode = pMeta->bucket[i];
pMeta->bucket[i] = NULL; pMeta->bucket[i] = NULL;
while (pNode) { while (pNode) {
if(pNode->handle.valueInTxn if (pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
if(pNode->handle.valueInUse if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
STqMetaList* next = pNode->next; STqMetaList* next = pNode->next;
@ -299,19 +286,19 @@ int32_t tqStoreDelete(STqMetaStore* pMeta) {
// TODO: wrap in tfile // TODO: wrap in tfile
int32_t tqStorePersist(STqMetaStore* pMeta) { int32_t tqStorePersist(STqMetaStore* pMeta) {
TqIdxPageBuf idxBuf; STqIdxPageBuf idxBuf;
int64_t* bufPtr = (int64_t*)idxBuf.buffer; int64_t* bufPtr = (int64_t*)idxBuf.buffer;
STqMetaList* pHead = pMeta->unpersistHead; STqMetaList* pHead = pMeta->unpersistHead;
STqMetaList* pNode = pHead->unpersistNext; STqMetaList* pNode = pHead->unpersistNext;
TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead)); STqSerializedHead* pSHead = malloc(sizeof(STqSerializedHead));
if (pSHead == NULL) { if (pSHead == NULL) {
// TODO: memory error // TODO: memory error
return -1; return -1;
} }
pSHead->ver = TQ_SVER; pSHead->ver = TQ_SVER;
pSHead->checksum = 0; pSHead->checksum = 0;
pSHead->ssize = sizeof(TqSerializedHead); pSHead->ssize = sizeof(STqSerializedHead);
int allocatedSize = sizeof(TqSerializedHead); int allocatedSize = sizeof(STqSerializedHead);
int offset = lseek(pMeta->fileFd, 0, SEEK_CUR); int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
tqReadLastPage(pMeta->idxFd, &idxBuf); tqReadLastPage(pMeta->idxFd, &idxBuf);
@ -335,7 +322,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
} }
if (pNode->handle.valueInUse == TQ_DELETE_TOKEN) { if (pNode->handle.valueInUse == TQ_DELETE_TOKEN) {
pSHead->ssize = sizeof(TqSerializedHead); pSHead->ssize = sizeof(STqSerializedHead);
} else { } else {
pMeta->pSerializer(pNode->handle.valueInUse, &pSHead); pMeta->pSerializer(pNode->handle.valueInUse, &pSHead);
} }
@ -346,7 +333,7 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
if (pNode->handle.valueInTxn) { if (pNode->handle.valueInTxn) {
pSHead->action = TQ_ACTION_INTXN; pSHead->action = TQ_ACTION_INTXN;
if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) { if (pNode->handle.valueInTxn == TQ_DELETE_TOKEN) {
pSHead->ssize = sizeof(TqSerializedHead); pSHead->ssize = sizeof(STqSerializedHead);
} else { } else {
pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead); pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead);
} }
@ -379,17 +366,14 @@ int32_t tqStorePersist(STqMetaStore* pMeta) {
pNode = pHead->unpersistNext; pNode = pHead->unpersistNext;
// remove from bucket // remove from bucket
if(pNode->handle.valueInUse == TQ_DELETE_TOKEN && if (pNode->handle.valueInUse == TQ_DELETE_TOKEN && pNode->handle.valueInTxn == NULL) {
pNode->handle.valueInTxn == NULL
) {
int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
STqMetaList* pBucketHead = pMeta->bucket[bucketKey]; STqMetaList* pBucketHead = pMeta->bucket[bucketKey];
if (pBucketHead == pNode) { if (pBucketHead == pNode) {
pMeta->bucket[bucketKey] = pNode->next; pMeta->bucket[bucketKey] = pNode->next;
} else { } else {
STqMetaList* pBucketNode = pBucketHead; STqMetaList* pBucketNode = pBucketHead;
while(pBucketNode->next != NULL while (pBucketNode->next != NULL && pBucketNode->next != pNode) {
&& pBucketNode->next != pNode) {
pBucketNode = pBucketNode->next; pBucketNode = pBucketNode->next;
} }
// impossible for pBucket->next == NULL // impossible for pBucket->next == NULL
@ -420,8 +404,7 @@ static int32_t tqHandlePutCommitted(STqMetaStore* pMeta, int64_t key, void* valu
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
// TODO: think about thread safety // TODO: think about thread safety
if(pNode->handle.valueInUse if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
// change pointer ownership // change pointer ownership
@ -452,8 +435,7 @@ void* tqHandleGet(STqMetaStore* pMeta, int64_t key) {
STqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
if(pNode->handle.valueInUse != NULL if (pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
return pNode->handle.valueInUse; return pNode->handle.valueInUse;
} else { } else {
return NULL; return NULL;
@ -470,8 +452,7 @@ void* tqHandleTouchGet(STqMetaStore* pMeta, int64_t key) {
STqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
if(pNode->handle.valueInUse != NULL if (pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
return pNode->handle.valueInUse; return pNode->handle.valueInUse;
} else { } else {
@ -519,9 +500,7 @@ static inline int32_t tqHandlePutImpl(STqMetaStore* pMeta, int64_t key, void* va
return 0; return 0;
} }
int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { int32_t tqHandleMovePut(STqMetaStore* pMeta, int64_t key, void* value) { return tqHandlePutImpl(pMeta, key, value); }
return tqHandlePutImpl(pMeta, key, value);
}
int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { int32_t tqHandleCopyPut(STqMetaStore* pMeta, int64_t key, void* value, size_t vsize) {
void* vmem = malloc(vsize); void* vmem = malloc(vsize);
@ -538,8 +517,7 @@ static void* tqHandleGetUncommitted(STqMetaStore* pMeta, int64_t key) {
STqMetaList* pNode = pMeta->bucket[bucketKey]; STqMetaList* pNode = pMeta->bucket[bucketKey];
while (pNode) { while (pNode) {
if (pNode->handle.key == key) { if (pNode->handle.key == key) {
if(pNode->handle.valueInTxn != NULL if (pNode->handle.valueInTxn != NULL && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
return pNode->handle.valueInTxn; return pNode->handle.valueInTxn;
} else { } else {
return NULL; return NULL;
@ -559,8 +537,7 @@ int32_t tqHandleCommit(STqMetaStore* pMeta, int64_t key) {
if (pNode->handle.valueInTxn == NULL) { if (pNode->handle.valueInTxn == NULL) {
return -1; return -1;
} }
if(pNode->handle.valueInUse if (pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->pDeleter(pNode->handle.valueInUse); pMeta->pDeleter(pNode->handle.valueInUse);
} }
pNode->handle.valueInUse = pNode->handle.valueInTxn; pNode->handle.valueInUse = pNode->handle.valueInTxn;
@ -603,6 +580,7 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
if (pNode->handle.valueInTxn) { if (pNode->handle.valueInTxn) {
pMeta->pDeleter(pNode->handle.valueInTxn); pMeta->pDeleter(pNode->handle.valueInTxn);
} }
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
@ -615,6 +593,4 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
} }
// TODO: clean deleted idx and data from persistent file // TODO: clean deleted idx and data from persistent file
int32_t tqStoreCompact(STqMetaStore *pMeta) { int32_t tqStoreCompact(STqMetaStore* pMeta) { return 0; }
return 0;
}

View File

@ -9,17 +9,17 @@ struct Foo {
int32_t a; int32_t a;
}; };
int FooSerializer(const void* pObj, TqSerializedHead** ppHead) { int FooSerializer(const void* pObj, STqSerializedHead** ppHead) {
Foo* foo = (Foo*) pObj; Foo* foo = (Foo*) pObj;
if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(TqSerializedHead) + sizeof(int32_t)) { if((*ppHead) == NULL || (*ppHead)->ssize < sizeof(STqSerializedHead) + sizeof(int32_t)) {
*ppHead = (TqSerializedHead*)realloc(*ppHead, sizeof(TqSerializedHead) + sizeof(int32_t)); *ppHead = (STqSerializedHead*)realloc(*ppHead, sizeof(STqSerializedHead) + sizeof(int32_t));
(*ppHead)->ssize = sizeof(TqSerializedHead) + sizeof(int32_t); (*ppHead)->ssize = sizeof(STqSerializedHead) + sizeof(int32_t);
} }
*(int32_t*)(*ppHead)->content = foo->a; *(int32_t*)(*ppHead)->content = foo->a;
return (*ppHead)->ssize; return (*ppHead)->ssize;
} }
const void* FooDeserializer(const TqSerializedHead* pHead, void** ppObj) { const void* FooDeserializer(const STqSerializedHead* pHead, void** ppObj) {
if(*ppObj == NULL) { if(*ppObj == NULL) {
*ppObj = realloc(*ppObj, sizeof(int32_t)); *ppObj = realloc(*ppObj, sizeof(int32_t));
} }

View File

@ -14,27 +14,21 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "cJSON.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tref.h"
#include "tfile.h" #include "tfile.h"
#include "cJSON.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
#include <libgen.h> #include <libgen.h>
#include <regex.h> #include <regex.h>
int64_t inline walGetFirstVer(SWal *pWal) { int64_t inline walGetFirstVer(SWal* pWal) { return pWal->vers.firstVer; }
return pWal->vers.firstVer;
}
int64_t inline walGetSnaphostVer(SWal *pWal) { int64_t inline walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotVer; }
return pWal->vers.snapshotVer;
}
int64_t inline walGetLastVer(SWal *pWal) { int64_t inline walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
return pWal->vers.lastVer;
}
static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) { static inline int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer); return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
@ -68,7 +62,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
} }
} }
// load meta // load meta
// if not match, or meta missing // if not match, or meta missing
// rebuild meta // rebuild meta

View File

@ -14,11 +14,11 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "compare.h"
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tref.h"
#include "tfile.h" #include "tfile.h"
#include "compare.h" #include "tref.h"
#include "walInt.h" #include "walInt.h"
typedef struct { typedef struct {
@ -34,9 +34,7 @@ static int32_t walCreateThread();
static void walStopThread(); static void walStopThread();
static void walFreeObj(void *pWal); static void walFreeObj(void *pWal);
int64_t walGetSeq() { int64_t walGetSeq() { return (int64_t)atomic_load_32(&tsWal.seq); }
return (int64_t)atomic_load_32(&tsWal.seq);
}
int32_t walInit() { int32_t walInit() {
int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1); int8_t old = atomic_val_compare_exchange_8(&tsWal.inited, 0, 1);
@ -132,10 +130,10 @@ SWal *walOpen(const char *path, SWalCfg *pCfg) {
} }
if (walCheckAndRepairIdx(pWal) < 0) { if (walCheckAndRepairIdx(pWal) < 0) {
} }
wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level, pWal->cfg.fsyncPeriod); wDebug("vgId:%d, wal:%p is opened, level:%d fsyncPeriod:%d", pWal->cfg.vgId, pWal, pWal->cfg.level,
pWal->cfg.fsyncPeriod);
return pWal; return pWal;
} }
@ -203,10 +201,12 @@ static void walFsyncAll() {
SWal *pWal = taosIterateRef(tsWal.refSetId, 0); SWal *pWal = taosIterateRef(tsWal.refSetId, 0);
while (pWal) { while (pWal) {
if (walNeedFsync(pWal)) { if (walNeedFsync(pWal)) {
wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq, atomic_load_32(&tsWal.seq)); wTrace("vgId:%d, do fsync, level:%d seq:%d rseq:%d", pWal->cfg.vgId, pWal->cfg.level, pWal->fsyncSeq,
atomic_load_32(&tsWal.seq));
int32_t code = tfFsync(pWal->writeLogTfd); int32_t code = tfFsync(pWal->writeLogTfd);
if (code != 0) { if (code != 0) {
wError("vgId:%d, file:%"PRId64".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(code)); wError("vgId:%d, file:%" PRId64 ".log, failed to fsync since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(code));
} }
} }
pWal = taosIterateRef(tsWal.refSetId, pWal->refId); pWal = taosIterateRef(tsWal.refSetId, pWal->refId);

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "walInt.h"
#include "tfile.h" #include "tfile.h"
#include "walInt.h"
SWalReadHandle *walOpenReadHandle(SWal *pWal) { SWalReadHandle *walOpenReadHandle(SWal *pWal) {
SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle)); SWalReadHandle *pRead = malloc(sizeof(SWalReadHandle));
@ -43,9 +43,7 @@ void walCloseReadHandle(SWalReadHandle *pRead) {
free(pRead); free(pRead);
} }
int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { int32_t walRegisterRead(SWalReadHandle *pRead, int64_t ver) { return 0; }
return 0;
}
static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) { static int32_t walReadSeekFilePos(SWalReadHandle *pRead, int64_t fileFirstVer, int64_t ver) {
int code = 0; int code = 0;
@ -105,7 +103,6 @@ static int32_t walReadSeekVer(SWalReadHandle *pRead, int64_t ver) {
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
} }
WalFileInfo tmpInfo; WalFileInfo tmpInfo;

View File

@ -16,8 +16,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h" #include "taoserror.h"
#include "tref.h"
#include "tfile.h" #include "tfile.h"
#include "tref.h"
#include "walInt.h" #include "walInt.h"
static int walSeekFilePos(SWal* pWal, int64_t ver) { static int walSeekFilePos(SWal* pWal, int64_t ver) {
@ -116,7 +116,6 @@ int walSeekVer(SWal *pWal, int64_t ver) {
return -1; return -1;
} }
if (ver < pWal->vers.snapshotVer) { if (ver < pWal->vers.snapshotVer) {
} }
if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) { if (ver < walGetCurFileFirstVer(pWal) || (ver > walGetCurFileLastVer(pWal))) {
code = walChangeFile(pWal, ver); code = walChangeFile(pWal, ver);

View File

@ -164,8 +164,7 @@ int32_t walEndSnapshot(SWal *pWal) {
} }
// iterate files, until the searched result // iterate files, until the searched result
for (WalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) { for (WalFileInfo *iter = pWal->fileInfoSet->pData; iter < pInfo; iter++) {
if(pWal->totSize > pWal->cfg.retentionSize || if (pWal->totSize > pWal->cfg.retentionSize || iter->closeTs + pWal->cfg.retentionPeriod > ts) {
iter->closeTs + pWal->cfg.retentionPeriod > ts) {
// delete according to file size or close time // delete according to file size or close time
deleteCnt++; deleteCnt++;
newTotSize -= iter->fileSize; newTotSize -= iter->fileSize;
@ -189,7 +188,8 @@ int32_t walEndSnapshot(SWal *pWal) {
} else { } else {
pWal->vers.firstVer = ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer; pWal->vers.firstVer = ((WalFileInfo *)taosArrayGet(pWal->fileInfoSet, 0))->firstVer;
} }
pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;; pWal->writeCur = taosArrayGetSize(pWal->fileInfoSet) - 1;
;
pWal->totSize = newTotSize; pWal->totSize = newTotSize;
pWal->vers.verInSnapshotting = -1; pWal->vers.verInSnapshotting = -1;
@ -296,13 +296,15 @@ int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, i
if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) { if (tfWrite(pWal->writeLogTfd, &pWal->writeHead, sizeof(SWalHead)) != sizeof(SWalHead)) {
// ftruncate // ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
} }
if (tfWrite(pWal->writeLogTfd, (char *)body, bodyLen) != bodyLen) { if (tfWrite(pWal->writeLogTfd, (char *)body, bodyLen) != bodyLen) {
// ftruncate // ftruncate
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
wError("vgId:%d, file:%"PRId64".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%" PRId64 ".log, failed to write since %s", pWal->cfg.vgId, walGetLastFileFirstVer(pWal),
strerror(errno));
} }
code = walWriteIndex(pWal, index, offset); code = walWriteIndex(pWal, index, offset);
if (code != 0) { if (code != 0) {
@ -325,7 +327,8 @@ void walFsync(SWal *pWal, bool forceFsync) {
if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) { if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal)); wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
if (tfFsync(pWal->writeLogTfd) < 0) { if (tfFsync(pWal->writeLogTfd) < 0) {
wError("vgId:%d, file:%"PRId64".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal), strerror(errno)); wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
strerror(errno));
} }
} }
} }

View File

@ -15,9 +15,7 @@ class WalCleanEnv : public ::testing::Test {
ASSERT(code == 0); ASSERT(code == 0);
} }
static void TearDownTestCase() { static void TearDownTestCase() { walCleanUp(); }
walCleanUp();
}
void SetUp() override { void SetUp() override {
taosRemoveDir(pathName); taosRemoveDir(pathName);
@ -49,9 +47,7 @@ class WalCleanDeleteEnv : public ::testing::Test {
ASSERT(code == 0); ASSERT(code == 0);
} }
static void TearDownTestCase() { static void TearDownTestCase() { walCleanUp(); }
walCleanUp();
}
void SetUp() override { void SetUp() override {
taosRemoveDir(pathName); taosRemoveDir(pathName);
@ -81,9 +77,7 @@ class WalKeepEnv : public ::testing::Test {
ASSERT(code == 0); ASSERT(code == 0);
} }
static void TearDownTestCase() { static void TearDownTestCase() { walCleanUp(); }
walCleanUp();
}
void walResetEnv() { void walResetEnv() {
TearDown(); TearDown();