[td-13039] add page compression.

This commit is contained in:
Haojun Liao 2022-02-23 19:39:43 +08:00
parent d3228cf958
commit c440e55395
4 changed files with 344 additions and 262 deletions

View File

@ -136,6 +136,11 @@ int32_t getPageId(const SPageInfo* pPgInfo);
*/
int32_t getBufPageSize(const SDiskbasedBuf* pBuf);
/**
*
* @param pBuf
* @return
*/
int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf);
/**
@ -147,10 +152,10 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
/**
* Set the buffer page is dirty, and needs to be flushed to disk when swap out.
* @param pPageInfo
* @param pPage
* @param dirty
*/
void setBufPageDirty(void* pPageInfo, bool dirty);
void setBufPageDirty(void* pPage, bool dirty);
/**
* Set the compress/ no-compress flag for paged buffer, when flushing data in disk.
@ -158,6 +163,13 @@ void setBufPageDirty(void* pPageInfo, bool dirty);
*/
void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp);
/**
* Set the pageId page buffer is not need
* @param pBuf
* @param pageId
*/
void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage);
/**
* Print the statistics when closing this buffer
* @param pBuf

View File

@ -49,7 +49,7 @@ typedef struct SLHashNode {
} SLHashNode;
#define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode))
#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + (_n)->keyLen)
#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
#define GET_LHASH_NODE_LEN(_n) (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)
static int32_t doAddNewBucket(SLHashObj* pHashObj);
@ -60,11 +60,166 @@ static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) {
static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) {
int32_t v = bucketId - (1ul << (bits - 1));
ASSERT(v < numOfBuckets);
return v;
}
static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) {
int32_t splitBucketId = (1ul << (bits - 1)) ^ bucketId;
return splitBucketId;
}
static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) {
*(int32_t*) p = keyLen;
p += sizeof(int32_t);
*(int32_t*) p = size;
p += sizeof(int32_t);
memcpy(p, key, keyLen);
p += keyLen;
memcpy(p, data, size);
}
static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen,
const void* data, int32_t size) {
int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);
SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
ASSERT (pPage != NULL);
// put to current buf page
size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
ASSERT(nodeSize + sizeof(SFilePage) <= getBufPageSize(pHashObj->pBuf));
if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) {
releaseBufPage(pHashObj->pBuf, pPage);
// allocate the overflow buffer page to hold this k/v.
int32_t newPageId = -1;
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId);
if (pNewPage == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
taosArrayPush(pBucket->pPageIdList, &newPageId);
doCopyObject(pNewPage->data, key, keyLen, data, size);
pNewPage->num = sizeof(SFilePage) + nodeSize;
setBufPageDirty(pNewPage, true);
releaseBufPage(pHashObj->pBuf, pNewPage);
} else {
char* p = (char*) pPage + pPage->num;
doCopyObject(p, key, keyLen, data, size);
pPage->num += nodeSize;
setBufPageDirty(pPage, true);
releaseBufPage(pHashObj->pBuf, pPage);
}
pBucket->size += 1;
printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
return TSDB_CODE_SUCCESS;
}
static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) {
ASSERT(pPage != NULL && pNode != NULL && pBucket->size >= 1);
int32_t len = GET_LHASH_NODE_LEN(pNode);
char* p = (char*) pNode + len;
char* pEnd = (char*)pPage + pPage->num;
memmove(pNode, p, (pEnd - p));
pPage->num -= len;
if (pPage->num == 0) {
// this page is empty, could be recycle in the future.
}
setBufPageDirty(pPage, true);
pBucket->size -= 1;
}
static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) {
size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList);
if (numOfPages <= 1) {
return;
}
int32_t* firstPage = taosArrayGet(pBucket->pPageIdList, 0);
SFilePage* pFirst = getBufPage(pHashObj->pBuf, *firstPage);
int32_t* pageId = taosArrayGetLast(pBucket->pPageIdList);
SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId);
char* pStart = pLast->data;
int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
while (1) {
if (pFirst->num + nodeSize < getBufPageSize(pHashObj->pBuf)) {
char* p = ((char*)pFirst) + pFirst->num;
SLHashNode* pNode = (SLHashNode*)pStart;
doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen);
setBufPageDirty(pFirst, true);
pFirst->num += nodeSize;
pLast->num -= nodeSize;
pStart += nodeSize;
if (pStart - pLast->data >= pLast->num) {
// this is empty
dBufSetBufPageRecycled(pHashObj->pBuf, pLast);
taosArrayRemove(pBucket->pPageIdList, numOfPages - 1);
break;
}
nodeSize = GET_LHASH_NODE_LEN(pStart);
} else { // move to the front of pLast page
memmove(pLast->data, pStart,(((char*)pLast) + pLast->num - pStart));
break;
}
}
}
static int32_t doAddNewBucket(SLHashObj* pHashObj) {
if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) {
int32_t newLen = pHashObj->numOfAlloc * 1.25;
if (newLen == pHashObj->numOfAlloc) {
newLen += 4;
}
char* p = realloc(pHashObj->pBucket, POINTER_BYTES * newLen);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets);
pHashObj->pBucket = (SLHashBucket**) p;
pHashObj->numOfAlloc = newLen;
}
SLHashBucket* pBucket = calloc(1, sizeof(SLHashBucket));
pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket;
pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t));
if (pBucket->pPageIdList == NULL || pBucket == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t pageId = -1;
SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId);
p->num = sizeof(SFilePage);
setBufPageDirty(p, true);
releaseBufPage(pHashObj->pBuf, p);
taosArrayPush(pBucket->pPageIdList, &pageId);
pHashObj->numOfBuckets += 1;
printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
return TSDB_CODE_SUCCESS;
}
SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage) {
SLHashObj* pHashObj = calloc(1, sizeof(SLHashObj));
if (pHashObj == NULL) {
@ -114,113 +269,6 @@ void* tHashCleanup(SLHashObj* pHashObj) {
return NULL;
}
static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) {
*(int32_t*) p = keyLen;
p += sizeof(int32_t);
*(int32_t*) p = size;
p += sizeof(int32_t);
memcpy(p, key, keyLen);
p += keyLen;
memcpy(p, data, size);
}
static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen,
const void* data, int32_t size) {
int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);
SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
ASSERT (pPage != NULL);
// put to current buf page
size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
ASSERT(nodeSize <= getBufPageSize(pHashObj->pBuf));
if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) {
releaseBufPage(pHashObj->pBuf, pPage);
// allocate the overflow buffer page to hold this k/v.
int32_t newPageId = -1;
SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId);
if (pNewPage == 0) {
// TODO handle error
}
taosArrayPush(pBucket->pPageIdList, &newPageId);
doCopyObject(pNewPage->data, key, keyLen, data, size);
pNewPage->num = nodeSize;
setBufPageDirty(pNewPage, true);
releaseBufPage(pHashObj->pBuf, pNewPage);
} else {
char* p = pPage->data + pPage->num;
doCopyObject(p, key, keyLen, data, size);
pPage->num += nodeSize;
setBufPageDirty(pPage, true);
releaseBufPage(pHashObj->pBuf, pPage);
}
pBucket->size += 1;
// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key);
}
// TODO merge the fragments on multiple pages to recycle the empty disk page ASAP
static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) {
ASSERT(pPage != NULL && pNode != NULL);
int32_t len = GET_LHASH_NODE_LEN(pNode);
char* p = (char*) pNode + len;
char* pEnd = pPage->data + pPage->num;
memmove(pNode, p, (pEnd - p));
pPage->num -= len;
if (pPage->num == 0) {
// this page is empty, could be recycle in the future.
}
setBufPageDirty(pPage, true);
pBucket->size -= 1;
}
static int32_t doAddNewBucket(SLHashObj* pHashObj) {
if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) {
int32_t newLen = pHashObj->numOfAlloc * 1.25;
if (newLen == pHashObj->numOfAlloc) {
newLen += 4;
}
char* p = realloc(pHashObj->pBucket, POINTER_BYTES * newLen);
if (p == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets);
pHashObj->pBucket = (SLHashBucket**) p;
pHashObj->numOfAlloc = newLen;
}
SLHashBucket* pBucket = calloc(1, sizeof(SLHashBucket));
pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket;
pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t));
if (pBucket->pPageIdList == NULL || pBucket == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int32_t pageId = -1;
SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId);
releaseBufPage(pHashObj->pBuf, p);
taosArrayPush(pBucket->pPageIdList, &pageId);
pHashObj->numOfBuckets += 1;
// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets);
return TSDB_CODE_SUCCESS;
}
int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
ASSERT(pHashObj != NULL && key != NULL);
@ -231,17 +279,16 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
int32_t hashVal = pHashObj->hashFn(key, keyLen);
int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits);
if (pHashObj->numOfBuckets > v) {
SLHashBucket* pBucket = pHashObj->pBucket[v];
if (v >= pHashObj->numOfBuckets) {
int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets);
printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId);
v = newBucketId;
}
// TODO check return code
doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size);
} else { // no matched bucket exists, find the candidate bucket
int32_t bucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets);
// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, bucketId);
SLHashBucket* pBucket = pHashObj->pBucket[bucketId];
doAddToBucket(pHashObj, pBucket, bucketId, key, keyLen, data, size);
SLHashBucket* pBucket = pHashObj->pBucket[v];
int32_t code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
@ -252,42 +299,46 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
int32_t newBucketId = pHashObj->numOfBuckets;
int32_t code = doAddNewBucket(pHashObj);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2));
if (numOfBits > pHashObj->bits) {
// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId);
ASSERT(numOfBits == pHashObj->bits + 1);
pHashObj->bits = numOfBits;
}
int32_t splitBucketId = (1ul << (pHashObj->bits - 1)) ^ newBucketId;
int32_t splitBucketId = doGetRelatedSplitBucketId(newBucketId, pHashObj->bits);
// load all data in this bucket and check if the data needs to relocated into the new bucket
SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId];
// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId);
for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) {
int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i);
SFilePage* p = getBufPage(pHashObj->pBuf, pageId);
char* pStart = p->data;
while (pStart - p->data < p->num) {
while (pStart - ((char*) p) < p->num) {
SLHashNode* pNode = (SLHashNode*)pStart;
ASSERT(pNode->keyLen > 0 && pNode->dataLen >= 0);
char* k = GET_LHASH_NODE_KEY(pNode);
int32_t hashv = pHashObj->hashFn(k, pNode->keyLen);
int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits);
int32_t v1 = hashv & ((1ul << (pHashObj->bits)) - 1);
if (v1 != splitBucketId) { // place it into the new bucket
ASSERT(v1 == newBucketId);
// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1);
SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId];
doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen,
GET_LHASH_NODE_KEY(pNode), pNode->dataLen);
doRemoveFromBucket(p, pNode, pBucket);
} else {
// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1);
int32_t nodeSize = GET_LHASH_NODE_LEN(pStart);
pStart += nodeSize;
@ -295,7 +346,11 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data
}
releaseBufPage(pHashObj->pBuf, p);
}
doCompressBucketPages(pHashObj, pBucket);
}
return TSDB_CODE_SUCCESS;
}
char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
@ -332,7 +387,7 @@ char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) {
}
int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) {
// todo
}
void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
@ -343,8 +398,8 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) {
if (type == LINEAR_HASH_DATA) {
for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) {
// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size,
(int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList));
}
} else {
dBufPrintStatis(pHashObj->pBuf);

View File

@ -23,28 +23,28 @@
#pragma GCC diagnostic ignored "-Wunused-function"
#pragma GCC diagnostic ignored "-Wunused-variable"
#pragma GCC diagnostic ignored "-Wsign-compare"
#include "os.h"
TEST(testCase, linear_hash_Tests) {
srand(time(NULL));
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
#if 1
SLHashObj* pHashObj = tHashInit(220000, 64 + 8, fn, 4);
for(int32_t i = 0; i < 500000; ++i) {
tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
SLHashObj* pHashObj = tHashInit(10, 128 + 8, fn, 8);
for(int32_t i = 0; i < 100; ++i) {
int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i));
assert(code == 0);
}
tHashPrint(pHashObj, LINEAR_HASH_STATIS);
// tHashPrint(pHashObj, LINEAR_HASH_STATIS);
for(int32_t i = 0; i < 10000; ++i) {
char* v = tHashGet(pHashObj, &i, sizeof(i));
if (v != NULL) {
// printf("find value: %d, key:%d\n", *(int32_t*) v, i);
} else {
printf("failed to found key:%d in hash\n", i);
}
}
// for(int32_t i = 0; i < 10000; ++i) {
// char* v = tHashGet(pHashObj, &i, sizeof(i));
// if (v != NULL) {
//// printf("find value: %d, key:%d\n", *(int32_t*) v, i);
// } else {
// printf("failed to found key:%d in hash\n", i);
// }
// }
tHashPrint(pHashObj, LINEAR_HASH_DATA);
tHashCleanup(pHashObj);

View File

@ -5,25 +5,26 @@
#include "tcompression.h"
#include "thash.h"
//enum {
// true = 0x1,
// BUF_PAGE_RELEASED = 0x2,
// true = 0x3,
//};
#define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef struct SFreeListItem {
int32_t offset;
int32_t len;
} SFreeListItem;
typedef struct SPageDiskInfo {
int64_t offset;
int32_t length;
} SPageDiskInfo;
} SPageDiskInfo, SFreeListItem;
struct SPageInfo {
SListNode* pn; // point to list node
void* pData;
int64_t offset;
int32_t pageId;
int32_t length:30;
int32_t length:29;
bool used:1; // set current page is in used
bool dirty:1; // set current buffer page is dirty or not
};
@ -51,46 +52,6 @@ struct SDiskbasedBuf {
SDiskbasedBufStatis statis;
};
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pBuf;
if (pResBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pResBuf->pageSize = pagesize;
pResBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->totalBufSize = 0;
pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit.
pResBuf->allocateId = -1;
pResBuf->comp = true;
pResBuf->file = NULL;
pResBuf->qId = qId;
pResBuf->fileSize = 0;
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false);
char path[PATH_MAX] = {0};
taosGetTmpfilePath(dir, "qbuf", path);
pResBuf->path = strdup(path);
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize,
// pResBuf->inMemPages, pResBuf->path);
return TSDB_CODE_SUCCESS;
}
static int32_t createDiskFile(SDiskbasedBuf* pBuf) {
pBuf->file = fopen(pBuf->path, "wb+");
if (pBuf->file == NULL) {
@ -135,10 +96,10 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
size_t num = taosArrayGetSize(pBuf->pFree);
for(int32_t i = 0; i < num; ++i) {
SFreeListItem* pi = taosArrayGet(pBuf->pFree, i);
if (pi->len >= size) {
if (pi->length >= size) {
offset = pi->offset;
pi->offset += (int32_t)size;
pi->len -= (int32_t)size;
pi->length -= (int32_t)size;
return offset;
}
@ -160,7 +121,7 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
assert(!pg->used && pg->pData != NULL);
int32_t size = -1;
int32_t size = pBuf->pageSize;
char* t = NULL;
if (pg->offset == -1 || pg->dirty) {
void* payload = GET_DATA_PAYLOAD(pg);
@ -169,66 +130,68 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
}
// this page is flushed to disk for the first time
if (pg->offset == -1) {
assert(pg->dirty == true);
if (pg->dirty) {
if (pg->offset == -1) {
assert(pg->dirty == true);
pg->offset = allocatePositionInFile(pBuf, size);
pBuf->nextPos += size;
int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
ret = (int32_t) fwrite(t, 1, size, pBuf->file);
if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
} else if (pg->dirty) {
// length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
if (pg->length < size) {
// 1. add current space to free list
SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
taosArrayPush(pBuf->pFree, &dinfo);
// 2. allocate new position, and update the info
pg->offset = allocatePositionInFile(pBuf, size);
pBuf->nextPos += size;
}
// 3. write to disk.
int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
ret = (int32_t)fwrite(t, 1, size, pBuf->file);
if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
ret = (int32_t)fwrite(t, 1, size, pBuf->file);
if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
} else {
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
} else {
// length becomes greater, current space is not enough, allocate new place, otherwise, do nothing
if (pg->length < size) {
// 1. add current space to free list
SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset};
taosArrayPush(pBuf->pFree, &dinfo);
// 2. allocate new position, and update the info
pg->offset = allocatePositionInFile(pBuf, size);
pBuf->nextPos += size;
}
// 3. write to disk.
int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET);
if (ret != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
ret = (int32_t)fwrite(t, 1, size, pBuf->file);
if (ret != size) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
pBuf->statis.flushBytes += size;
pBuf->statis.flushPages += 1;
}
} else {// NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
size = pg->length;
}
assert(size >= 0);
ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
char* pDataBuf = pg->pData;
memset(pDataBuf, 0, pBuf->pageSize);
@ -313,13 +276,10 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag
static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SListIter iter = {0};
tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD);
SListNode* pn = NULL;
while((pn = tdListNext(&iter)) != NULL) {
assert(pn != NULL);
SPageInfo* pageInfo = *(SPageInfo**) pn->data;
assert(pageInfo->pageId >= 0 && pageInfo->pn == pn);
@ -377,6 +337,56 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) {
return pageSize + POINTER_BYTES + 2;
}
static SPageInfo* getPageInfoFromPayload(void* page) {
int32_t offset = offsetof(SPageInfo, pData);
char* p = page - offset;
SPageInfo* ppi = ((SPageInfo**) p)[0];
return ppi;
}
int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) {
*pBuf = calloc(1, sizeof(SDiskbasedBuf));
SDiskbasedBuf* pResBuf = *pBuf;
if (pResBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pResBuf->pageSize = pagesize;
pResBuf->numOfPages = 0; // all pages are in buffer in the first place
pResBuf->totalBufSize = 0;
pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit.
pResBuf->allocateId = -1;
pResBuf->comp = true;
pResBuf->file = NULL;
pResBuf->qId = qId;
pResBuf->fileSize = 0;
pResBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem));
// at least more than 2 pages must be in memory
assert(inMemBufSize >= pagesize * 2);
pResBuf->lruList = tdListNew(POINTER_BYTES);
// init id hash table
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pResBuf->groupSet = taosHashInit(10, fn, true, false);
pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES
pResBuf->all = taosHashInit(10, fn, true, false);
char path[PATH_MAX] = {0};
taosGetTmpfilePath(dir, "paged-buf", path);
pResBuf->path = strdup(path);
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize,
// pResBuf->inMemPages, pResBuf->path);
return TSDB_CODE_SUCCESS;
}
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
pBuf->statis.getPages += 1;
@ -386,6 +396,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
// Failed to allocate a new buffer page, and there is an error occurs.
if (availablePage == NULL) {
assert(0);
return NULL;
}
}
@ -393,10 +404,6 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) {
// register new id in this group
*pageId = (++pBuf->allocateId);
if (*pageId == 11) {
printf("page is allocated, id:%d\n", *pageId);
}
// register page id info
SPageInfo* pi = registerPage(pBuf, groupId, *pageId);
@ -443,7 +450,6 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
(*pi)->used = true;
return (void *)(GET_DATA_PAYLOAD(*pi));
} else { // not in memory
assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0);
@ -477,15 +483,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
assert(pBuf != NULL && page != NULL);
int32_t offset = offsetof(SPageInfo, pData);
char* p = page - offset;
SPageInfo* ppi = ((SPageInfo**) p)[0];
SPageInfo* ppi = getPageInfoFromPayload(page);
releaseBufPageInfo(pBuf, ppi);
}
void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
assert(pi->pData != NULL && pi->used);
assert(pi->pData != NULL && pi->used == true);
pi->used = false;
pBuf->statis.releasePages += 1;
@ -549,6 +552,8 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) {
tdListFree(pBuf->lruList);
taosArrayDestroy(pBuf->emptyDummyIdList);
taosArrayDestroy(pBuf->pFree);
taosHashCleanup(pBuf->groupSet);
taosHashCleanup(pBuf->all);
@ -580,10 +585,7 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) {
}
void setBufPageDirty(void* pPage, bool dirty) {
int32_t offset = offsetof(SPageInfo, pData);
char* p = (char*)pPage - offset;
SPageInfo* ppi = ((SPageInfo**) p)[0];
SPageInfo* ppi = getPageInfoFromPayload(pPage);
ppi->dirty = dirty;
}
@ -591,6 +593,18 @@ void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) {
pBuf->comp = comp;
}
void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage) {
SPageInfo* ppi = getPageInfoFromPayload(pPage);
ppi->used = false;
ppi->dirty = false;
// it is a in-memory page that has not been flushed to disk yet.
if (ppi->length != -1 && ppi->offset != -1) {
SFreeListItem item = {.length = ppi->length, .offset = ppi->offset};
taosArrayPush(pBuf->pFree, &item);
}
}
void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
pBuf->printStatis = true;
@ -618,3 +632,4 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f, ps->loadPages,
ps->loadBytes / (1024.0 * ps->loadPages));
}