From 34581781e6f734530bbb4541e2382b52da318c4d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 9 Nov 2021 15:16:10 +0800 Subject: [PATCH] fix memory leak --- source/dnode/vnode/tq/inc/tqMetaStore.h | 13 +-- source/dnode/vnode/tq/src/tqMetaStore.c | 39 +++++--- source/dnode/vnode/tq/test/tqMetaTest.cpp | 116 ++++++++++++++++++++++ 3 files changed, 142 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 2b3ddc8765..c62e7db111 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -24,7 +24,9 @@ extern "C" { #endif -#define TQ_BUCKET_SIZE 0xFF +#define TQ_BUCKET_MASK 0xFF +#define TQ_BUCKET_SIZE 256 + #define TQ_PAGE_SIZE 4096 //key + offset + size #define TQ_IDX_SIZE 24 @@ -35,15 +37,6 @@ extern "C" { //4096 - 4080 #define TQ_IDX_PAGE_HEAD_SIZE 16 - -inline static int TqMaxEntryOnePage() { //170 - return TQ_PAGE_SIZE / TQ_IDX_SIZE; -} - -inline static int TqEmptyTail() { //16 - return TQ_PAGE_SIZE - TqMaxEntryOnePage(); -} - #define TQ_ACTION_CONST 0 #define TQ_ACTION_INUSE 1 #define TQ_ACTION_INUSE_CONT 2 diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index e99d98edb3..8e1f78fbe0 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -130,7 +130,6 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->deleter = deleter; //read idx file and load into memory - /*char idxBuf[TQ_PAGE_SIZE];*/ TqIdxPageBuf idxBuf; TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); if(serializedObj == NULL) { @@ -196,7 +195,7 @@ TqMetaStore* tqStoreOpen(const char* path, } //put into list - int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; + int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; TqMetaList* pBucketNode = pMeta->bucket[bucketKey]; if(pBucketNode == NULL) { pMeta->bucket[bucketKey] = pNode; @@ -205,15 +204,18 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->bucket[bucketKey] = pNode; } else { while(pBucketNode->next && - pBucketNode->next->handle.key == pNode->handle.key) { + pBucketNode->next->handle.key != pNode->handle.key) { pBucketNode = pBucketNode->next; } if(pBucketNode->next) { ASSERT(pBucketNode->next->handle.key == pNode->handle.key); - TqMetaList *pNodeTmp = pBucketNode->next; - pBucketNode->next = pNodeTmp->next; - pBucketNode = pNodeTmp; + TqMetaList *pNodeFound = pBucketNode->next; + pNode->next = pNodeFound->next; + pBucketNode->next = pNode; + pBucketNode = pNodeFound; } else { + pNode->next = pMeta->bucket[bucketKey]; + pMeta->bucket[bucketKey] = pNode; pBucketNode = NULL; } } @@ -359,11 +361,13 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { *(bufPtr++) = pNode->handle.offset; *(bufPtr++) = (int64_t)nBytes; idxBuf.head.writeOffset += TQ_IDX_SIZE; + if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) { nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE); //TODO: handle error with tfile ASSERT(nBytes == TQ_PAGE_SIZE); memset(&idxBuf, 0, TQ_PAGE_SIZE); + idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; bufPtr = (int64_t*)&idxBuf.buffer; } //remove from unpersist list @@ -376,7 +380,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { if(pNode->handle.valueInUse == TQ_DELETE_TOKEN && pNode->handle.valueInTxn == NULL ) { - int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; + int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; TqMetaList* pBucketHead = pMeta->bucket[bucketKey]; if(pBucketHead == pNode) { pMeta->bucket[bucketKey] = pNode->next; @@ -409,7 +413,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { } static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -442,7 +446,7 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value } void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -460,7 +464,7 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { } int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -498,7 +502,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi return -1; } memcpy(vmem, value, vsize); - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -530,7 +534,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi } static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -548,10 +552,13 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { } int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { + if(pNode->handle.valueInTxn == NULL) { + return -1; + } if(pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { pMeta->deleter(pNode->handle.valueInUse); @@ -564,11 +571,11 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { pNode = pNode->next; } } - return -1; + return -2; } int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -589,7 +596,7 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { } int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index d0511c2e2c..f0241257b8 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -176,3 +176,119 @@ TEST_F(TqMetaTest, intxnPersist) { pFoo1 = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo1->a, 4); } + +TEST_F(TqMetaTest, multiplePage) { + srand(0); + std::vector v; + for(int i = 0; i < 1000; i++) { + v.push_back(rand()); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + for(int i = 0; i < 500; i++) { + tqHandleCommit(pMeta, i); + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + + tqStoreClose(pMeta); + pMeta = tqStoreOpen(pathName, + FooSerializer, FooDeserializer, FooDeleter); + ASSERT(pMeta); + + for(int i = 500; i < 1000; i++) { + tqHandleCommit(pMeta, i); + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + + for(int i = 0; i < 1000; i++) { + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + +} + +TEST_F(TqMetaTest, multipleRewrite) { + srand(0); + std::vector v; + for(int i = 0; i < 1000; i++) { + v.push_back(rand()); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 0; i < 500; i++) { + tqHandleCommit(pMeta, i); + v[i] = rand(); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 500; i < 1000; i++) { + v[i] = rand(); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 0; i < 1000; i++) { + tqHandleCommit(pMeta, i); + } + + tqStoreClose(pMeta); + pMeta = tqStoreOpen(pathName, + FooSerializer, FooDeserializer, FooDeleter); + ASSERT(pMeta); + + for(int i = 500; i < 1000; i++) { + v[i] = rand(); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + tqHandleCommit(pMeta, i); + } + + for(int i = 0; i < 1000; i++) { + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + +} + +TEST_F(TqMetaTest, dupCommit) { + srand(0); + std::vector v; + for(int i = 0; i < 1000; i++) { + v.push_back(rand()); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 0; i < 1000; i++) { + int ret = tqHandleCommit(pMeta, i); + EXPECT_EQ(ret, 0); + ret = tqHandleCommit(pMeta, i); + EXPECT_EQ(ret, -1); + } + + for(int i = 0; i < 1000; i++) { + int ret = tqHandleCommit(pMeta, i); + EXPECT_EQ(ret, -1); + } + + for(int i = 0; i < 1000; i++) { + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + +}