From 9dae1f317bb44b135a1fa3f92c9de9bc60f6ec66 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 8 Nov 2021 18:23:26 +0800 Subject: [PATCH] add buffer io for tq --- source/dnode/vnode/tq/inc/tqMetaStore.h | 11 ++- source/dnode/vnode/tq/src/tqMetaStore.c | 86 +++++++++++++++++------ source/dnode/vnode/tq/test/tqMetaTest.cpp | 2 - 3 files changed, 75 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/tq/inc/tqMetaStore.h b/source/dnode/vnode/tq/inc/tqMetaStore.h index 73a3d26aeb..2b3ddc8765 100644 --- a/source/dnode/vnode/tq/inc/tqMetaStore.h +++ b/source/dnode/vnode/tq/inc/tqMetaStore.h @@ -27,10 +27,17 @@ extern "C" { #define TQ_BUCKET_SIZE 0xFF #define TQ_PAGE_SIZE 4096 //key + offset + size -#define TQ_IDX_ENTRY_SIZE 24 +#define TQ_IDX_SIZE 24 +//4096 / 24 +#define TQ_MAX_IDX_ONE_PAGE 170 +//24 * 170 +#define TQ_IDX_PAGE_BODY_SIZE 4080 +//4096 - 4080 +#define TQ_IDX_PAGE_HEAD_SIZE 16 + inline static int TqMaxEntryOnePage() { //170 - return TQ_PAGE_SIZE / TQ_IDX_ENTRY_SIZE; + return TQ_PAGE_SIZE / TQ_IDX_SIZE; } inline static int TqEmptyTail() { //16 diff --git a/source/dnode/vnode/tq/src/tqMetaStore.c b/source/dnode/vnode/tq/src/tqMetaStore.c index 9b91a8e051..e99d98edb3 100644 --- a/source/dnode/vnode/tq/src/tqMetaStore.c +++ b/source/dnode/vnode/tq/src/tqMetaStore.c @@ -35,10 +35,38 @@ static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) { } } -typedef struct TqMetaPageBuf { - int16_t offset; - char buffer[TQ_PAGE_SIZE]; -} TqMetaPageBuf; +static inline int tqSeekLastPage(int fd) { + int offset = lseek(fd, 0, SEEK_END); + int pageNo = offset / TQ_PAGE_SIZE; + int curPageOffset = pageNo * TQ_PAGE_SIZE; + return lseek(fd, curPageOffset, SEEK_SET); +} + +//TODO: the struct is tightly coupled with index entry +typedef struct TqIdxPageHead { + int16_t writeOffset; + int8_t unused[14]; +} TqIdxPageHead; + +typedef struct TqIdxPageBuf { + TqIdxPageHead head; + char buffer[TQ_IDX_PAGE_BODY_SIZE]; +} TqIdxPageBuf; + +static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) { + int offset = tqSeekLastPage(fd); + int nBytes; + if((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) { + return -1; + } + if(nBytes == 0) { + memset(pBuf, 0, TQ_PAGE_SIZE); + pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; + } + ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset); + + return lseek(fd, offset, SEEK_SET); +} TqMetaStore* tqStoreOpen(const char* path, int serializer(const void* pObj, TqSerializedHead** ppHead), @@ -102,27 +130,31 @@ TqMetaStore* tqStoreOpen(const char* path, pMeta->deleter = deleter; //read idx file and load into memory - char idxBuf[TQ_PAGE_SIZE]; + /*char idxBuf[TQ_PAGE_SIZE];*/ + TqIdxPageBuf idxBuf; TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); if(serializedObj == NULL) { //TODO:memory insufficient } int idxRead; int allocated = TQ_PAGE_SIZE; - while((idxRead = read(idxFd, idxBuf, TQ_PAGE_SIZE))) { + bool readEnd = false; + while((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) { if(idxRead == -1) { //TODO: handle error ASSERT(false); } + ASSERT(idxBuf.head.writeOffset == idxRead); //loop read every entry - for(int i = 0; i < idxRead; i += TQ_IDX_ENTRY_SIZE) { + for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) { TqMetaList *pNode = malloc(sizeof(TqMetaList)); if(pNode == NULL) { //TODO: free memory and return error } memset(pNode, 0, sizeof(TqMetaList)); - memcpy(&pNode->handle, &idxBuf[i], TQ_IDX_ENTRY_SIZE); - lseek(fileFd, pNode->handle.offset, SEEK_CUR); + memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE); + + lseek(fileFd, pNode->handle.offset, SEEK_SET); if(allocated < pNode->handle.serializedSize) { void *ptr = realloc(serializedObj, pNode->handle.serializedSize); if(ptr == NULL) { @@ -263,8 +295,8 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) { //TODO: wrap in tfile int32_t tqStorePersist(TqMetaStore* pMeta) { - char writeBuf[TQ_PAGE_SIZE]; - int64_t* bufPtr = (int64_t*)writeBuf; + TqIdxPageBuf idxBuf; + int64_t* bufPtr = (int64_t*)idxBuf.buffer; TqMetaList *pHead = pMeta->unpersistHead; TqMetaList *pNode = pHead->unpersistNext; TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead)); @@ -277,6 +309,17 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { pSHead->ssize = sizeof(TqSerializedHead); int allocatedSize = sizeof(TqSerializedHead); int offset = lseek(pMeta->fileFd, 0, SEEK_CUR); + + tqReadLastPage(pMeta->idxFd, &idxBuf); + + if(idxBuf.head.writeOffset == TQ_PAGE_SIZE) { + lseek(pMeta->idxFd, 0, SEEK_END); + memset(&idxBuf, 0, TQ_PAGE_SIZE); + idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE; + } else { + bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset); + } + while(pHead != pNode) { int nBytes = 0; @@ -307,18 +350,21 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { ASSERT(nBytesTxn == pSHead->ssize); nBytes += nBytesTxn; } + pNode->handle.offset = offset; + offset += nBytes; //write idx file //TODO: endian check and convert *(bufPtr++) = pNode->handle.key; *(bufPtr++) = pNode->handle.offset; *(bufPtr++) = (int64_t)nBytes; - if((char*)(bufPtr + 3) > writeBuf + TQ_PAGE_SIZE) { - nBytes = write(pMeta->idxFd, writeBuf, sizeof(writeBuf)); + idxBuf.head.writeOffset += TQ_IDX_SIZE; + if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) { + nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE); //TODO: handle error with tfile - ASSERT(nBytes == sizeof(writeBuf)); - memset(writeBuf, 0, TQ_PAGE_SIZE); - bufPtr = (int64_t*)writeBuf; + ASSERT(nBytes == TQ_PAGE_SIZE); + memset(&idxBuf, 0, TQ_PAGE_SIZE); + bufPtr = (int64_t*)&idxBuf.buffer; } //remove from unpersist list pHead->unpersistNext = pNode->unpersistNext; @@ -350,11 +396,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) { //write left bytes free(pSHead); - if((char*)bufPtr != writeBuf) { - int used = (char*)bufPtr - writeBuf; - int nBytes = write(pMeta->idxFd, writeBuf, used); + //TODO: write new version in tfile + if((char*)bufPtr != idxBuf.buffer) { + int nBytes = write(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset); //TODO: handle error in tfile - ASSERT(nBytes == used); + ASSERT(nBytes == idxBuf.head.writeOffset); } //TODO: using fsync in tfile fsync(pMeta->idxFd); diff --git a/source/dnode/vnode/tq/test/tqMetaTest.cpp b/source/dnode/vnode/tq/test/tqMetaTest.cpp index 4bf56a0a56..d0511c2e2c 100644 --- a/source/dnode/vnode/tq/test/tqMetaTest.cpp +++ b/source/dnode/vnode/tq/test/tqMetaTest.cpp @@ -86,8 +86,6 @@ TEST_F(TqMetaTest, persistTest) { pBar = (Foo*)tqHandleGet(pMeta, 2); EXPECT_EQ(pBar == NULL, true); - - //taosRemoveDir(pathName); } TEST_F(TqMetaTest, uncommittedTest) {