Merge pull request #8624 from taosdata/feature/tq

tq meta management
This commit is contained in:
Liu Jicong 2021-11-09 16:16:49 +08:00 committed by GitHub
commit 771310d322
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 285 additions and 96 deletions

View File

@ -24,18 +24,18 @@
extern "C" { extern "C" {
#endif #endif
#define TQ_BUCKET_SIZE 0xFF #define TQ_BUCKET_MASK 0xFF
#define TQ_BUCKET_SIZE 256
#define TQ_PAGE_SIZE 4096 #define TQ_PAGE_SIZE 4096
//key + offset + size //key + offset + size
#define TQ_IDX_ENTRY_SIZE 24 #define TQ_IDX_SIZE 24
//4096 / 24
inline static int TqMaxEntryOnePage() { //170 #define TQ_MAX_IDX_ONE_PAGE 170
return TQ_PAGE_SIZE / TQ_IDX_ENTRY_SIZE; //24 * 170
} #define TQ_IDX_PAGE_BODY_SIZE 4080
//4096 - 4080
inline static int TqEmptyTail() { //16 #define TQ_IDX_PAGE_HEAD_SIZE 16
return TQ_PAGE_SIZE - TqMaxEntryOnePage();
}
#define TQ_ACTION_CONST 0 #define TQ_ACTION_CONST 0
#define TQ_ACTION_INUSE 1 #define TQ_ACTION_INUSE 1
@ -92,19 +92,17 @@ int32_t tqStoreClose(TqMetaStore*);
//int32_t tqStoreDelete(TqMetaStore*); //int32_t tqStoreDelete(TqMetaStore*);
//int32_t TqStoreCommitAll(TqMetaStore*); //int32_t TqStoreCommitAll(TqMetaStore*);
int32_t tqStorePersist(TqMetaStore*); int32_t tqStorePersist(TqMetaStore*);
//clean deleted idx and data from persistent file
int32_t tqStoreCompact(TqMetaStore*);
void* tqHandleGet(TqMetaStore*, int64_t key); void* tqHandleGet(TqMetaStore*, int64_t key);
int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value); int32_t tqHandleMovePut(TqMetaStore*, int64_t key, void* value);
int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize); int32_t tqHandleCopyPut(TqMetaStore*, int64_t key, void* value, size_t vsize);
//do commit
int32_t tqHandleCommit(TqMetaStore*, int64_t key);
//delete uncommitted
int32_t tqHandleAbort(TqMetaStore*, int64_t key);
//delete committed kv pair //delete committed kv pair
//notice that a delete action still needs to be committed //notice that a delete action still needs to be committed
int32_t tqHandleDel(TqMetaStore*, int64_t key); int32_t tqHandleDel(TqMetaStore*, int64_t key);
//delete both committed and uncommitted int32_t tqHandleCommit(TqMetaStore*, int64_t key);
int32_t tqHandleClear(TqMetaStore*, int64_t key); int32_t tqHandleAbort(TqMetaStore*, int64_t key);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -22,6 +22,9 @@
// //
//handle management message //handle management message
// //
int tqGetgHandleSSize(const TqGroupHandle *gHandle);
static int tqProtoCheck(TmqMsgHead *pMsg) { static int tqProtoCheck(TmqMsgHead *pMsg) {
return pMsg->protoVer == 0; return pMsg->protoVer == 0;
} }

View File

@ -27,19 +27,46 @@ static int32_t tqHandlePutCommitted(TqMetaStore*, int64_t key, void* value);
static void* tqHandleGetUncommitted(TqMetaStore*, int64_t key); static void* tqHandleGetUncommitted(TqMetaStore*, int64_t key);
static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) { static inline void tqLinkUnpersist(TqMetaStore *pMeta, TqMetaList* pNode) {
if(pNode->unpersistNext == NULL) { if(pNode->unpersistNext == NULL) {
pNode->unpersistNext = pMeta->unpersistHead->unpersistNext; pNode->unpersistNext = pMeta->unpersistHead->unpersistNext;
pNode->unpersistPrev = pMeta->unpersistHead; pNode->unpersistPrev = pMeta->unpersistHead;
pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode; pMeta->unpersistHead->unpersistNext->unpersistPrev = pNode;
pMeta->unpersistHead->unpersistNext = pNode; pMeta->unpersistHead->unpersistNext = pNode;
} }
} }
static inline int tqSeekLastPage(int fd) {
int offset = lseek(fd, 0, SEEK_END);
int pageNo = offset / TQ_PAGE_SIZE;
int curPageOffset = pageNo * TQ_PAGE_SIZE;
return lseek(fd, curPageOffset, SEEK_SET);
}
typedef struct TqMetaPageBuf { //TODO: the struct is tightly coupled with index entry
int16_t offset; typedef struct TqIdxPageHead {
char buffer[TQ_PAGE_SIZE]; int16_t writeOffset;
} TqMetaPageBuf; int8_t unused[14];
} TqIdxPageHead;
typedef struct TqIdxPageBuf {
TqIdxPageHead head;
char buffer[TQ_IDX_PAGE_BODY_SIZE];
} TqIdxPageBuf;
static inline int tqReadLastPage(int fd, TqIdxPageBuf* pBuf) {
int offset = tqSeekLastPage(fd);
int nBytes;
if((nBytes = read(fd, pBuf, TQ_PAGE_SIZE)) == -1) {
return -1;
}
if(nBytes == 0) {
memset(pBuf, 0, TQ_PAGE_SIZE);
pBuf->head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
}
ASSERT(nBytes == 0 || nBytes == pBuf->head.writeOffset);
return lseek(fd, offset, SEEK_SET);
}
TqMetaStore* tqStoreOpen(const char* path, TqMetaStore* tqStoreOpen(const char* path,
int serializer(const void* pObj, TqSerializedHead** ppHead), int serializer(const void* pObj, TqSerializedHead** ppHead),
@ -103,27 +130,30 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta->deleter = deleter; pMeta->deleter = deleter;
//read idx file and load into memory //read idx file and load into memory
char idxBuf[TQ_PAGE_SIZE]; TqIdxPageBuf idxBuf;
TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE); TqSerializedHead* serializedObj = malloc(TQ_PAGE_SIZE);
if(serializedObj == NULL) { if(serializedObj == NULL) {
//TODO:memory insufficient //TODO:memory insufficient
} }
int idxRead; int idxRead;
int allocated = TQ_PAGE_SIZE; int allocated = TQ_PAGE_SIZE;
while((idxRead = read(idxFd, idxBuf, TQ_PAGE_SIZE))) { bool readEnd = false;
while((idxRead = read(idxFd, &idxBuf, TQ_PAGE_SIZE))) {
if(idxRead == -1) { if(idxRead == -1) {
//TODO: handle error //TODO: handle error
ASSERT(false); ASSERT(false);
} }
ASSERT(idxBuf.head.writeOffset == idxRead);
//loop read every entry //loop read every entry
for(int i = 0; i < idxRead; i += TQ_IDX_ENTRY_SIZE) { for(int i = 0; i < idxBuf.head.writeOffset - TQ_IDX_PAGE_HEAD_SIZE; i += TQ_IDX_SIZE) {
TqMetaList *pNode = malloc(sizeof(TqMetaList)); TqMetaList *pNode = malloc(sizeof(TqMetaList));
if(pNode == NULL) { if(pNode == NULL) {
//TODO: free memory and return error //TODO: free memory and return error
} }
memset(pNode, 0, sizeof(TqMetaList)); memset(pNode, 0, sizeof(TqMetaList));
memcpy(&pNode->handle, &idxBuf[i], TQ_IDX_ENTRY_SIZE); memcpy(&pNode->handle, &idxBuf.buffer[i], TQ_IDX_SIZE);
lseek(fileFd, pNode->handle.offset, SEEK_CUR);
lseek(fileFd, pNode->handle.offset, SEEK_SET);
if(allocated < pNode->handle.serializedSize) { if(allocated < pNode->handle.serializedSize) {
void *ptr = realloc(serializedObj, pNode->handle.serializedSize); void *ptr = realloc(serializedObj, pNode->handle.serializedSize);
if(ptr == NULL) { if(ptr == NULL) {
@ -154,9 +184,9 @@ TqMetaStore* tqStoreOpen(const char* path,
} else { } else {
pNode->handle.valueInUse = TQ_DELETE_TOKEN; pNode->handle.valueInUse = TQ_DELETE_TOKEN;
} }
serializedObj = POINTER_SHIFT(serializedObj, serializedObj->ssize); TqSerializedHead* ptr = POINTER_SHIFT(serializedObj, serializedObj->ssize);
if(serializedObj->ssize != sizeof(TqSerializedHead)) { if(ptr->ssize != sizeof(TqSerializedHead)) {
pMeta->deserializer(serializedObj, &pNode->handle.valueInTxn); pMeta->deserializer(ptr, &pNode->handle.valueInTxn);
} else { } else {
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
} }
@ -165,7 +195,7 @@ TqMetaStore* tqStoreOpen(const char* path,
} }
//put into list //put into list
int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
TqMetaList* pBucketNode = pMeta->bucket[bucketKey]; TqMetaList* pBucketNode = pMeta->bucket[bucketKey];
if(pBucketNode == NULL) { if(pBucketNode == NULL) {
pMeta->bucket[bucketKey] = pNode; pMeta->bucket[bucketKey] = pNode;
@ -174,15 +204,18 @@ TqMetaStore* tqStoreOpen(const char* path,
pMeta->bucket[bucketKey] = pNode; pMeta->bucket[bucketKey] = pNode;
} else { } else {
while(pBucketNode->next && while(pBucketNode->next &&
pBucketNode->next->handle.key == pNode->handle.key) { pBucketNode->next->handle.key != pNode->handle.key) {
pBucketNode = pBucketNode->next; pBucketNode = pBucketNode->next;
} }
if(pBucketNode->next) { if(pBucketNode->next) {
ASSERT(pBucketNode->next->handle.key == pNode->handle.key); ASSERT(pBucketNode->next->handle.key == pNode->handle.key);
TqMetaList *pNodeTmp = pBucketNode->next; TqMetaList *pNodeFound = pBucketNode->next;
pBucketNode->next = pNodeTmp->next; pNode->next = pNodeFound->next;
pBucketNode = pNodeTmp; pBucketNode->next = pNode;
pBucketNode = pNodeFound;
} else { } else {
pNode->next = pMeta->bucket[bucketKey];
pMeta->bucket[bucketKey] = pNode;
pBucketNode = NULL; pBucketNode = NULL;
} }
} }
@ -264,8 +297,8 @@ int32_t tqStoreDelete(TqMetaStore* pMeta) {
//TODO: wrap in tfile //TODO: wrap in tfile
int32_t tqStorePersist(TqMetaStore* pMeta) { int32_t tqStorePersist(TqMetaStore* pMeta) {
char writeBuf[TQ_PAGE_SIZE]; TqIdxPageBuf idxBuf;
int64_t* bufPtr = (int64_t*)writeBuf; int64_t* bufPtr = (int64_t*)idxBuf.buffer;
TqMetaList *pHead = pMeta->unpersistHead; TqMetaList *pHead = pMeta->unpersistHead;
TqMetaList *pNode = pHead->unpersistNext; TqMetaList *pNode = pHead->unpersistNext;
TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead)); TqSerializedHead *pSHead = malloc(sizeof(TqSerializedHead));
@ -278,6 +311,17 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
pSHead->ssize = sizeof(TqSerializedHead); pSHead->ssize = sizeof(TqSerializedHead);
int allocatedSize = sizeof(TqSerializedHead); int allocatedSize = sizeof(TqSerializedHead);
int offset = lseek(pMeta->fileFd, 0, SEEK_CUR); int offset = lseek(pMeta->fileFd, 0, SEEK_CUR);
tqReadLastPage(pMeta->idxFd, &idxBuf);
if(idxBuf.head.writeOffset == TQ_PAGE_SIZE) {
lseek(pMeta->idxFd, 0, SEEK_END);
memset(&idxBuf, 0, TQ_PAGE_SIZE);
idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
} else {
bufPtr = POINTER_SHIFT(&idxBuf, idxBuf.head.writeOffset);
}
while(pHead != pNode) { while(pHead != pNode) {
int nBytes = 0; int nBytes = 0;
@ -308,18 +352,23 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
ASSERT(nBytesTxn == pSHead->ssize); ASSERT(nBytesTxn == pSHead->ssize);
nBytes += nBytesTxn; nBytes += nBytesTxn;
} }
pNode->handle.offset = offset;
offset += nBytes;
//write idx file //write idx file
//TODO: endian check and convert //TODO: endian check and convert
*(bufPtr++) = pNode->handle.key; *(bufPtr++) = pNode->handle.key;
*(bufPtr++) = pNode->handle.offset; *(bufPtr++) = pNode->handle.offset;
*(bufPtr++) = (int64_t)nBytes; *(bufPtr++) = (int64_t)nBytes;
if((char*)(bufPtr + 3) > writeBuf + TQ_PAGE_SIZE) { idxBuf.head.writeOffset += TQ_IDX_SIZE;
nBytes = write(pMeta->idxFd, writeBuf, sizeof(writeBuf));
if(idxBuf.head.writeOffset >= TQ_PAGE_SIZE) {
nBytes = write(pMeta->idxFd, &idxBuf, TQ_PAGE_SIZE);
//TODO: handle error with tfile //TODO: handle error with tfile
ASSERT(nBytes == sizeof(writeBuf)); ASSERT(nBytes == TQ_PAGE_SIZE);
memset(writeBuf, 0, TQ_PAGE_SIZE); memset(&idxBuf, 0, TQ_PAGE_SIZE);
bufPtr = (int64_t*)writeBuf; idxBuf.head.writeOffset = TQ_IDX_PAGE_HEAD_SIZE;
bufPtr = (int64_t*)&idxBuf.buffer;
} }
//remove from unpersist list //remove from unpersist list
pHead->unpersistNext = pNode->unpersistNext; pHead->unpersistNext = pNode->unpersistNext;
@ -331,7 +380,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
if(pNode->handle.valueInUse == TQ_DELETE_TOKEN && if(pNode->handle.valueInUse == TQ_DELETE_TOKEN &&
pNode->handle.valueInTxn == NULL pNode->handle.valueInTxn == NULL
) { ) {
int bucketKey = pNode->handle.key & TQ_BUCKET_SIZE; int bucketKey = pNode->handle.key & TQ_BUCKET_MASK;
TqMetaList* pBucketHead = pMeta->bucket[bucketKey]; TqMetaList* pBucketHead = pMeta->bucket[bucketKey];
if(pBucketHead == pNode) { if(pBucketHead == pNode) {
pMeta->bucket[bucketKey] = pNode->next; pMeta->bucket[bucketKey] = pNode->next;
@ -351,11 +400,11 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
//write left bytes //write left bytes
free(pSHead); free(pSHead);
if((char*)bufPtr != writeBuf) { //TODO: write new version in tfile
int used = (char*)bufPtr - writeBuf; if((char*)bufPtr != idxBuf.buffer) {
int nBytes = write(pMeta->idxFd, writeBuf, used); int nBytes = write(pMeta->idxFd, &idxBuf, idxBuf.head.writeOffset);
//TODO: handle error in tfile //TODO: handle error in tfile
ASSERT(nBytes == used); ASSERT(nBytes == idxBuf.head.writeOffset);
} }
//TODO: using fsync in tfile //TODO: using fsync in tfile
fsync(pMeta->idxFd); fsync(pMeta->idxFd);
@ -364,7 +413,7 @@ int32_t tqStorePersist(TqMetaStore* pMeta) {
} }
static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value) { static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
@ -397,11 +446,12 @@ static int32_t tqHandlePutCommitted(TqMetaStore* pMeta, int64_t key, void* value
} }
void* tqHandleGet(TqMetaStore* pMeta, int64_t key) { void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInUse != NULL) { if(pNode->handle.valueInUse != NULL
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
return pNode->handle.valueInUse; return pNode->handle.valueInUse;
} else { } else {
return NULL; return NULL;
@ -414,7 +464,7 @@ void* tqHandleGet(TqMetaStore* pMeta, int64_t key) {
} }
int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) { int32_t tqHandleMovePut(TqMetaStore* pMeta, int64_t key, void* value) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
@ -452,7 +502,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
return -1; return -1;
} }
memcpy(vmem, value, vsize); memcpy(vmem, value, vsize);
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
@ -484,7 +534,7 @@ int32_t tqHandleCopyPut(TqMetaStore* pMeta, int64_t key, void* value, size_t vsi
} }
static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) { static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
@ -502,10 +552,13 @@ static void* tqHandleGetUncommitted(TqMetaStore* pMeta, int64_t key) {
} }
int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) { int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
if(pNode->handle.valueInTxn == NULL) {
return -1;
}
if(pNode->handle.valueInUse if(pNode->handle.valueInUse
&& pNode->handle.valueInUse != TQ_DELETE_TOKEN) { && pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse); pMeta->deleter(pNode->handle.valueInUse);
@ -518,11 +571,11 @@ int32_t tqHandleCommit(TqMetaStore* pMeta, int64_t key) {
pNode = pNode->next; pNode = pNode->next;
} }
} }
return -1; return -2;
} }
int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) { int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.key == key) { if(pNode->handle.key == key) {
@ -543,12 +596,13 @@ int32_t tqHandleAbort(TqMetaStore* pMeta, int64_t key) {
} }
int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) { int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
int64_t bucketKey = key & TQ_BUCKET_SIZE; int64_t bucketKey = key & TQ_BUCKET_MASK;
TqMetaList* pNode = pMeta->bucket[bucketKey]; TqMetaList* pNode = pMeta->bucket[bucketKey];
while(pNode) { while(pNode) {
if(pNode->handle.valueInTxn if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
&& pNode->handle.valueInTxn != TQ_DELETE_TOKEN) { if(pNode->handle.valueInTxn) {
pMeta->deleter(pNode->handle.valueInTxn); pMeta->deleter(pNode->handle.valueInTxn);
}
pNode->handle.valueInTxn = TQ_DELETE_TOKEN; pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
tqLinkUnpersist(pMeta, pNode); tqLinkUnpersist(pMeta, pNode);
return 0; return 0;
@ -560,34 +614,7 @@ int32_t tqHandleDel(TqMetaStore* pMeta, int64_t key) {
return -1; return -1;
} }
int32_t tqHandleClear(TqMetaStore* pMeta, int64_t key) { //TODO: clean deleted idx and data from persistent file
int64_t bucketKey = key & TQ_BUCKET_SIZE; int32_t tqStoreCompact(TqMetaStore *pMeta) {
TqMetaList* pNode = pMeta->bucket[bucketKey]; return 0;
bool exist = false;
while(pNode) {
if(pNode->handle.key == key) {
if(pNode->handle.valueInUse != NULL) {
exist = true;
if(pNode->handle.valueInUse != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInUse);
}
pNode->handle.valueInUse = TQ_DELETE_TOKEN;
}
if(pNode->handle.valueInTxn != NULL) {
exist = true;
if(pNode->handle.valueInTxn != TQ_DELETE_TOKEN) {
pMeta->deleter(pNode->handle.valueInTxn);
}
pNode->handle.valueInTxn = TQ_DELETE_TOKEN;
}
if(exist) {
tqLinkUnpersist(pMeta, pNode);
return 0;
}
return -1;
} else {
pNode = pNode->next;
}
}
return -2;
} }

View File

@ -57,6 +57,10 @@ TEST_F(TqMetaTest, copyPutTest) {
Foo* pFoo = (Foo*) tqHandleGet(pMeta, 1); Foo* pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
tqHandleCommit(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo->a, 3);
} }
TEST_F(TqMetaTest, persistTest) { TEST_F(TqMetaTest, persistTest) {
@ -82,8 +86,6 @@ TEST_F(TqMetaTest, persistTest) {
pBar = (Foo*)tqHandleGet(pMeta, 2); pBar = (Foo*)tqHandleGet(pMeta, 2);
EXPECT_EQ(pBar == NULL, true); EXPECT_EQ(pBar == NULL, true);
//taosRemoveDir(pathName);
} }
TEST_F(TqMetaTest, uncommittedTest) { TEST_F(TqMetaTest, uncommittedTest) {
@ -130,4 +132,163 @@ TEST_F(TqMetaTest, deleteTest) {
tqHandleCommit(pMeta, 1); tqHandleCommit(pMeta, 1);
pFoo = (Foo*) tqHandleGet(pMeta, 1); pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true); EXPECT_EQ(pFoo == NULL, true);
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
ASSERT(pMeta);
pFoo = (Foo*) tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo == NULL, true);
}
TEST_F(TqMetaTest, intxnPersist) {
Foo* pFoo = (Foo*)malloc(sizeof(Foo));
pFoo->a = 3;
tqHandleMovePut(pMeta, 1, pFoo);
tqHandleCommit(pMeta, 1);
Foo* pBar = (Foo*)malloc(sizeof(Foo));
pBar->a = 4;
tqHandleMovePut(pMeta, 1, pBar);
Foo* pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo1->a, 3);
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo1->a, 3);
tqHandleCommit(pMeta, 1);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo1->a, 4);
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
ASSERT(pMeta);
pFoo1 = (Foo*)tqHandleGet(pMeta, 1);
EXPECT_EQ(pFoo1->a, 4);
}
TEST_F(TqMetaTest, multiplePage) {
srand(0);
std::vector<int> v;
for(int i = 0; i < 1000; i++) {
v.push_back(rand());
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
}
for(int i = 0; i < 500; i++) {
tqHandleCommit(pMeta, i);
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]);
}
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
ASSERT(pMeta);
for(int i = 500; i < 1000; i++) {
tqHandleCommit(pMeta, i);
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]);
}
for(int i = 0; i < 1000; i++) {
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]);
}
}
TEST_F(TqMetaTest, multipleRewrite) {
srand(0);
std::vector<int> v;
for(int i = 0; i < 1000; i++) {
v.push_back(rand());
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
}
for(int i = 0; i < 500; i++) {
tqHandleCommit(pMeta, i);
v[i] = rand();
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
}
for(int i = 500; i < 1000; i++) {
v[i] = rand();
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
}
for(int i = 0; i < 1000; i++) {
tqHandleCommit(pMeta, i);
}
tqStoreClose(pMeta);
pMeta = tqStoreOpen(pathName,
FooSerializer, FooDeserializer, FooDeleter);
ASSERT(pMeta);
for(int i = 500; i < 1000; i++) {
v[i] = rand();
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
tqHandleCommit(pMeta, i);
}
for(int i = 0; i < 1000; i++) {
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]);
}
}
TEST_F(TqMetaTest, dupCommit) {
srand(0);
std::vector<int> v;
for(int i = 0; i < 1000; i++) {
v.push_back(rand());
Foo foo;
foo.a = v[i];
tqHandleCopyPut(pMeta, i, &foo, sizeof(Foo));
}
for(int i = 0; i < 1000; i++) {
int ret = tqHandleCommit(pMeta, i);
EXPECT_EQ(ret, 0);
ret = tqHandleCommit(pMeta, i);
EXPECT_EQ(ret, -1);
}
for(int i = 0; i < 1000; i++) {
int ret = tqHandleCommit(pMeta, i);
EXPECT_EQ(ret, -1);
}
for(int i = 0; i < 1000; i++) {
Foo* pFoo = (Foo*)tqHandleGet(pMeta, i);
ASSERT_EQ(pFoo != NULL, true) << " at idx " << i << "\n";
EXPECT_EQ(pFoo->a, v[i]);
}
} }