add buffer io for tq

This commit is contained in:
Liu Jicong 2021-11-08 18:23:26 +08:00
parent 1f1f6c5af6
commit 9dae1f317b
3 changed files with 75 additions and 24 deletions

View File

@ -27,10 +27,17 @@ extern "C" {
#define TQ_BUCKET_SIZE 0xFF
#define TQ_PAGE_SIZE 4096
//key + offset + size
#define TQ_IDX_ENTRY_SIZE 24
#define TQ_IDX_SIZE 24
//4096 / 24
#define TQ_MAX_IDX_ONE_PAGE 170
//24 * 170
#define TQ_IDX_PAGE_BODY_SIZE 4080
//4096 - 4080
#define TQ_IDX_PAGE_HEAD_SIZE 16
inline static int TqMaxEntryOnePage() { //170
return TQ_PAGE_SIZE / TQ_IDX_ENTRY_SIZE;
return TQ_PAGE_SIZE / TQ_IDX_SIZE;
}
inline static int TqEmptyTail() { //16

View File

@ -35,10 +35,38 @@ static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) {
}
}
typedef struct TqMetaPageBuf {
int16_t offset;
char buffer[TQ_PAGE_SIZE];
} TqMetaPageBuf;
static inline int tqSeekLastPage(int fd) {
int offset = lseek(fd, 0, SEEK_END);
int pageNo = offset / TQ_PAGE_SIZE;
int curPageOffset = pageNo * TQ_PAGE_SIZE;
return lseek(fd, curPageOffset, SEEK_SET);
}
//TODO: the struct is tightly coupled with index entry
typedef struct TqIdxPageHead {
int16_t writeOffset;
int8_t unused[14];
} TqIdxPageHead;
typedef struct TqIdxPageBuf {
TqIdxPageHead head;
char buffer[TQ_IDX_PAGE_BODY_SIZE];
} TqIdxPageBuf;
static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
int offset = tqSeekLastPage(fd);
int nBytes;
if((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
return -1;
}
if(nBytes == 0) {
memset(pBuf, 0, TQ_PAGE_SIZE);
pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
}
ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset);
return lseek(fd, offset, SEEK_SET);
}
TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead),
@ -102,27 +130,31 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta->deleter = deleter;
//read idx file and load into memory
char idxBuf[TQ_PAGE_SIZE];
/*char idxBuf[TQ_PAGE_SIZE];*/
TqIdxPageBuf idxBuf;
TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
if(serializedObj == NULL) {
//TODO:memory insufficient
}
int idxRead;
int allocated = TQ_PAGE_SIZE;
while((idxRead = read(idxFd, idxBuf, TQ_PAGE_SIZE))) {
bool readEnd = false;
while((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
if(idxRead == -1) {
//TODO: handle error
ASSERT(false);
}
ASSERT(idxBuf.head.writeOffset == idxRead);
//loop read every entry
for(int i = 0; i < idxRead; i += TQ_IDX_ENTRY_SIZE) {
for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
TqMetaList *pNode = malloc(sizeof(TqMetaList));
if(pNode == NULL) {
//TODO: free memory and return error
}
memset(pNode, 0, sizeof(TqMetaList));
memcpy(&pNode->handle, &idxBuf[i], TQ_IDX_ENTRY_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_CUR);
memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_SET);
if(allocated < pNode->handle.serializedSize) {
void *ptr = realloc(serializedObj, pNode->handle.serializedSize);
if(ptr == NULL) {
@ -263,8 +295,8 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
//TODO: wrap in tfile
int32_t tqStorePersist(TqMetaStore* pMeta) {
char writeBuf[TQ_PAGE_SIZE];
int64_t* bufPtr = (int64_t*)writeBuf;
TqIdxPageBuf idxBuf;
int64_t* bufPtr = (int64_t*)idxBuf.buffer;
TqMetaList *pHead = pMeta->unpersistHead;
TqMetaList *pNode = pHead->unpersistNext;
TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead));
@ -277,6 +309,17 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
pSHead->ssize = sizeof(TqSerializedHead);
int allocatedSize = sizeof(TqSerializedHead);
int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
tqReadLastPage(pMeta->idxFd, &idxBuf);
if(idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
lseek(pMeta->idxFd, 0, SEEK_END);
memset(&idxBuf, 0, TQ_PAGE_SIZE);
idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
} else {
bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset);
}
while(pHead != pNode) {
int nBytes = 0;
@ -307,18 +350,21 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
ASSERT(nBytesTxn == pSHead->ssize);
nBytes += nBytesTxn;
}
pNode->handle.offset = offset;
offset += nBytes;
//write idx file
//TODO: endian check and convert
*(bufPtr++) = pNode->handle.key;
*(bufPtr++) = pNode->handle.offset;
*(bufPtr++) = (int64_t)nBytes;
if((char*)(bufPtr + 3) > writeBuf + TQ_PAGE_SIZE) {
nBytes = write(pMeta->idxFd, writeBuf, sizeof(writeBuf));
idxBuf.head.writeOffset += TQ_IDX_SIZE;
if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE);
//TODO: handle error with tfile
ASSERT(nBytes == sizeof(writeBuf));
memset(writeBuf, 0, TQ_PAGE_SIZE);
bufPtr = (int64_t*)writeBuf;
ASSERT(nBytes == TQ_PAGE_SIZE);
memset(&idxBuf, 0, TQ_PAGE_SIZE);
bufPtr = (int64_t*)&idxBuf.buffer;
}
//remove from unpersist list
pHead->unpersistNext = pNode->unpersistNext;
@ -350,11 +396,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
//write left bytes
free(pSHead);
if((char*)bufPtr != writeBuf) {
int used = (char*)bufPtr - writeBuf;
int nBytes = write(pMeta->idxFd, writeBuf, used);
//TODO: write new version in tfile
if((char*)bufPtr != idxBuf.buffer) {
int nBytes = write(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset);
//TODO: handle error in tfile
ASSERT(nBytes == used);
ASSERT(nBytes == idxBuf.head.writeOffset);
}
//TODO: using fsync in tfile
fsync(pMeta->idxFd);

View File

@ -86,8 +86,6 @@ TEST_F(TqMetaTest, persistTest) {
pBar = (Foo*)tqHandleGet(pMeta, 2);
EXPECT_EQ(pBar == NULL, true);
//taosRemoveDir(pathName);
}
TEST_F(TqMetaTest, uncommittedTest) {