From 9edc17e575b2bb1493650a52c37af099289deabe Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 5 Nov 2021 17:17:21 +0800 Subject: [PATCH 1/5] fix tqHandleDel bug --- source/dnode/vnode/tq/src/tq.c | 3 +++ source/dnode/vnode/tq/src/tqMetaStore.c | 22 +++++++++++----------- source/dnode/vnode/tq/test/tqMetaTest.cpp | 7 +++++++ 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/source/dnode/vnode/tq/src/tq.c b/source/dnode/vnode/tq/src/tq.c index 1aa8f231c3..cf98e3e1a4 100644 --- a/source/dnode/vnode/tq/src/tq.c +++ b/source/dnode/vnode/tq/src/tq.c @@ -22,6 +22,9 @@ // //handle management message // + +int tqGetgHandleSSize(const TqGroupHandle *gHandle); + static int tqProtoCheck(TmqMsgHead *pMsg) { return pMsg->protoVer == 0; } diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index a4c2b90491..eb2c3404fc 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -27,15 +27,14 @@ static int32_t tqHandlePutCommitted(TqMetaStore*, int64_t key, void* value); static void* tqHandleGetUncommitted(TqMetaStore*, int64_t key); static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) { - if(pNode->unpersistNext == NULL) { - pNode->unpersistNext = pMeta->unpersistHead->unpersistNext; - pNode->unpersistPrev = pMeta->unpersistHead; - pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode; - pMeta->unpersistHead->unpersistNext = pNode; - } + if(pNode->unpersistNext == NULL) { + pNode->unpersistNext = pMeta->unpersistHead->unpersistNext; + pNode->unpersistPrev = pMeta->unpersistHead; + pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode; + pMeta->unpersistHead->unpersistNext = pNode; + } } - typedef struct TqMetaPageBuf { int16_t offset; char buffer[TQ_PAGE_SIZE]; @@ -401,7 +400,7 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { - if(pNode->handle.valueInUse != NULL) { + if(pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { return pNode->handle.valueInUse; } else { return NULL; @@ -546,9 +545,10 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_SIZE; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { - if(pNode->handle.valueInTxn - && pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); + if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { + if(pNode->handle.valueInTxn) { + pMeta->deleter(pNode->handle.valueInTxn); + } pNode->handle.valueInTxn = TQ_DELETE_TOKEN; tqLinkUnpersist(pMeta, pNode); return 0; diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index 20a0368c4c..a1021233db 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -130,4 +130,11 @@ TEST_F(TqMetaTest, deleteTest) { tqHandleCommit(pMeta, 1); pFoo = (Foo*) tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); + + tqStoreClose(pMeta); + pMeta = tqStoreOpen(pathName, + FooSerializer, FooDeserializer, FooDeleter); + ASSERT(pMeta); + pFoo = (Foo*) tqHandleGet(pMeta, 1); + EXPECT_EQ(pFoo == NULL, true); } From 5ebc77961d36c786d2cccafcea607fd396852a7a Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 5 Nov 2021 17:48:58 +0800 Subject: [PATCH 2/5] fix tq invalid free --- source/dnode/vnode/tq/inc/tqMetaStore.h | 2 ++ source/dnode/vnode/tq/src/tqMetaStore.c | 11 +++++-- source/dnode/vnode/tq/test/tqMetaTest.cpp | 40 +++++++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 52cc767409..3d1473a443 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -92,6 +92,8 @@ int32_t tqStoreClose(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*); //int32_t TqStoreCommitAll(TqMetaStore*); int32_t tqStorePersist(TqMetaStore*); +//clean deleted idx and data from persistent file +int32_t tqStoreCompact(TqMetaStore*); void* tqHandleGet(TqMetaStore*, int64_t key); int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index eb2c3404fc..079aae7435 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -153,9 +153,9 @@ TqMetaStore* tqStoreOpen(const char* path, } else { pNode->handle.valueInUse = TQ_DELETE_TOKEN; } - serializedObj = POINTER_SHIFT(serializedObj, serializedObj->ssize); - if(serializedObj->ssize != sizeof(TqSerializedHead)) { - pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn); + TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize); + if(ptr->ssize != sizeof(TqSerializedHead)) { + pMeta->deserializer(ptr, &pNode->handle.valueInTxn); } else { pNode->handle.valueInTxn = TQ_DELETE_TOKEN; } @@ -591,3 +591,8 @@ int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) { } return -2; } + +//TODO: clean deleted idx and data from persistent file +int32_t tqStoreCompact(TqMetaStore *pMeta) { + return 0; +} diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index a1021233db..4bf56a0a56 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -57,6 +57,10 @@ TEST_F(TqMetaTest, copyPutTest) { Foo* pFoo = (Foo*) tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); + + tqHandleCommit(pMeta, 1); + pFoo = (Foo*) tqHandleGet(pMeta, 1); + EXPECT_EQ(pFoo->a, 3); } TEST_F(TqMetaTest, persistTest) { @@ -135,6 +139,42 @@ TEST_F(TqMetaTest, deleteTest) { pMeta = tqStoreOpen(pathName, FooSerializer, FooDeserializer, FooDeleter); ASSERT(pMeta); + pFoo = (Foo*) tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); } + +TEST_F(TqMetaTest, intxnPersist) { + Foo* pFoo = (Foo*)malloc(sizeof(Foo)); + pFoo->a = 3; + tqHandleMovePut(pMeta, 1, pFoo); + tqHandleCommit(pMeta, 1); + + Foo* pBar = (Foo*)malloc(sizeof(Foo)); + pBar->a = 4; + tqHandleMovePut(pMeta, 1, pBar); + + Foo* pFoo1 = (Foo*)tqHandleGet(pMeta, 1); + EXPECT_EQ(pFoo1->a, 3); + + tqStoreClose(pMeta); + pMeta = tqStoreOpen(pathName, + FooSerializer, FooDeserializer, FooDeleter); + ASSERT(pMeta); + + pFoo1 = (Foo*)tqHandleGet(pMeta, 1); + EXPECT_EQ(pFoo1->a, 3); + + tqHandleCommit(pMeta, 1); + + pFoo1 = (Foo*)tqHandleGet(pMeta, 1); + EXPECT_EQ(pFoo1->a, 4); + + tqStoreClose(pMeta); + pMeta = tqStoreOpen(pathName, + FooSerializer, FooDeserializer, FooDeleter); + ASSERT(pMeta); + + pFoo1 = (Foo*)tqHandleGet(pMeta, 1); + EXPECT_EQ(pFoo1->a, 4); +} From 1f1f6c5af607724e10b7ee67fde1d8d465d9f628 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 5 Nov 2021 20:18:22 +0800 Subject: [PATCH 3/5] remove tqHandleClear interface --- source/dnode/vnode/tq/inc/tqMetaStore.h | 8 ++---- source/dnode/vnode/tq/src/tqMetaStore.c | 35 ++----------------------- 2 files changed, 4 insertions(+), 39 deletions(-) diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 3d1473a443..73a3d26aeb 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -98,15 +98,11 @@ int32_t tqStoreCompact(TqMetaStore*); void* tqHandleGet(TqMetaStore*, int64_t key); int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); -//do commit -int32_t tqHandleCommit(TqMetaStore*, int64_t key); -//delete uncommitted -int32_t tqHandleAbort(TqMetaStore*, int64_t key); //delete committed kv pair //notice that a delete action still needs to be committed int32_t tqHandleDel(TqMetaStore*, int64_t key); -//delete both committed and uncommitted -int32_t tqHandleClear(TqMetaStore*, int64_t key); +int32_t tqHandleCommit(TqMetaStore*, int64_t key); +int32_t tqHandleAbort(TqMetaStore*, int64_t key); #ifdef __cplusplus } diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 079aae7435..9b91a8e051 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -400,7 +400,8 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { - if(pNode->handle.valueInUse != NULL && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { + if(pNode->handle.valueInUse != NULL + && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { return pNode->handle.valueInUse; } else { return NULL; @@ -560,38 +561,6 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { return -1; } -int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; - TqMetaList* pNode = pMeta->bucket[bucketKey]; - bool exist = false; - while(pNode) { - if(pNode->handle.key == key) { - if(pNode->handle.valueInUse != NULL) { - exist = true; - if(pNode->handle.valueInUse != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInUse); - } - pNode->handle.valueInUse = TQ_DELETE_TOKEN; - } - if(pNode->handle.valueInTxn != NULL) { - exist = true; - if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { - pMeta->deleter(pNode->handle.valueInTxn); - } - pNode->handle.valueInTxn = TQ_DELETE_TOKEN; - } - if(exist) { - tqLinkUnpersist(pMeta, pNode); - return 0; - } - return -1; - } else { - pNode = pNode->next; - } - } - return -2; -} - //TODO: clean deleted idx and data from persistent file int32_t tqStoreCompact(TqMetaStore *pMeta) { return 0; From 9dae1f317bb44b135a1fa3f92c9de9bc60f6ec66 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 8 Nov 2021 18:23:26 +0800 Subject: [PATCH 4/5] add buffer io for tq --- source/dnode/vnode/tq/inc/tqMetaStore.h | 11 ++- source/dnode/vnode/tq/src/tqMetaStore.c | 86 +++++++++++++++++------ source/dnode/vnode/tq/test/tqMetaTest.cpp | 2 - 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 73a3d26aeb..2b3ddc8765 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -27,10 +27,17 @@ extern "C" { #define TQ_BUCKET_SIZE 0xFF #define TQ_PAGE_SIZE 4096 //key + offset + size -#define TQ_IDX_ENTRY_SIZE 24 +#define TQ_IDX_SIZE 24 +//4096 / 24 +#define TQ_MAX_IDX_ONE_PAGE 170 +//24 * 170 +#define TQ_IDX_PAGE_BODY_SIZE 4080 +//4096 - 4080 +#define TQ_IDX_PAGE_HEAD_SIZE 16 + inline static int TqMaxEntryOnePage() { //170 - return TQ_PAGE_SIZE / TQ_IDX_ENTRY_SIZE; + return TQ_PAGE_SIZE / TQ_IDX_SIZE; } inline static int TqEmptyTail() { //16 diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 9b91a8e051..e99d98edb3 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -35,10 +35,38 @@ static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) { } } -typedef struct TqMetaPageBuf { - int16_t offset; - char buffer[TQ_PAGE_SIZE]; -} TqMetaPageBuf; +static inline int tqSeekLastPage(int fd) { + int offset = lseek(fd, 0, SEEK_END); + int pageNo = offset / TQ_PAGE_SIZE; + int curPageOffset = pageNo * TQ_PAGE_SIZE; + return lseek(fd, curPageOffset, SEEK_SET); +} + +//TODO: the struct is tightly coupled with index entry +typedef struct TqIdxPageHead { + int16_t writeOffset; + int8_t unused[14]; +} TqIdxPageHead; + +typedef struct TqIdxPageBuf { + TqIdxPageHead head; + char buffer[TQ_IDX_PAGE_BODY_SIZE]; +} TqIdxPageBuf; + +static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { + int offset = tqSeekLastPage(fd); + int nBytes; + if((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) { + return -1; + } + if(nBytes == 0) { + memset(pBuf, 0, TQ_PAGE_SIZE); + pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; + } + ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset); + + return lseek(fd, offset, SEEK_SET); +} TqMetaStore* tqStoreOpen(const char* path, int serializer(const void* pObj, TqSerializedHead** ppHead), @@ -102,27 +130,31 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->deleter = deleter; //read idx file and load into memory - char idxBuf[TQ_PAGE_SIZE]; + /*char idxBuf[TQ_PAGE_SIZE];*/ + TqIdxPageBuf idxBuf; TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); if(serializedObj == NULL) { //TODO:memory insufficient } int idxRead; int allocated = TQ_PAGE_SIZE; - while((idxRead = read(idxFd, idxBuf, TQ_PAGE_SIZE))) { + bool readEnd = false; + while((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) { if(idxRead == -1) { //TODO: handle error ASSERT(false); } + ASSERT(idxBuf.head.writeOffset == idxRead); //loop read every entry - for(int i = 0; i < idxRead; i += TQ_IDX_ENTRY_SIZE) { + for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { TqMetaList *pNode = malloc(sizeof(TqMetaList)); if(pNode == NULL) { //TODO: free memory and return error } memset(pNode, 0, sizeof(TqMetaList)); - memcpy(&pNode->handle, &idxBuf[i], TQ_IDX_ENTRY_SIZE); - lseek(fileFd, pNode->handle.offset, SEEK_CUR); + memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); + + lseek(fileFd, pNode->handle.offset, SEEK_SET); if(allocated < pNode->handle.serializedSize) { void *ptr = realloc(serializedObj, pNode->handle.serializedSize); if(ptr == NULL) { @@ -263,8 +295,8 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { //TODO: wrap in tfile int32_t tqStorePersist(TqMetaStore* pMeta) { - char writeBuf[TQ_PAGE_SIZE]; - int64_t* bufPtr = (int64_t*)writeBuf; + TqIdxPageBuf idxBuf; + int64_t* bufPtr = (int64_t*)idxBuf.buffer; TqMetaList *pHead = pMeta->unpersistHead; TqMetaList *pNode = pHead->unpersistNext; TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead)); @@ -277,6 +309,17 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { pSHead->ssize = sizeof(TqSerializedHead); int allocatedSize = sizeof(TqSerializedHead); int offset = lseek(pMeta->fileFd, 0, SEEK_CUR); + + tqReadLastPage(pMeta->idxFd, &idxBuf); + + if(idxBuf.head.writeOffset == TQ_PAGE_SIZE) { + lseek(pMeta->idxFd, 0, SEEK_END); + memset(&idxBuf, 0, TQ_PAGE_SIZE); + idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; + } else { + bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset); + } + while(pHead != pNode) { int nBytes = 0; @@ -307,18 +350,21 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { ASSERT(nBytesTxn == pSHead->ssize); nBytes += nBytesTxn; } + pNode->handle.offset = offset; + offset += nBytes; //write idx file //TODO: endian check and convert *(bufPtr++) = pNode->handle.key; *(bufPtr++) = pNode->handle.offset; *(bufPtr++) = (int64_t)nBytes; - if((char*)(bufPtr + 3) > writeBuf + TQ_PAGE_SIZE) { - nBytes = write(pMeta->idxFd, writeBuf, sizeof(writeBuf)); + idxBuf.head.writeOffset += TQ_IDX_SIZE; + if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) { + nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE); //TODO: handle error with tfile - ASSERT(nBytes == sizeof(writeBuf)); - memset(writeBuf, 0, TQ_PAGE_SIZE); - bufPtr = (int64_t*)writeBuf; + ASSERT(nBytes == TQ_PAGE_SIZE); + memset(&idxBuf, 0, TQ_PAGE_SIZE); + bufPtr = (int64_t*)&idxBuf.buffer; } //remove from unpersist list pHead->unpersistNext = pNode->unpersistNext; @@ -350,11 +396,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { //write left bytes free(pSHead); - if((char*)bufPtr != writeBuf) { - int used = (char*)bufPtr - writeBuf; - int nBytes = write(pMeta->idxFd, writeBuf, used); + //TODO: write new version in tfile + if((char*)bufPtr != idxBuf.buffer) { + int nBytes = write(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset); //TODO: handle error in tfile - ASSERT(nBytes == used); + ASSERT(nBytes == idxBuf.head.writeOffset); } //TODO: using fsync in tfile fsync(pMeta->idxFd); diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index 4bf56a0a56..d0511c2e2c 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -86,8 +86,6 @@ TEST_F(TqMetaTest, persistTest) { pBar = (Foo*)tqHandleGet(pMeta, 2); EXPECT_EQ(pBar == NULL, true); - - //taosRemoveDir(pathName); } TEST_F(TqMetaTest, uncommittedTest) { From 34581781e6f734530bbb4541e2382b52da318c4d Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 9 Nov 2021 15:16:10 +0800 Subject: [PATCH 5/5] fix memory leak --- source/dnode/vnode/tq/inc/tqMetaStore.h | 13 +-- source/dnode/vnode/tq/src/tqMetaStore.c | 39 +++++--- source/dnode/vnode/tq/test/tqMetaTest.cpp | 116 ++++++++++++++++++++++ 3 files changed, 142 insertions(+), 26 deletions(-) diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 2b3ddc8765..c62e7db111 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -24,7 +24,9 @@ extern "C" { #endif -#define TQ_BUCKET_SIZE 0xFF +#define TQ_BUCKET_MASK 0xFF +#define TQ_BUCKET_SIZE 256 + #define TQ_PAGE_SIZE 4096 //key + offset + size #define TQ_IDX_SIZE 24 @@ -35,15 +37,6 @@ extern "C" { //4096 - 4080 #define TQ_IDX_PAGE_HEAD_SIZE 16 - -inline static int TqMaxEntryOnePage() { //170 - return TQ_PAGE_SIZE / TQ_IDX_SIZE; -} - -inline static int TqEmptyTail() { //16 - return TQ_PAGE_SIZE - TqMaxEntryOnePage(); -} - #define TQ_ACTION_CONST 0 #define TQ_ACTION_INUSE 1 #define TQ_ACTION_INUSE_CONT 2 diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index e99d98edb3..8e1f78fbe0 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -130,7 +130,6 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->deleter = deleter; //read idx file and load into memory - /*char idxBuf[TQ_PAGE_SIZE];*/ TqIdxPageBuf idxBuf; TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); if(serializedObj == NULL) { @@ -196,7 +195,7 @@ TqMetaStore* tqStoreOpen(const char* path, } //put into list - int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; + int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; TqMetaList* pBucketNode = pMeta->bucket[bucketKey]; if(pBucketNode == NULL) { pMeta->bucket[bucketKey] = pNode; @@ -205,15 +204,18 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->bucket[bucketKey] = pNode; } else { while(pBucketNode->next && - pBucketNode->next->handle.key == pNode->handle.key) { + pBucketNode->next->handle.key != pNode->handle.key) { pBucketNode = pBucketNode->next; } if(pBucketNode->next) { ASSERT(pBucketNode->next->handle.key == pNode->handle.key); - TqMetaList *pNodeTmp = pBucketNode->next; - pBucketNode->next = pNodeTmp->next; - pBucketNode = pNodeTmp; + TqMetaList *pNodeFound = pBucketNode->next; + pNode->next = pNodeFound->next; + pBucketNode->next = pNode; + pBucketNode = pNodeFound; } else { + pNode->next = pMeta->bucket[bucketKey]; + pMeta->bucket[bucketKey] = pNode; pBucketNode = NULL; } } @@ -359,11 +361,13 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { *(bufPtr++) = pNode->handle.offset; *(bufPtr++) = (int64_t)nBytes; idxBuf.head.writeOffset += TQ_IDX_SIZE; + if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) { nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE); //TODO: handle error with tfile ASSERT(nBytes == TQ_PAGE_SIZE); memset(&idxBuf, 0, TQ_PAGE_SIZE); + idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; bufPtr = (int64_t*)&idxBuf.buffer; } //remove from unpersist list @@ -376,7 +380,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { if(pNode->handle.valueInUse == TQ_DELETE_TOKEN && pNode->handle.valueInTxn == NULL ) { - int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; + int bucketKey = pNode->handle.key & TQ_BUCKET_MASK; TqMetaList* pBucketHead = pMeta->bucket[bucketKey]; if(pBucketHead == pNode) { pMeta->bucket[bucketKey] = pNode->next; @@ -409,7 +413,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { } static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -442,7 +446,7 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value } void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -460,7 +464,7 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { } int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -498,7 +502,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi return -1; } memcpy(vmem, value, vsize); - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -530,7 +534,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi } static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -548,10 +552,13 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { } int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { + if(pNode->handle.valueInTxn == NULL) { + return -1; + } if(pNode->handle.valueInUse && pNode->handle.valueInUse != TQ_DELETE_TOKEN) { pMeta->deleter(pNode->handle.valueInUse); @@ -564,11 +571,11 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { pNode = pNode->next; } } - return -1; + return -2; } int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.key == key) { @@ -589,7 +596,7 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { } int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { - int64_t bucketKey = key & TQ_BUCKET_SIZE; + int64_t bucketKey = key & TQ_BUCKET_MASK; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index d0511c2e2c..f0241257b8 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -176,3 +176,119 @@ TEST_F(TqMetaTest, intxnPersist) { pFoo1 = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo1->a, 4); } + +TEST_F(TqMetaTest, multiplePage) { + srand(0); + std::vector v; + for(int i = 0; i < 1000; i++) { + v.push_back(rand()); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + for(int i = 0; i < 500; i++) { + tqHandleCommit(pMeta, i); + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + + tqStoreClose(pMeta); + pMeta = tqStoreOpen(pathName, + FooSerializer, FooDeserializer, FooDeleter); + ASSERT(pMeta); + + for(int i = 500; i < 1000; i++) { + tqHandleCommit(pMeta, i); + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + + for(int i = 0; i < 1000; i++) { + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + +} + +TEST_F(TqMetaTest, multipleRewrite) { + srand(0); + std::vector v; + for(int i = 0; i < 1000; i++) { + v.push_back(rand()); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 0; i < 500; i++) { + tqHandleCommit(pMeta, i); + v[i] = rand(); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 500; i < 1000; i++) { + v[i] = rand(); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 0; i < 1000; i++) { + tqHandleCommit(pMeta, i); + } + + tqStoreClose(pMeta); + pMeta = tqStoreOpen(pathName, + FooSerializer, FooDeserializer, FooDeleter); + ASSERT(pMeta); + + for(int i = 500; i < 1000; i++) { + v[i] = rand(); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + tqHandleCommit(pMeta, i); + } + + for(int i = 0; i < 1000; i++) { + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + +} + +TEST_F(TqMetaTest, dupCommit) { + srand(0); + std::vector v; + for(int i = 0; i < 1000; i++) { + v.push_back(rand()); + Foo foo; + foo.a = v[i]; + tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo)); + } + + for(int i = 0; i < 1000; i++) { + int ret = tqHandleCommit(pMeta, i); + EXPECT_EQ(ret, 0); + ret = tqHandleCommit(pMeta, i); + EXPECT_EQ(ret, -1); + } + + for(int i = 0; i < 1000; i++) { + int ret = tqHandleCommit(pMeta, i); + EXPECT_EQ(ret, -1); + } + + for(int i = 0; i < 1000; i++) { + Foo* pFoo = (Foo*)tqHandleGet(pMeta, i); + ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n"; + EXPECT_EQ(pFoo->a, v[i]); + } + +}