diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index 7a21b08aaf..2785b6de96 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -109,11 +109,10 @@ typedef struct TqTopicVhandle { #define TQ_BUFFER_SIZE 8 -// TODO: define a serializer and deserializer typedef struct TqBufferItem { int64_t offset; // executors are identical but not concurrent - // so it must be a copy in each item + // so there must be a copy in each item void* executor; int64_t size; void* content; @@ -156,23 +155,111 @@ typedef struct TqQueryMsg { typedef struct TqLogReader { void* logHandle; - int32_t (*walRead)(void* logHandle, void** data, int64_t ver); - int64_t (*walGetFirstVer)(void* logHandle); - int64_t (*walGetSnapshotVer)(void* logHandle); - int64_t (*walGetLastVer)(void* logHandle); + int32_t (*logRead)(void* logHandle, void** data, int64_t ver); + int64_t (*logGetFirstVer)(void* logHandle); + int64_t (*logGetSnapshotVer)(void* logHandle); + int64_t (*logGetLastVer)(void* logHandle); } TqLogReader; typedef struct TqConfig { // TODO } TqConfig; +typedef struct TqMemRef { + SMemAllocatorFactory *pAlloctorFactory; + SMemAllocator *pAllocator; +} TqMemRef; + +typedef struct TqSerializedHead { + int16_t ver; + int16_t action; + int32_t checksum; + int64_t ssize; + char content[]; +} TqSerializedHead; + +typedef int (*TqSerializeFun)(const void* pObj, TqSerializedHead** ppHead); +typedef const void* (*TqDeserializeFun)(const TqSerializedHead* pHead, void** ppObj); +typedef void (*TqDeleteFun)(void*); + +#define TQ_BUCKET_MASK 0xFF +#define TQ_BUCKET_SIZE 256 + +#define TQ_PAGE_SIZE 4096 +//key + offset + size +#define TQ_IDX_SIZE 24 +//4096 / 24 +#define TQ_MAX_IDX_ONE_PAGE 170 +//24 * 170 +#define TQ_IDX_PAGE_BODY_SIZE 4080 +//4096 - 4080 +#define TQ_IDX_PAGE_HEAD_SIZE 16 + +#define TQ_ACTION_CONST 0 +#define TQ_ACTION_INUSE 1 +#define TQ_ACTION_INUSE_CONT 2 +#define TQ_ACTION_INTXN 3 + +#define TQ_SVER 0 + +//TODO: inplace mode is not implemented +#define TQ_UPDATE_INPLACE 0 +#define TQ_UPDATE_APPEND 1 + +#define TQ_DUP_INTXN_REWRITE 0 +#define TQ_DUP_INTXN_REJECT 2 + +static inline bool TqUpdateAppend(int32_t tqConfigFlag) { + return tqConfigFlag & TQ_UPDATE_APPEND; +} + +static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { + return tqConfigFlag & TQ_DUP_INTXN_REJECT; +} + +static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; +#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE + +typedef struct TqMetaHandle { + int64_t key; + int64_t offset; + int64_t serializedSize; + void* valueInUse; + void* valueInTxn; +} TqMetaHandle; + +typedef struct TqMetaList { + TqMetaHandle handle; + struct TqMetaList* next; + //struct TqMetaList* inTxnPrev; + //struct TqMetaList* inTxnNext; + struct TqMetaList* unpersistPrev; + struct TqMetaList* unpersistNext; +} TqMetaList; + +typedef struct TqMetaStore { + TqMetaList* bucket[TQ_BUCKET_SIZE]; + //a table head + TqMetaList* unpersistHead; + //TODO:temporaral use, to be replaced by unified tfile + int fileFd; + //TODO:temporaral use, to be replaced by unified tfile + int idxFd; + char* dirPath; + int32_t tqConfigFlag; + TqSerializeFun pSerializer; + TqDeserializeFun pDeserializer; + TqDeleteFun pDeleter; +} TqMetaStore; + typedef struct STQ { // the collection of group handle // the handle of kvstore - const char* path; + char* path; TqConfig* tqConfig; TqLogReader* tqLogReader; - SMemAllocatorFactory* allocFac; + TqMemRef tqMemRef; + TqMetaStore* tqMeta; } STQ; // open in each vnode @@ -187,7 +274,7 @@ int tqConsume(STQ*, TmqConsumeReq*); TqGroupHandle* tqGetGroupHandle(STQ*, int64_t cId); -int tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); +TqGroupHandle* tqOpenTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseTCGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqMoveOffsetToNext(TqGroupHandle*); int tqResetOffset(STQ*, int64_t topicId, int64_t cgId, int64_t offset); @@ -195,18 +282,9 @@ int tqRegisterContext(TqGroupHandle*, void* ahandle); int tqLaunchQuery(TqGroupHandle*); int tqSendLaunchQuery(TqGroupHandle*); -int tqSerializeGroupHandle(TqGroupHandle* gHandle, void** ppBytes); -void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr); -void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr); -void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr); +int tqSerializeGroupHandle(const TqGroupHandle* gHandle, TqSerializedHead** ppHead); -const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle* ghandle); -const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle); -const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem); - -int tqGetGHandleSSize(const TqGroupHandle* gHandle); -int tqBufHandleSSize(); -int tqBufItemSSize(); +const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle** gHandle); #ifdef __cplusplus } diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 0829782310..ba37e6880b 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -44,8 +44,10 @@ typedef struct { EWalType walLevel; // wal level } SWalCfg; -struct SWal; -typedef struct SWal SWal; // WAL HANDLE +typedef struct SWal { + int8_t unused; +} SWal; // WAL HANDLE + typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg); // module initialization diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 63e48625d9..b9e702a89a 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -17,97 +17,22 @@ #define _TQ_META_STORE_H_ #include "os.h" - +#include "tq.h" #ifdef __cplusplus extern "C" { #endif -#define TQ_BUCKET_MASK 0xFF -#define TQ_BUCKET_SIZE 256 - -#define TQ_PAGE_SIZE 4096 -//key + offset + size -#define TQ_IDX_SIZE 24 -//4096 / 24 -#define TQ_MAX_IDX_ONE_PAGE 170 -//24 * 170 -#define TQ_IDX_PAGE_BODY_SIZE 4080 -//4096 - 4080 -#define TQ_IDX_PAGE_HEAD_SIZE 16 - -#define TQ_ACTION_CONST 0 -#define TQ_ACTION_INUSE 1 -#define TQ_ACTION_INUSE_CONT 2 -#define TQ_ACTION_INTXN 3 - -#define TQ_SVER 0 - -//TODO: inplace mode is not implemented -#define TQ_UPDATE_INPLACE 0 -#define TQ_UPDATE_APPEND 1 - -#define TQ_DUP_INTXN_REWRITE 0 -#define TQ_DUP_INTXN_REJECT 2 - -static inline bool TqUpdateAppend(int32_t tqConfigFlag) { - return tqConfigFlag & TQ_UPDATE_APPEND; -} - -static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { - return tqConfigFlag & TQ_DUP_INTXN_REJECT; -} - -static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; -#define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE - -typedef struct TqSerializedHead { - int16_t ver; - int16_t action; - int32_t checksum; - int64_t ssize; - char content[]; -} TqSerializedHead; - -typedef struct TqMetaHandle { - int64_t key; - int64_t offset; - int64_t serializedSize; - void* valueInUse; - void* valueInTxn; -} TqMetaHandle; - -typedef struct TqMetaList { - TqMetaHandle handle; - struct TqMetaList* next; - //struct TqMetaList* inTxnPrev; - //struct TqMetaList* inTxnNext; - struct TqMetaList* unpersistPrev; - struct TqMetaList* unpersistNext; -} TqMetaList; - -typedef struct TqMetaStore { - TqMetaList* bucket[TQ_BUCKET_SIZE]; - //a table head - TqMetaList* unpersistHead; - int fileFd; //TODO:temporaral use, to be replaced by unified tfile - int idxFd; //TODO:temporaral use, to be replaced by unified tfile - char* dirPath; - int32_t tqConfigFlag; - int (*serializer)(const void* pObj, TqSerializedHead** ppHead); - const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj); - void (*deleter)(void*); -} TqMetaStore; TqMetaStore* tqStoreOpen(const char* path, - int serializer(const void* pObj, TqSerializedHead** ppHead), - const void* deserializer(const TqSerializedHead* pHead, void** ppObj), - void deleter(void* pObj), - int32_t tqConfigFlag + TqSerializeFun pSerializer, + TqDeserializeFun pDeserializer, + TqDeleteFun pDeleter, + int32_t tqConfigFlag ); int32_t tqStoreClose(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*); -//int32_t TqStoreCommitAll(TqMetaStore*); +//int32_t tqStoreCommitAll(TqMetaStore*); int32_t tqStorePersist(TqMetaStore*); //clean deleted idx and data from persistent file int32_t tqStoreCompact(TqMetaStore*); diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index cf98e3e1a4..c010042b8c 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -14,6 +14,7 @@ */ #include "tqInt.h" +#include "tqMetaStore.h" //static //read next version data @@ -24,6 +25,46 @@ // int tqGetgHandleSSize(const TqGroupHandle *gHandle); +int tqBufHandleSSize(); +int tqBufItemSSize(); + +TqGroupHandle* tqFindHandle(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + TqGroupHandle* gHandle; + return NULL; +} + +void* tqSerializeListHandle(TqListHandle* listHandle, void* ptr); +void* tqSerializeBufHandle(TqBufferHandle* bufHandle, void* ptr); +void* tqSerializeBufItem(TqBufferItem* bufItem, void* ptr); + +const void* tqDeserializeBufHandle(const void* pBytes, TqBufferHandle* bufHandle); +const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem* bufItem); + +STQ* tqOpen(const char* path, TqConfig* tqConfig, TqLogReader* tqLogReader, SMemAllocatorFactory *allocFac) { + STQ* pTq = malloc(sizeof(STQ)); + if(pTq == NULL) { + //TODO: memory error + return NULL; + } + strcpy(pTq->path, path); + pTq->tqConfig = tqConfig; + pTq->tqLogReader = tqLogReader; + pTq->tqMemRef.pAlloctorFactory = allocFac; + pTq->tqMemRef.pAllocator = allocFac->create(); + if(pTq->tqMemRef.pAllocator == NULL) { + //TODO + } + pTq->tqMeta = tqStoreOpen(path, + (TqSerializeFun)tqSerializeGroupHandle, + (TqDeserializeFun)tqDeserializeGroupHandle, + free, + 0); + if(pTq->tqMeta == NULL) { + //TODO: free STQ + return NULL; + } + return pTq; +} static int tqProtoCheck(TmqMsgHead *pMsg) { return pMsg->protoVer == 0; @@ -83,14 +124,29 @@ static int tqCommitTCGroup(TqGroupHandle* handle) { int tqCreateTCGroup(STQ *pTq, int64_t topicId, int64_t cgId, int64_t cId, TqGroupHandle** handle) { //create in disk + TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle)); + if(gHandle == NULL) { + //TODO + return -1; + } + memset(gHandle, 0, sizeof(TqGroupHandle)); + return 0; } -int tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { - //look up in disk +TqGroupHandle* tqOpenTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { + TqGroupHandle* gHandle = tqHandleGet(pTq->tqMeta, cId); + if(gHandle == NULL) { + int code = tqCreateTCGroup(pTq, topicId, cgId, cId, &gHandle); + if(code != 0) { + //TODO + return NULL; + } + } + //create //open - return 0; + return gHandle; } int tqCloseTCGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId) { @@ -207,16 +263,20 @@ int tqConsume(STQ* pTq, TmqConsumeReq* pMsg) { return 0; } -int tqSerializeGroupHandle(TqGroupHandle *gHandle, void** ppBytes) { +int tqSerializeGroupHandle(const TqGroupHandle *gHandle, TqSerializedHead** ppHead) { //calculate size - int sz = tqGetgHandleSSize(gHandle); - void* ptr = realloc(*ppBytes, sz); - if(ptr == NULL) { - free(ppBytes); - //TODO: memory err - return -1; + int sz = tqGetgHandleSSize(gHandle) + sizeof(TqSerializedHead); + if(sz > (*ppHead)->ssize) { + void* tmpPtr = realloc(*ppHead, sz); + if(tmpPtr == NULL) { + free(*ppHead); + //TODO: memory err + return -1; + } + *ppHead = tmpPtr; + (*ppHead)->ssize = sz; } - *ppBytes = ptr; + void* ptr = (*ppHead)->content; //do serialization *(int64_t*)ptr = gHandle->cId; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); @@ -261,8 +321,9 @@ void* tqSerializeBufItem(TqBufferItem *bufItem, void* ptr) { return ptr; } -const void* tqDeserializeGroupHandle(const void* pBytes, TqGroupHandle *gHandle) { - const void* ptr = pBytes; +const void* tqDeserializeGroupHandle(const TqSerializedHead* pHead, TqGroupHandle **ppGHandle) { + TqGroupHandle *gHandle = *ppGHandle; + const void* ptr = pHead->content; gHandle->cId = *(int64_t*)ptr; ptr = POINTER_SHIFT(ptr, sizeof(int64_t)); gHandle->cgId = *(int64_t*)ptr; @@ -317,15 +378,15 @@ const void* tqDeserializeBufItem(const void* pBytes, TqBufferItem *bufItem) { //TODO: make this a macro int tqGetgHandleSSize(const TqGroupHandle *gHandle) { - return sizeof(int64_t) * 2 - + sizeof(int32_t) + return sizeof(int64_t) * 2 //cId + cgId + + sizeof(int32_t) //topicNum + gHandle->topicNum * tqBufHandleSSize(); } //TODO: make this a macro int tqBufHandleSSize() { - return sizeof(int64_t) * 2 - + sizeof(int32_t) * 2 + return sizeof(int64_t) * 2 // nextConsumeOffset + topicId + + sizeof(int32_t) * 2 // head + tail + TQ_BUFFER_SIZE * tqBufItemSSize(); } diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index f8202941bb..71d1e8d890 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -69,10 +69,10 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { } TqMetaStore* tqStoreOpen(const char* path, - int serializer(const void* pObj, TqSerializedHead** ppHead), - const void* deserializer(const TqSerializedHead* pHead, void** ppObj), - void deleter(void* pObj), - int32_t tqConfigFlag + TqSerializeFun serializer, + TqDeserializeFun deserializer, + TqDeleteFun deleter, + int32_t tqConfigFlag ) { TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); if(pMeta == NULL) { @@ -127,9 +127,9 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->fileFd = fileFd; - pMeta->serializer = serializer; - pMeta->deserializer = deserializer; - pMeta->deleter = deleter; + pMeta->pSerializer = serializer; + pMeta->pDeserializer = deserializer; + pMeta->pDeleter = deleter; pMeta->tqConfigFlag = tqConfigFlag; //read idx file and load into memory @@ -171,25 +171,25 @@ TqMetaStore* tqStoreOpen(const char* path, } if(serializedObj->action == TQ_ACTION_INUSE) { if(serializedObj->ssize != sizeof(TqSerializedHead)) { - pMeta->deserializer(serializedObj, &pNode->handle.valueInUse); + pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse); } else { pNode->handle.valueInUse = TQ_DELETE_TOKEN; } } else if(serializedObj->action == TQ_ACTION_INTXN) { if(serializedObj->ssize != sizeof(TqSerializedHead)) { - pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn); + pMeta->pDeserializer(serializedObj, &pNode->handle.valueInTxn); } else { pNode->handle.valueInTxn = TQ_DELETE_TOKEN; } } else if(serializedObj->action == TQ_ACTION_INUSE_CONT) { if(serializedObj->ssize != sizeof(TqSerializedHead)) { - pMeta->deserializer(serializedObj, &pNode->handle.valueInUse); + pMeta->pDeserializer(serializedObj, &pNode->handle.valueInUse); } else { pNode->handle.valueInUse = TQ_DELETE_TOKEN; } TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize); if(ptr->ssize != sizeof(TqSerializedHead)) { - pMeta->deserializer(ptr, &pNode->handle.valueInTxn); + pMeta->pDeserializer(ptr, &pNode->handle.valueInTxn); } else { pNode->handle.valueInTxn = TQ_DELETE_TOKEN; } @@ -225,11 +225,11 @@ TqMetaStore* tqStoreOpen(const char* path, if(pBucketNode) { if(pBucketNode->handle.valueInUse && pBucketNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->deleter(pBucketNode->handle.valueInUse); + pMeta->pDeleter(pBucketNode->handle.valueInUse); } if(pBucketNode->handle.valueInTxn && pBucketNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pBucketNode->handle.valueInTxn); + pMeta->pDeleter(pBucketNode->handle.valueInTxn); } free(pBucketNode); } @@ -253,11 +253,11 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { ASSERT(pNode->unpersistPrev == NULL); if(pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); + pMeta->pDeleter(pNode->handle.valueInTxn); } if(pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInUse); + pMeta->pDeleter(pNode->handle.valueInUse); } TqMetaList* next = pNode->next; free(pNode); @@ -280,11 +280,11 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { while(pNode) { if(pNode->handle.valueInTxn && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); + pMeta->pDeleter(pNode->handle.valueInTxn); } if(pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInUse); + pMeta->pDeleter(pNode->handle.valueInUse); } TqMetaList* next = pNode->next; free(pNode); @@ -338,7 +338,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { if(pNode->handle.valueInUse == TQ_DELETE_TOKEN) { pSHead->ssize = sizeof(TqSerializedHead); } else { - pMeta->serializer(pNode->handle.valueInUse, &pSHead); + pMeta->pSerializer(pNode->handle.valueInUse, &pSHead); } nBytes = write(pMeta->fileFd, pSHead, pSHead->ssize); ASSERT(nBytes == pSHead->ssize); @@ -349,7 +349,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { if(pNode->handle.valueInTxn == TQ_DELETE_TOKEN) { pSHead->ssize = sizeof(TqSerializedHead); } else { - pMeta->serializer(pNode->handle.valueInTxn, &pSHead); + pMeta->pSerializer(pNode->handle.valueInTxn, &pSHead); } int nBytesTxn = write(pMeta->fileFd, pSHead, pSHead->ssize); ASSERT(nBytesTxn == pSHead->ssize); @@ -423,7 +423,7 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value //TODO: think about thread safety if(pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInUse); + pMeta->pDeleter(pNode->handle.valueInUse); } //change pointer ownership pNode->handle.valueInUse = value; @@ -496,7 +496,7 @@ static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* val return -2; } if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); + pMeta->pDeleter(pNode->handle.valueInTxn); } } pNode->handle.valueInTxn = value; @@ -562,7 +562,7 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { } if(pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInUse); + pMeta->pDeleter(pNode->handle.valueInUse); } pNode->handle.valueInUse = pNode->handle.valueInTxn; pNode->handle.valueInTxn = NULL; @@ -582,7 +582,7 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { if(pNode->handle.key == key) { if(pNode->handle.valueInTxn) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); + pMeta->pDeleter(pNode->handle.valueInTxn); } pNode->handle.valueInTxn = NULL; tqLinkUnpersist(pMeta, pNode); @@ -602,7 +602,7 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { while(pNode) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn) { - pMeta->deleter(pNode->handle.valueInTxn); + pMeta->pDeleter(pNode->handle.valueInTxn); } pNode->handle.valueInTxn = TQ_DELETE_TOKEN; tqLinkUnpersist(pMeta, pNode); diff --git a/source/dnode/vnode/tq/test/tqSerializerTest.cpp b/source/dnode/vnode/tq/test/tqSerializerTest.cpp new file mode 100644 index 0000000000..0d76322c17 --- /dev/null +++ b/source/dnode/vnode/tq/test/tqSerializerTest.cpp @@ -0,0 +1,13 @@ +#include +#include +#include +#include + +#include "tq.h" + +using namespace std; + +TEST(TqSerializerTest, basicTest) { + TqGroupHandle* gHandle = (TqGroupHandle*)malloc(sizeof(TqGroupHandle)); + +} diff --git a/source/libs/wal/src/wal.c b/source/libs/wal/src/wal.c index f25c127f3f..c107a94f3f 100644 --- a/source/libs/wal/src/wal.c +++ b/source/libs/wal/src/wal.c @@ -19,11 +19,19 @@ int32_t walInit() { return 0; } void walCleanUp() {} -SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; } +SWal *walOpen(char *path, SWalCfg *pCfg) { + SWal* pWal = malloc(sizeof(SWal)); + if(pWal == NULL) { + return NULL; + } + return pWal; +} int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; } -void walClose(SWal *pWal) {} +void walClose(SWal *pWal) { + if(pWal) free(pWal); +} void walFsync(SWal *pWal, bool force) {}