diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt index 0dcb7a240c..d8026bd56f 100644 --- a/deps/CMakeLists.txt +++ b/deps/CMakeLists.txt @@ -50,8 +50,11 @@ endif(${BUILD_WITH_LEVELDB}) # rocksdb # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev if(${BUILD_WITH_ROCKSDB}) + SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-error=maybe-uninitialized") option(WITH_TESTS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF) + option(WITH_TOOLS "" OFF) + option(WITH_LIBURING "" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) add_subdirectory(rocksdb) target_include_directories( diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index c62e7db111..3e49f11254 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -44,6 +44,21 @@ extern "C" { #define TQ_SVER 0 +//TODO: inplace mode is not implemented +#define TQ_UPDATE_INPLACE 0 +#define TQ_UPDATE_APPEND 1 + +#define TQ_DUP_INTXN_REWRITE 0 +#define TQ_DUP_INTXN_REJECT 2 + +static inline bool TqUpdateAppend(int32_t tqConfigFlag) { + return tqConfigFlag & TQ_UPDATE_APPEND; +} + +static inline bool TqDupIntxnReject(int32_t tqConfigFlag) { + return tqConfigFlag & TQ_DUP_INTXN_REJECT; +} + static const int8_t TQ_CONST_DELETE = TQ_ACTION_CONST; #define TQ_DELETE_TOKEN (void*)&TQ_CONST_DELETE @@ -79,6 +94,7 @@ typedef struct TqMetaStore { int fileFd; //TODO:temporaral use, to be replaced by unified tfile int idxFd; //TODO:temporaral use, to be replaced by unified tfile char* dirPath; + int32_t tqConfigFlag; int (*serializer)(const void* pObj, TqSerializedHead** ppHead); const void* (*deserializer)(const TqSerializedHead* pHead, void** ppObj); void (*deleter)(void*); @@ -87,7 +103,9 @@ typedef struct TqMetaStore { TqMetaStore* tqStoreOpen(const char* path, int serializer(const void* pObj, TqSerializedHead** ppHead), const void* deserializer(const TqSerializedHead* pHead, void** ppObj), - void deleter(void* pObj)); + void deleter(void* pObj), + int32_t tqConfigFlag + ); int32_t tqStoreClose(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*); //int32_t TqStoreCommitAll(TqMetaStore*); @@ -96,6 +114,8 @@ int32_t tqStorePersist(TqMetaStore*); int32_t tqStoreCompact(TqMetaStore*); void* tqHandleGet(TqMetaStore*, int64_t key); +//make it unpersist +void* tqHandleTouchGet(TqMetaStore*, int64_t key); int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); //delete committed kv pair diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 8e1f78fbe0..f8202941bb 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -71,7 +71,9 @@ static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { TqMetaStore* tqStoreOpen(const char* path, int serializer(const void* pObj, TqSerializedHead** ppHead), const void* deserializer(const TqSerializedHead* pHead, void** ppObj), - void deleter(void* pObj)) { + void deleter(void* pObj), + int32_t tqConfigFlag + ) { TqMetaStore* pMeta = malloc(sizeof(TqMetaStore)); if(pMeta == NULL) { //close @@ -128,6 +130,7 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->serializer = serializer; pMeta->deserializer = deserializer; pMeta->deleter = deleter; + pMeta->tqConfigFlag = tqConfigFlag; //read idx file and load into memory TqIdxPageBuf idxBuf; @@ -463,17 +466,39 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { return NULL; } -int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { +void* tqHandleTouchGet(TqMetaStore* pMeta, int64_t key) { + int64_t bucketKey = key & TQ_BUCKET_MASK; + TqMetaList* pNode = pMeta->bucket[bucketKey]; + while(pNode) { + if(pNode->handle.key == key) { + if(pNode->handle.valueInUse != NULL + && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { + tqLinkUnpersist(pMeta, pNode); + return pNode->handle.valueInUse; + } else { + return NULL; + } + } else { + pNode = pNode->next; + } + } + return NULL; +} + +static inline int32_t tqHandlePutImpl(TqMetaStore* pMeta, int64_t key, void* value) { int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { //TODO: think about thread safety - if(pNode->handle.valueInTxn - && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); + if(pNode->handle.valueInTxn) { + if(TqDupIntxnReject(pMeta->tqConfigFlag)) { + return -2; + } + if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { + pMeta->deleter(pNode->handle.valueInTxn); + } } - //change pointer ownership pNode->handle.valueInTxn = value; tqLinkUnpersist(pMeta, pNode); return 0; @@ -495,6 +520,10 @@ int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { return 0; } +int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { + return tqHandlePutImpl(pMeta, key, value); +} + int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { void *vmem = malloc(vsize); if(vmem == NULL) { @@ -502,35 +531,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi return -1; } memcpy(vmem, value, vsize); - int64_t bucketKey = key & TQ_BUCKET_MASK; - TqMetaList* pNode = pMeta->bucket[bucketKey]; - while(pNode) { - if(pNode->handle.key == key) { - //TODO: think about thread safety - if(pNode->handle.valueInTxn - && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); - } - //change pointer ownership - pNode->handle.valueInTxn = vmem; - tqLinkUnpersist(pMeta, pNode); - return 0; - } else { - pNode = pNode->next; - } - } - TqMetaList *pNewNode = malloc(sizeof(TqMetaList)); - if(pNewNode == NULL) { - //TODO: memory error - return -1; - } - memset(pNewNode, 0, sizeof(TqMetaList)); - pNewNode->handle.key = key; - pNewNode->handle.valueInTxn = vmem; - pNewNode->next = pMeta->bucket[bucketKey]; - pMeta->bucket[bucketKey] = pNewNode; - tqLinkUnpersist(pMeta, pNewNode); - return 0; + return tqHandlePutImpl(pMeta, key, vmem); } static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index f0241257b8..bbd436ab1a 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -32,13 +32,15 @@ void FooDeleter(void* pObj) { free(pObj); } -class TqMetaTest : public ::testing::Test { +class TqMetaUpdateAppendTest : public ::testing::Test { protected: void SetUp() override { taosRemoveDir(pathName); pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter); + FooSerializer, FooDeserializer, FooDeleter, + TQ_UPDATE_APPEND + ); ASSERT(pMeta); } @@ -50,7 +52,7 @@ class TqMetaTest : public ::testing::Test { const char* pathName = "/tmp/tq_test"; }; -TEST_F(TqMetaTest, copyPutTest) { +TEST_F(TqMetaUpdateAppendTest, copyPutTest) { Foo foo; foo.a = 3; tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo)); @@ -63,7 +65,7 @@ TEST_F(TqMetaTest, copyPutTest) { EXPECT_EQ(pFoo->a, 3); } -TEST_F(TqMetaTest, persistTest) { +TEST_F(TqMetaUpdateAppendTest, persistTest) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 2; tqHandleMovePut(pMeta, 1, pFoo); @@ -77,7 +79,9 @@ TEST_F(TqMetaTest, persistTest) { tqStoreClose(pMeta); pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter); + FooSerializer, FooDeserializer, FooDeleter, + TQ_UPDATE_APPEND + ); ASSERT(pMeta); pBar = (Foo*)tqHandleGet(pMeta, 1); @@ -88,7 +92,7 @@ TEST_F(TqMetaTest, persistTest) { EXPECT_EQ(pBar == NULL, true); } -TEST_F(TqMetaTest, uncommittedTest) { +TEST_F(TqMetaUpdateAppendTest, uncommittedTest) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); @@ -97,7 +101,7 @@ TEST_F(TqMetaTest, uncommittedTest) { EXPECT_EQ(pFoo == NULL, true); } -TEST_F(TqMetaTest, abortTest) { +TEST_F(TqMetaUpdateAppendTest, abortTest) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); @@ -110,7 +114,7 @@ TEST_F(TqMetaTest, abortTest) { EXPECT_EQ(pFoo == NULL, true); } -TEST_F(TqMetaTest, deleteTest) { +TEST_F(TqMetaUpdateAppendTest, deleteTest) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); @@ -135,14 +139,16 @@ TEST_F(TqMetaTest, deleteTest) { tqStoreClose(pMeta); pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter); + FooSerializer, FooDeserializer, FooDeleter, + TQ_UPDATE_APPEND + ); ASSERT(pMeta); pFoo = (Foo*) tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } -TEST_F(TqMetaTest, intxnPersist) { +TEST_F(TqMetaUpdateAppendTest, intxnPersist) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 3; tqHandleMovePut(pMeta, 1, pFoo); @@ -157,7 +163,9 @@ TEST_F(TqMetaTest, intxnPersist) { tqStoreClose(pMeta); pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter); + FooSerializer, FooDeserializer, FooDeleter, + TQ_UPDATE_APPEND + ); ASSERT(pMeta); pFoo1 = (Foo*)tqHandleGet(pMeta, 1); @@ -170,14 +178,16 @@ TEST_F(TqMetaTest, intxnPersist) { tqStoreClose(pMeta); pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter); + FooSerializer, FooDeserializer, FooDeleter, + TQ_UPDATE_APPEND + ); ASSERT(pMeta); pFoo1 = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo1->a, 4); } -TEST_F(TqMetaTest, multiplePage) { +TEST_F(TqMetaUpdateAppendTest, multiplePage) { srand(0); std::vector v; for(int i = 0; i < 1000; i++) { @@ -195,7 +205,9 @@ TEST_F(TqMetaTest, multiplePage) { tqStoreClose(pMeta); pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter); + FooSerializer, FooDeserializer, FooDeleter, + TQ_UPDATE_APPEND + ); ASSERT(pMeta); for(int i = 500; i < 1000; i++) { @@ -213,7 +225,7 @@ TEST_F(TqMetaTest, multiplePage) { } -TEST_F(TqMetaTest, multipleRewrite) { +TEST_F(TqMetaUpdateAppendTest, multipleRewrite) { srand(0); std::vector v; for(int i = 0; i < 1000; i++) { @@ -244,7 +256,9 @@ TEST_F(TqMetaTest, multipleRewrite) { tqStoreClose(pMeta); pMeta = tqStoreOpen(pathName, - FooSerializer, FooDeserializer, FooDeleter); + FooSerializer, FooDeserializer, FooDeleter, + TQ_UPDATE_APPEND + ); ASSERT(pMeta); for(int i = 500; i < 1000; i++) { @@ -263,7 +277,7 @@ TEST_F(TqMetaTest, multipleRewrite) { } -TEST_F(TqMetaTest, dupCommit) { +TEST_F(TqMetaUpdateAppendTest, dupCommit) { srand(0); std::vector v; for(int i = 0; i < 1000; i++) {