From f5992851891ea30a951e0c68e04bcb048bd28158 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 4 Nov 2021 10:16:59 +0800 Subject: [PATCH] add tqHandleCopyPut interface --- source/dnode/vnode/tq/inc/tqMetaStore.h | 4 +- source/dnode/vnode/tq/src/tqMetaStore.c | 67 ++++++++++++++++++++++- source/dnode/vnode/tq/test/tqMetaTest.cpp | 15 ++++- 3 files changed, 79 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index f3918d0a8e..f8700e065f 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -60,6 +60,7 @@ typedef struct TqMetaStore { TqMetaList* unpersistHead; int fileFd; //TODO:temporaral use, to be replaced by unified tfile int idxFd; //TODO:temporaral use, to be replaced by unified tfile + char* dirPath; int (*serializer)(const void* pObj, void** ppBytes); const void* (*deserializer)(const void* pBytes, void** ppObj); void (*deleter)(void*); @@ -75,7 +76,8 @@ int32_t tqStoreClose(TqMetaStore*); int32_t tqStorePersist(TqMetaStore*); void* tqHandleGet(TqMetaStore*, int64_t key); -int32_t tqHandlePut(TqMetaStore*, int64_t key, void* value); +int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); +int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); //do commit int32_t tqHandleCommit(TqMetaStore*, int64_t key); //delete uncommitted diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index f850f73e12..5dac02ff67 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -44,6 +44,12 @@ TqMetaStore* tqStoreOpen(const char* path, //concat data file name and index file name size_t pathLen = strlen(path); + pMeta->dirPath = malloc(pathLen+1); + if(pMeta->dirPath != NULL) { + //TODO: memory insufficient + } + strcpy(pMeta->dirPath, path); + char name[pathLen+10]; strcpy(name, path); @@ -155,15 +161,35 @@ int32_t tqStoreClose(TqMetaStore* pMeta) { pNode = next; } } + free(pMeta->dirPath); free(pMeta->unpersistHead); free(pMeta); return 0; } int32_t tqStoreDelete(TqMetaStore* pMeta) { - //close file - //delete file + close(pMeta->fileFd); + close(pMeta->idxFd); //free memory + for(int i = 0; i < TQ_BUCKET_SIZE; i++) { + TqMetaList* pNode = pMeta->bucket[i]; + pMeta->bucket[i] = NULL; + while(pNode) { + if(pNode->handle.valueInTxn) { + pMeta->deleter(pNode->handle.valueInTxn); + } + if(pNode->handle.valueInUse) { + pMeta->deleter(pNode->handle.valueInUse); + } + TqMetaList* next = pNode->next; + free(pNode); + pNode = next; + } + } + free(pMeta->unpersistHead); + taosRemoveDir(pMeta->dirPath); + free(pMeta->dirPath); + free(pMeta); return 0; } @@ -301,7 +327,7 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { return NULL; } -int32_t tqHandlePut(TqMetaStore* pMeta, int64_t key, void* value) { +int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { int64_t bucketKey = key & TQ_BUCKET_SIZE; TqMetaList* pNode = pMeta->bucket[bucketKey]; while(pNode) { @@ -330,6 +356,41 @@ int32_t tqHandlePut(TqMetaStore* pMeta, int64_t key, void* value) { return 0; } +int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsize) { + void *vmem = malloc(vsize); + if(vmem == NULL) { + //TODO: memory error + return -1; + } + memcpy(vmem, value, vsize); + int64_t bucketKey = key & TQ_BUCKET_SIZE; + TqMetaList* pNode = pMeta->bucket[bucketKey]; + while(pNode) { + if(pNode->handle.key == key) { + //TODO: think about thread safety + if(pNode->handle.valueInTxn) { + pMeta->deleter(pNode->handle.valueInTxn); + } + //change pointer ownership + pNode->handle.valueInTxn = vmem; + return 0; + } else { + pNode = pNode->next; + } + } + TqMetaList *pNewNode = malloc(sizeof(TqMetaList)); + if(pNewNode == NULL) { + //TODO: memory error + return -1; + } + memset(pNewNode, 0, sizeof(TqMetaList)); + pNewNode->handle.key = key; + pNewNode->handle.valueInTxn = vmem; + pNewNode->next = pMeta->bucket[bucketKey]; + pMeta->bucket[bucketKey] = pNewNode; + return 0; +} + static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { int64_t bucketKey = key & TQ_BUCKET_SIZE; TqMetaList* pNode = pMeta->bucket[bucketKey]; diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index e993003a78..8afb10e824 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -47,10 +47,19 @@ class TqMetaTest : public ::testing::Test { const char* pathName = "/tmp/tq_test"; }; +TEST_F(TqMetaTest, copyPutTest) { + Foo foo; + foo.a = 3; + tqHandleCopyPut(pMeta, 1, &foo, sizeof(Foo)); + + Foo* pFoo = (Foo*) tqHandleGet(pMeta, 1); + EXPECT_EQ(pFoo == NULL, true); +} + TEST_F(TqMetaTest, persistTest) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 2; - tqHandlePut(pMeta, 1, pFoo); + tqHandleMovePut(pMeta, 1, pFoo); Foo* pBar = (Foo*)tqHandleGet(pMeta, 1); EXPECT_EQ(pBar == NULL, true); tqHandleCommit(pMeta, 1); @@ -77,7 +86,7 @@ TEST_F(TqMetaTest, persistTest) { TEST_F(TqMetaTest, uncommittedTest) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 3; - tqHandlePut(pMeta, 1, pFoo); + tqHandleMovePut(pMeta, 1, pFoo); pFoo = (Foo*) tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true); @@ -86,7 +95,7 @@ TEST_F(TqMetaTest, uncommittedTest) { TEST_F(TqMetaTest, abortTest) { Foo* pFoo = (Foo*)malloc(sizeof(Foo)); pFoo->a = 3; - tqHandlePut(pMeta, 1, pFoo); + tqHandleMovePut(pMeta, 1, pFoo); pFoo = (Foo*) tqHandleGet(pMeta, 1); EXPECT_EQ(pFoo == NULL, true);