From c440e55395123270137af62a455da3d329744eb5 Mon Sep 17 00:00:00 2001
From: Haojun Liao
Date: Wed, 23 Feb 2022 19:39:43 +0800
Subject: [PATCH] [td-13039] add page compression.

---
 include/util/tpagedbuf.h                 |  16 +-
 source/libs/executor/src/tlinearhash.c   | 315 +++++++++++++----------
 source/libs/executor/test/lhashTests.cpp |  26 +-
 source/util/src/tpagedbuf.c              | 249 +++++++++---------
 4 files changed, 344 insertions(+), 262 deletions(-)

diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h
index 4cd0797df3..decf952bcd 100644
--- a/include/util/tpagedbuf.h
+++ b/include/util/tpagedbuf.h
@@ -136,6 +136,11 @@ int32_t getPageId(const SPageInfo* pPgInfo);
  */
 int32_t getBufPageSize(const SDiskbasedBuf* pBuf);
 
+/**
+ * Get the number of in-memory buffer pages.
+ * @param pBuf
+ * @return
+ */
 int32_t getNumOfInMemBufPages(const SDiskbasedBuf* pBuf);
 
 /**
@@ -147,10 +152,10 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf);
 
 /**
  * Set the buffer page is dirty, and needs to be flushed to disk when swap out.
- * @param pPageInfo
+ * @param pPage
  * @param dirty
  */
-void setBufPageDirty(void* pPageInfo, bool dirty);
+void setBufPageDirty(void* pPage, bool dirty);
 
 /**
  * Set the compress/ no-compress flag for paged buffer, when flushing data in disk.
@@ -158,6 +163,13 @@ void setBufPageDirty(void* pPageInfo, bool dirty);
  */
 void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp);
 
+/**
+ * Mark the page as recycled, so that its buffer slot and disk space can be reused.
+ * @param pBuf
+ * @param pPage
+ */
+void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage);
+
 /**
  * Print the statistics when closing this buffer
  * @param pBuf
diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c
index 152102f0c3..803ce8bba2 100644
--- a/source/libs/executor/src/tlinearhash.c
+++ b/source/libs/executor/src/tlinearhash.c
@@ -49,7 +49,7 @@ typedef struct SLHashNode {
 } SLHashNode;
 
 #define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode))
-#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + (_n)->keyLen)
+#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen)
 #define GET_LHASH_NODE_LEN(_n) (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen)
 
 static int32_t doAddNewBucket(SLHashObj* pHashObj);
@@ -60,11 +60,166 @@ static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) {
 
 static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) {
   int32_t v = bucketId - (1ul << (bits - 1));
-  ASSERT(v < numOfBuckets);
   return v;
 }
 
+static int32_t doGetRelatedSplitBucketId(int32_t bucketId, int32_t bits) {
+  int32_t splitBucketId = (1ul << (bits - 1)) ^ bucketId;
+  return splitBucketId;
+}
+
+static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) {
+  *(int32_t*) p = keyLen;
+  p += sizeof(int32_t);
+  *(int32_t*) p = size;
+  p += sizeof(int32_t);
+
+  memcpy(p, key, keyLen);
+  p += keyLen;
+
+  memcpy(p, data, size);
+}
+
+static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen,
+                             const void* data, int32_t size) {
+  int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList);
+
+  SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId);
+  ASSERT (pPage != NULL);
+
+  // put to current buf page
+  size_t nodeSize = sizeof(SLHashNode) + keyLen + size;
+  ASSERT(nodeSize + sizeof(SFilePage) <= getBufPageSize(pHashObj->pBuf));
+
+  if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) {
releaseBufPage(pHashObj->pBuf, pPage); + + // allocate the overflow buffer page to hold this k/v. + int32_t newPageId = -1; + SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId); + if (pNewPage == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + taosArrayPush(pBucket->pPageIdList, &newPageId); + + doCopyObject(pNewPage->data, key, keyLen, data, size); + pNewPage->num = sizeof(SFilePage) + nodeSize; + + setBufPageDirty(pNewPage, true); + releaseBufPage(pHashObj->pBuf, pNewPage); + } else { + char* p = (char*) pPage + pPage->num; + doCopyObject(p, key, keyLen, data, size); + pPage->num += nodeSize; + setBufPageDirty(pPage, true); + releaseBufPage(pHashObj->pBuf, pPage); + } + + pBucket->size += 1; + printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key); + + return TSDB_CODE_SUCCESS; +} + +static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) { + ASSERT(pPage != NULL && pNode != NULL && pBucket->size >= 1); + + int32_t len = GET_LHASH_NODE_LEN(pNode); + char* p = (char*) pNode + len; + + char* pEnd = (char*)pPage + pPage->num; + memmove(pNode, p, (pEnd - p)); + + pPage->num -= len; + if (pPage->num == 0) { + // this page is empty, could be recycle in the future. + } + + setBufPageDirty(pPage, true); + + pBucket->size -= 1; +} + +static void doCompressBucketPages(SLHashObj *pHashObj, SLHashBucket* pBucket) { + size_t numOfPages = taosArrayGetSize(pBucket->pPageIdList); + if (numOfPages <= 1) { + return; + } + + int32_t* firstPage = taosArrayGet(pBucket->pPageIdList, 0); + SFilePage* pFirst = getBufPage(pHashObj->pBuf, *firstPage); + + int32_t* pageId = taosArrayGetLast(pBucket->pPageIdList); + SFilePage* pLast = getBufPage(pHashObj->pBuf, *pageId); + + char* pStart = pLast->data; + int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); + while (1) { + if (pFirst->num + nodeSize < getBufPageSize(pHashObj->pBuf)) { + char* p = ((char*)pFirst) + pFirst->num; + + SLHashNode* pNode = (SLHashNode*)pStart; + doCopyObject(p, GET_LHASH_NODE_KEY(pStart), pNode->keyLen, GET_LHASH_NODE_DATA(pStart), pNode->dataLen); + setBufPageDirty(pFirst, true); + + pFirst->num += nodeSize; + pLast->num -= nodeSize; + pStart += nodeSize; + if (pStart - pLast->data >= pLast->num) { + // this is empty + dBufSetBufPageRecycled(pHashObj->pBuf, pLast); + taosArrayRemove(pBucket->pPageIdList, numOfPages - 1); + break; + } + + nodeSize = GET_LHASH_NODE_LEN(pStart); + } else { // move to the front of pLast page + memmove(pLast->data, pStart,(((char*)pLast) + pLast->num - pStart)); + break; + } + } +} + +static int32_t doAddNewBucket(SLHashObj* pHashObj) { + if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) { + int32_t newLen = pHashObj->numOfAlloc * 1.25; + if (newLen == pHashObj->numOfAlloc) { + newLen += 4; + } + + char* p = realloc(pHashObj->pBucket, POINTER_BYTES * newLen); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets); + pHashObj->pBucket = (SLHashBucket**) p; + pHashObj->numOfAlloc = newLen; + } + + SLHashBucket* pBucket = calloc(1, sizeof(SLHashBucket)); + pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket; + + pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t)); + if (pBucket->pPageIdList == NULL || pBucket == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t pageId = -1; + SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId); + p->num = sizeof(SFilePage); + setBufPageDirty(p, true); + + releaseBufPage(pHashObj->pBuf, 
p); + taosArrayPush(pBucket->pPageIdList, &pageId); + + pHashObj->numOfBuckets += 1; + printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); + return TSDB_CODE_SUCCESS; +} + SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage) { SLHashObj* pHashObj = calloc(1, sizeof(SLHashObj)); if (pHashObj == NULL) { @@ -114,113 +269,6 @@ void* tHashCleanup(SLHashObj* pHashObj) { return NULL; } -static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) { - *(int32_t*) p = keyLen; - p += sizeof(int32_t); - *(int32_t*) p = size; - p += sizeof(int32_t); - - memcpy(p, key, keyLen); - p += keyLen; - - memcpy(p, data, size); -} - -static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen, - const void* data, int32_t size) { - int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList); - - SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId); - ASSERT (pPage != NULL); - - // put to current buf page - size_t nodeSize = sizeof(SLHashNode) + keyLen + size; - ASSERT(nodeSize <= getBufPageSize(pHashObj->pBuf)); - - if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) { - releaseBufPage(pHashObj->pBuf, pPage); - - // allocate the overflow buffer page to hold this k/v. - int32_t newPageId = -1; - SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId); - if (pNewPage == 0) { - // TODO handle error - } - - taosArrayPush(pBucket->pPageIdList, &newPageId); - - doCopyObject(pNewPage->data, key, keyLen, data, size); - pNewPage->num = nodeSize; - - setBufPageDirty(pNewPage, true); - releaseBufPage(pHashObj->pBuf, pNewPage); - } else { - char* p = pPage->data + pPage->num; - doCopyObject(p, key, keyLen, data, size); - pPage->num += nodeSize; - setBufPageDirty(pPage, true); - releaseBufPage(pHashObj->pBuf, pPage); - } - - pBucket->size += 1; -// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key); -} - -// TODO merge the fragments on multiple pages to recycle the empty disk page ASAP -static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) { - ASSERT(pPage != NULL && pNode != NULL); - - int32_t len = GET_LHASH_NODE_LEN(pNode); - char* p = (char*) pNode + len; - - char* pEnd = pPage->data + pPage->num; - memmove(pNode, p, (pEnd - p)); - - pPage->num -= len; - if (pPage->num == 0) { - // this page is empty, could be recycle in the future. 
- } - - setBufPageDirty(pPage, true); - pBucket->size -= 1; -} - -static int32_t doAddNewBucket(SLHashObj* pHashObj) { - if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) { - int32_t newLen = pHashObj->numOfAlloc * 1.25; - if (newLen == pHashObj->numOfAlloc) { - newLen += 4; - } - - char* p = realloc(pHashObj->pBucket, POINTER_BYTES * newLen); - if (p == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets); - pHashObj->pBucket = (SLHashBucket**) p; - pHashObj->numOfAlloc = newLen; - } - - SLHashBucket* pBucket = calloc(1, sizeof(SLHashBucket)); - pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket; - - pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t)); - if (pBucket->pPageIdList == NULL || pBucket == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t pageId = -1; - SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId); - releaseBufPage(pHashObj->pBuf, p); - - taosArrayPush(pBucket->pPageIdList, &pageId); - - pHashObj->numOfBuckets += 1; -// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); - return TSDB_CODE_SUCCESS; -} - int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size) { ASSERT(pHashObj != NULL && key != NULL); @@ -231,17 +279,16 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data int32_t hashVal = pHashObj->hashFn(key, keyLen); int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits); - if (pHashObj->numOfBuckets > v) { - SLHashBucket* pBucket = pHashObj->pBucket[v]; + if (v >= pHashObj->numOfBuckets) { + int32_t newBucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets); + printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, newBucketId); + v = newBucketId; + } - // TODO check return code - doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size); - } else { // no matched bucket exists, find the candidate bucket - int32_t bucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets); -// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, bucketId); - - SLHashBucket* pBucket = pHashObj->pBucket[bucketId]; - doAddToBucket(pHashObj, pBucket, bucketId, key, keyLen, data, size); + SLHashBucket* pBucket = pHashObj->pBucket[v]; + int32_t code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size); + if (code != TSDB_CODE_SUCCESS) { + return code; } } @@ -252,42 +299,46 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data int32_t newBucketId = pHashObj->numOfBuckets; int32_t code = doAddNewBucket(pHashObj); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2)); if (numOfBits > pHashObj->bits) { -// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId); - + printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId); ASSERT(numOfBits == pHashObj->bits + 1); pHashObj->bits = numOfBits; } - int32_t splitBucketId = (1ul << (pHashObj->bits - 1)) ^ newBucketId; + int32_t splitBucketId = doGetRelatedSplitBucketId(newBucketId, pHashObj->bits); // load all data in this bucket and check if the data needs to relocated into the new bucket SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId]; -// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId); + 
printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId); for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); SFilePage* p = getBufPage(pHashObj->pBuf, pageId); char* pStart = p->data; - while (pStart - p->data < p->num) { + while (pStart - ((char*) p) < p->num) { SLHashNode* pNode = (SLHashNode*)pStart; + ASSERT(pNode->keyLen > 0 && pNode->dataLen >= 0); char* k = GET_LHASH_NODE_KEY(pNode); int32_t hashv = pHashObj->hashFn(k, pNode->keyLen); + int32_t v1 = doGetBucketIdFromHashVal(hashv, pHashObj->bits); - int32_t v1 = hashv & ((1ul << (pHashObj->bits)) - 1); if (v1 != splitBucketId) { // place it into the new bucket ASSERT(v1 == newBucketId); -// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); + printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId]; doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, GET_LHASH_NODE_KEY(pNode), pNode->dataLen); doRemoveFromBucket(p, pNode, pBucket); } else { -// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1); + printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1); int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); pStart += nodeSize; @@ -295,7 +346,11 @@ int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data } releaseBufPage(pHashObj->pBuf, p); } + + doCompressBucketPages(pHashObj, pBucket); } + + return TSDB_CODE_SUCCESS; } char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) { @@ -332,7 +387,7 @@ char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) { } int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) { - + // todo } void tHashPrint(const SLHashObj* pHashObj, int32_t type) { @@ -343,8 +398,8 @@ void tHashPrint(const SLHashObj* pHashObj, int32_t type) { if (type == LINEAR_HASH_DATA) { for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) { -// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size, -// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList)); + printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size, + (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList)); } } else { dBufPrintStatis(pHashObj->pBuf); diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp index 533fc67916..d0fe9c5ac0 100644 --- a/source/libs/executor/test/lhashTests.cpp +++ b/source/libs/executor/test/lhashTests.cpp @@ -23,28 +23,28 @@ #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" -#include "os.h" TEST(testCase, linear_hash_Tests) { srand(time(NULL)); _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); #if 1 - SLHashObj* pHashObj = tHashInit(220000, 64 + 8, fn, 4); - for(int32_t i = 0; i < 500000; ++i) { - tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); + SLHashObj* pHashObj = tHashInit(10, 128 + 8, fn, 8); + for(int32_t i = 0; i < 100; ++i) { + int32_t code = tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); + assert(code == 0); } - tHashPrint(pHashObj, LINEAR_HASH_STATIS); +// tHashPrint(pHashObj, LINEAR_HASH_STATIS); - for(int32_t i = 0; i < 10000; ++i) { - char* v = tHashGet(pHashObj, &i, sizeof(i)); - if (v != NULL) { -// printf("find value: %d, 
key:%d\n", *(int32_t*) v, i); - } else { - printf("failed to found key:%d in hash\n", i); - } - } +// for(int32_t i = 0; i < 10000; ++i) { +// char* v = tHashGet(pHashObj, &i, sizeof(i)); +// if (v != NULL) { +//// printf("find value: %d, key:%d\n", *(int32_t*) v, i); +// } else { +// printf("failed to found key:%d in hash\n", i); +// } +// } tHashPrint(pHashObj, LINEAR_HASH_DATA); tHashCleanup(pHashObj); diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 967443e93f..3fe3ddccde 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -5,25 +5,26 @@ #include "tcompression.h" #include "thash.h" +//enum { +// true = 0x1, +// BUF_PAGE_RELEASED = 0x2, +// true = 0x3, +//}; + #define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) -typedef struct SFreeListItem { - int32_t offset; - int32_t len; -} SFreeListItem; - typedef struct SPageDiskInfo { int64_t offset; int32_t length; -} SPageDiskInfo; +} SPageDiskInfo, SFreeListItem; struct SPageInfo { SListNode* pn; // point to list node void* pData; int64_t offset; int32_t pageId; - int32_t length:30; + int32_t length:29; bool used:1; // set current page is in used bool dirty:1; // set current buffer page is dirty or not }; @@ -51,46 +52,6 @@ struct SDiskbasedBuf { SDiskbasedBufStatis statis; }; -int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { - *pBuf = calloc(1, sizeof(SDiskbasedBuf)); - - SDiskbasedBuf* pResBuf = *pBuf; - if (pResBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pResBuf->pageSize = pagesize; - pResBuf->numOfPages = 0; // all pages are in buffer in the first place - pResBuf->totalBufSize = 0; - pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. 
- pResBuf->allocateId = -1; - pResBuf->comp = true; - pResBuf->file = NULL; - pResBuf->qId = qId; - pResBuf->fileSize = 0; - - // at least more than 2 pages must be in memory - assert(inMemBufSize >= pagesize * 2); - - pResBuf->lruList = tdListNew(POINTER_BYTES); - - // init id hash table - pResBuf->groupSet = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); - pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES - pResBuf->all = taosHashInit(10, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); - - char path[PATH_MAX] = {0}; - taosGetTmpfilePath(dir, "qbuf", path); - pResBuf->path = strdup(path); - - pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); - -// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize, -// pResBuf->inMemPages, pResBuf->path); - - return TSDB_CODE_SUCCESS; -} - static int32_t createDiskFile(SDiskbasedBuf* pBuf) { pBuf->file = fopen(pBuf->path, "wb+"); if (pBuf->file == NULL) { @@ -135,10 +96,10 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { size_t num = taosArrayGetSize(pBuf->pFree); for(int32_t i = 0; i < num; ++i) { SFreeListItem* pi = taosArrayGet(pBuf->pFree, i); - if (pi->len >= size) { + if (pi->length >= size) { offset = pi->offset; pi->offset += (int32_t)size; - pi->len -= (int32_t)size; + pi->length -= (int32_t)size; return offset; } @@ -160,7 +121,7 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { assert(!pg->used && pg->pData != NULL); - int32_t size = -1; + int32_t size = pBuf->pageSize; char* t = NULL; if (pg->offset == -1 || pg->dirty) { void* payload = GET_DATA_PAYLOAD(pg); @@ -169,66 +130,68 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } // this page is flushed to disk for the first time - if (pg->offset == -1) { - assert(pg->dirty == true); + if (pg->dirty) { + if (pg->offset == -1) { + assert(pg->dirty == true); - pg->offset = allocatePositionInFile(pBuf, size); - pBuf->nextPos += size; - - int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET); - if (ret != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - ret = (int32_t) fwrite(t, 1, size, pBuf->file); - if (ret != size) { - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } - - if (pBuf->fileSize < pg->offset + size) { - pBuf->fileSize = pg->offset + size; - } - - pBuf->statis.flushBytes += size; - pBuf->statis.flushPages += 1; - } else if (pg->dirty) { - // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing - if (pg->length < size) { - // 1. add current space to free list - SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset}; - taosArrayPush(pBuf->pFree, &dinfo); - - // 2. allocate new position, and update the info pg->offset = allocatePositionInFile(pBuf, size); pBuf->nextPos += size; - } - // 3. write to disk. 
- int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET); - if (ret != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } + int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET); + if (ret != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } - ret = (int32_t)fwrite(t, 1, size, pBuf->file); - if (ret != size) { - terrno = TAOS_SYSTEM_ERROR(errno); - return NULL; - } + ret = (int32_t)fwrite(t, 1, size, pBuf->file); + if (ret != size) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } - if (pBuf->fileSize < pg->offset + size) { - pBuf->fileSize = pg->offset + size; - } + if (pBuf->fileSize < pg->offset + size) { + pBuf->fileSize = pg->offset + size; + } - pBuf->statis.flushBytes += size; - pBuf->statis.flushPages += 1; - } else { + pBuf->statis.flushBytes += size; + pBuf->statis.flushPages += 1; + } else { + // length becomes greater, current space is not enough, allocate new place, otherwise, do nothing + if (pg->length < size) { + // 1. add current space to free list + SPageDiskInfo dinfo = {.length = pg->length, .offset = pg->offset}; + taosArrayPush(pBuf->pFree, &dinfo); + + // 2. allocate new position, and update the info + pg->offset = allocatePositionInFile(pBuf, size); + pBuf->nextPos += size; + } + + // 3. write to disk. + int32_t ret = fseek(pBuf->file, pg->offset, SEEK_SET); + if (ret != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + ret = (int32_t)fwrite(t, 1, size, pBuf->file); + if (ret != size) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } + + if (pBuf->fileSize < pg->offset + size) { + pBuf->fileSize = pg->offset + size; + } + + pBuf->statis.flushBytes += size; + pBuf->statis.flushPages += 1; + } + } else {// NOTE: the size may be -1, the this recycle page has not been flushed to disk yet. size = pg->length; } - assert(size >= 0); + ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1)); char* pDataBuf = pg->pData; memset(pDataBuf, 0, pBuf->pageSize); @@ -313,13 +276,10 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { SListIter iter = {0}; - tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD); SListNode* pn = NULL; while((pn = tdListNext(&iter)) != NULL) { - assert(pn != NULL); - SPageInfo* pageInfo = *(SPageInfo**) pn->data; assert(pageInfo->pageId >= 0 && pageInfo->pn == pn); @@ -377,6 +337,56 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + 2; } +static SPageInfo* getPageInfoFromPayload(void* page) { + int32_t offset = offsetof(SPageInfo, pData); + char* p = page - offset; + + SPageInfo* ppi = ((SPageInfo**) p)[0]; + return ppi; +} + +int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { + *pBuf = calloc(1, sizeof(SDiskbasedBuf)); + + SDiskbasedBuf* pResBuf = *pBuf; + if (pResBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pResBuf->pageSize = pagesize; + pResBuf->numOfPages = 0; // all pages are in buffer in the first place + pResBuf->totalBufSize = 0; + pResBuf->inMemPages = inMemBufSize/pagesize; // maximum allowed pages, it is a soft limit. 
+ pResBuf->allocateId = -1; + pResBuf->comp = true; + pResBuf->file = NULL; + pResBuf->qId = qId; + pResBuf->fileSize = 0; + pResBuf->pFree = taosArrayInit(4, sizeof(SFreeListItem)); + + // at least more than 2 pages must be in memory + assert(inMemBufSize >= pagesize * 2); + + pResBuf->lruList = tdListNew(POINTER_BYTES); + + // init id hash table + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); + pResBuf->groupSet = taosHashInit(10, fn, true, false); + pResBuf->assistBuf = malloc(pResBuf->pageSize + 2); // EXTRA BYTES + pResBuf->all = taosHashInit(10, fn, true, false); + + char path[PATH_MAX] = {0}; + taosGetTmpfilePath(dir, "paged-buf", path); + pResBuf->path = strdup(path); + + pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); + +// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, pResBuf->pageSize, +// pResBuf->inMemPages, pResBuf->path); + + return TSDB_CODE_SUCCESS; +} + void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { pBuf->statis.getPages += 1; @@ -386,6 +396,7 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { // Failed to allocate a new buffer page, and there is an error occurs. if (availablePage == NULL) { + assert(0); return NULL; } } @@ -393,10 +404,6 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { // register new id in this group *pageId = (++pBuf->allocateId); - if (*pageId == 11) { - printf("page is allocated, id:%d\n", *pageId); - } - // register page id info SPageInfo* pi = registerPage(pBuf, groupId, *pageId); @@ -443,7 +450,6 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { (*pi)->used = true; return (void *)(GET_DATA_PAYLOAD(*pi)); - } else { // not in memory assert((*pi)->pData == NULL && (*pi)->pn == NULL && (*pi)->length >= 0 && (*pi)->offset >= 0); @@ -477,15 +483,12 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { assert(pBuf != NULL && page != NULL); - int32_t offset = offsetof(SPageInfo, pData); - char* p = page - offset; - - SPageInfo* ppi = ((SPageInfo**) p)[0]; + SPageInfo* ppi = getPageInfoFromPayload(page); releaseBufPageInfo(pBuf, ppi); } void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { - assert(pi->pData != NULL && pi->used); + assert(pi->pData != NULL && pi->used == true); pi->used = false; pBuf->statis.releasePages += 1; @@ -549,6 +552,8 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { tdListFree(pBuf->lruList); taosArrayDestroy(pBuf->emptyDummyIdList); + taosArrayDestroy(pBuf->pFree); + taosHashCleanup(pBuf->groupSet); taosHashCleanup(pBuf->all); @@ -580,10 +585,7 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { } void setBufPageDirty(void* pPage, bool dirty) { - int32_t offset = offsetof(SPageInfo, pData); - char* p = (char*)pPage - offset; - - SPageInfo* ppi = ((SPageInfo**) p)[0]; + SPageInfo* ppi = getPageInfoFromPayload(pPage); ppi->dirty = dirty; } @@ -591,6 +593,18 @@ void setBufPageCompressOnDisk(SDiskbasedBuf* pBuf, bool comp) { pBuf->comp = comp; } +void dBufSetBufPageRecycled(SDiskbasedBuf *pBuf, void* pPage) { + SPageInfo* ppi = getPageInfoFromPayload(pPage); + + ppi->used = false; + ppi->dirty = false; + + // it is a in-memory page that has not been flushed to disk yet. 
+  if (ppi->length != -1 && ppi->offset != -1) {
+    SFreeListItem item = {.length = ppi->length, .offset = ppi->offset};
+    taosArrayPush(pBuf->pFree, &item);
+  }
+}
 
 void dBufSetPrintInfo(SDiskbasedBuf* pBuf) {
   pBuf->printStatis = true;
@@ -618,3 +632,4 @@ void dBufPrintStatis(const SDiskbasedBuf* pBuf) {
         ps->getPages, ps->releasePages, ps->flushBytes / 1024.0f, ps->flushPages, ps->loadBytes / 1024.0f,
         ps->loadPages, ps->loadBytes / (1024.0 * ps->loadPages));
   }
 }
+
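
A minimal usage sketch of the paged-buffer API touched by this patch, showing the compress-on-flush flag together with the new dBufSetBufPageRecycled() hook. This is illustrative code, not part of the patch: the function name pagedBufUsageSketch, the page/buffer sizes, and the tmpDir argument are assumptions, and it presumes the TDengine util headers (tpagedbuf.h plus the usual error-code definitions) are on the include path.

#include "tpagedbuf.h"

static int32_t pagedBufUsageSketch(const char* tmpDir) {
  SDiskbasedBuf* pBuf = NULL;

  // 1KB pages with a 4KB in-memory budget; createDiskbasedBuf requires at least two pages in memory.
  int32_t code = createDiskbasedBuf(&pBuf, 1024, 4096, /*qId*/ 1, tmpDir);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  // ask the buffer to compress page data when it is flushed to disk
  setBufPageCompressOnDisk(pBuf, true);

  int32_t pageId = -1;
  SFilePage* pPage = getNewBufPage(pBuf, /*groupId*/ 0, &pageId);
  if (pPage == NULL) {
    destroyDiskbasedBuf(pBuf);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  // write into the payload area and keep the used length in pPage->num,
  // the same bookkeeping tlinearhash.c does for its bucket pages
  pPage->num = sizeof(SFilePage);
  setBufPageDirty(pPage, true);
  releaseBufPage(pBuf, pPage);

  // once the page content becomes useless, hand it back: the page is marked
  // clean/unused and, if it already has a slot in the backing file, that slot
  // is pushed onto pBuf->pFree
  void* p = getBufPage(pBuf, pageId);
  dBufSetBufPageRecycled(pBuf, p);

  dBufPrintStatis(pBuf);
  destroyDiskbasedBuf(pBuf);
  return TSDB_CODE_SUCCESS;
}

Recycled slots are preferred over growing the file: allocatePositionInFile() scans pBuf->pFree for a large-enough entry before falling back to pBuf->nextPos.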