From af0180f2163c42cf6587af34cc021307b3a32976 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 23 Feb 2022 15:10:15 +0800 Subject: [PATCH] [td-13039] add linear hash. --- include/util/tarray.h | 4 +- include/util/tpagedbuf.h | 19 +- source/libs/executor/inc/tlinearhash.h | 44 +++ source/libs/executor/src/executorimpl.c | 6 +- source/libs/executor/src/tlinearhash.c | 350 ++++++++++++++++++ source/libs/executor/src/tsort.c | 8 +- source/libs/executor/test/executorTests.cpp | 4 +- source/libs/executor/test/lhashTests.cpp | 65 ++++ .../{executorUtilTests.cpp => sortTests.cpp} | 0 source/libs/function/src/tpercentile.c | 4 +- source/util/src/tpagedbuf.c | 60 +-- source/util/test/pageBufferTest.cpp | 34 +- 12 files changed, 542 insertions(+), 56 deletions(-) create mode 100644 source/libs/executor/inc/tlinearhash.h create mode 100644 source/libs/executor/src/tlinearhash.c create mode 100644 source/libs/executor/test/lhashTests.cpp rename source/libs/executor/test/{executorUtilTests.cpp => sortTests.cpp} (100%) diff --git a/include/util/tarray.h b/include/util/tarray.h index 6d6120a49b..117e60dede 100644 --- a/include/util/tarray.h +++ b/include/util/tarray.h @@ -43,8 +43,8 @@ extern "C" { typedef struct SArray { size_t size; - size_t capacity; - size_t elemSize; + uint32_t capacity; + uint32_t elemSize; void* pData; } SArray; diff --git a/include/util/tpagedbuf.h b/include/util/tpagedbuf.h index c1c246dd64..d9e233f8bb 100644 --- a/include/util/tpagedbuf.h +++ b/include/util/tpagedbuf.h @@ -30,7 +30,6 @@ typedef struct SPageInfo SPageInfo; typedef struct SDiskbasedBuf SDiskbasedBuf; #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes -#define DEFAULT_PAGE_SIZE (16384L) typedef struct SFilePage { int64_t num; @@ -64,7 +63,7 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem * @param pageId * @return */ -SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); +void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId); /** * @@ -80,7 +79,7 @@ SIDList getDataBufPagesIdList(SDiskbasedBuf* pBuf, int32_t groupId); * @param id * @return */ -SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id); +void* getBufPage(SDiskbasedBuf* pBuf, int32_t id); /** * release the referenced buf pages @@ -151,19 +150,27 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf); * @param pPageInfo * @param dirty */ -void setBufPageDirty(SFilePage* pPageInfo, bool dirty); +void setBufPageDirty(void* pPageInfo, bool dirty); /** * Print the statistics when closing this buffer * @param pBuf */ -void setPrintStatis(SDiskbasedBuf* pBuf); +void dBufSetPrintInfo(SDiskbasedBuf* pBuf); /** - * return buf statistics. + * Return buf statistics. + * @param pBuf + * @return */ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf); +/** + * Print the buffer statistics information + * @param pBuf + */ +void dBufPrintStatis(const SDiskbasedBuf* pBuf); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/inc/tlinearhash.h b/source/libs/executor/inc/tlinearhash.h new file mode 100644 index 0000000000..a419058216 --- /dev/null +++ b/source/libs/executor/inc/tlinearhash.h @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TLINEARHASH_H +#define TDENGINE_TLINEARHASH_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include "thash.h" + +enum { + LINEAR_HASH_STATIS = 0x1, + LINEAR_HASH_DATA = 0x2, +}; + +typedef struct SLHashObj SLHashObj; + +SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage); +void* tHashCleanup(SLHashObj* pHashObj); + +int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size); +char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen); +int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen); + +void tHashPrint(const SLHashObj* pHashObj, int32_t type); + +#ifdef __cplusplus +} +#endif +#endif // TDENGINE_TLINEARHASH_H diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f2d98ea86c..6f1a05805a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -705,7 +705,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes SIDList list = getDataBufPagesIdList(pResultBuf, tid); if (taosArrayGetSize(list) == 0) { - pData = getNewDataBuf(pResultBuf, tid, &pageId); + pData = getNewBufPage(pResultBuf, tid, &pageId); } else { SPageInfo* pi = getLastPageInfo(list); pData = getBufPage(pResultBuf, getPageId(pi)); @@ -714,7 +714,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes if (pData->num + size > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one releaseBufPageInfo(pResultBuf, pi); - pData = getNewDataBuf(pResultBuf, tid, &pageId); + pData = getNewBufPage(pResultBuf, tid, &pageId); if (pData != NULL) { assert(pData->num == 0); // number of elements must be 0 for new allocated buffer } @@ -4603,7 +4603,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr tsBufSetTraverseOrder(pRuntimeEnv->pTsBuf, order); } - int32_t ps = DEFAULT_PAGE_SIZE; + int32_t ps = 4096; getIntermediateBufInfo(pRuntimeEnv, &ps, &pQueryAttr->intermediateResultRowSize); int32_t TENMB = 1024*1024*10; diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c new file mode 100644 index 0000000000..b644801a6e --- /dev/null +++ b/source/libs/executor/src/tlinearhash.c @@ -0,0 +1,350 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tlinearhash.h" +#include "tcfg.h" +#include "taoserror.h" +#include "tpagedbuf.h" + +#define LHASH_CAP_RATIO 0.85 + +// Always located in memory +typedef struct SLHashBucket { + SArray *pPageIdList; + int32_t size; // the number of element in this entry +} SLHashBucket; + +typedef struct SLHashObj { + SDiskbasedBuf *pBuf; + _hash_fn_t hashFn; + int32_t tuplesPerPage; + SLHashBucket **pBucket; // entry list + int32_t numOfAlloc; // number of allocated bucket ptr slot + int32_t bits; // the number of bits used in hash + int32_t numOfBuckets; // the number of buckets + int64_t size; // the number of total items +} SLHashObj; + +/** + * the data struct for each hash node + * +-----------+-------+--------+ + * | SLHashNode| key | data | + * +-----------+-------+--------+ + */ +typedef struct SLHashNode { + int32_t keyLen; + int32_t dataLen; +} SLHashNode; + +#define GET_LHASH_NODE_KEY(_n) (((char*)(_n)) + sizeof(SLHashNode)) +#define GET_LHASH_NODE_DATA(_n) ((char*)(_n) + sizeof(SLHashNode) + (_n)->keyLen) +#define GET_LHASH_NODE_LEN(_n) (sizeof(SLHashNode) + ((SLHashNode*)(_n))->keyLen + ((SLHashNode*)(_n))->dataLen) + +static int32_t doAddNewBucket(SLHashObj* pHashObj); + +static int32_t doGetBucketIdFromHashVal(int32_t hashv, int32_t bits) { + return hashv & ((1ul << (bits)) - 1); +} + +static int32_t doGetAlternativeBucketId(int32_t bucketId, int32_t bits, int32_t numOfBuckets) { + int32_t v = bucketId - (1ul << (bits - 1)); + + ASSERT(v < numOfBuckets); + return v; +} + +SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_t numOfTuplePerPage) { + SLHashObj* pHashObj = calloc(1, sizeof(SLHashObj)); + if (pHashObj == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + int32_t code = createDiskbasedBuf(&pHashObj->pBuf, pageSize, inMemPages * pageSize, 0, "/tmp"); + if (code != 0) { + terrno = code; + return NULL; + } + + /** + * The number of bits in the hash value, which is used to decide the exact bucket where the object should be located in. + * The initial value is 0. + */ + pHashObj->bits = 0; + pHashObj->hashFn = fn; + pHashObj->tuplesPerPage = numOfTuplePerPage; + + pHashObj->numOfAlloc = 4; // initial allocated array list + pHashObj->pBucket = calloc(pHashObj->numOfAlloc, POINTER_BYTES); + + code = doAddNewBucket(pHashObj); + if (code != TSDB_CODE_SUCCESS) { + destroyDiskbasedBuf(pHashObj->pBuf); + tfree(pHashObj); + terrno = code; + return NULL; + } + + return pHashObj; +} + +void* tHashCleanup(SLHashObj* pHashObj) { + destroyDiskbasedBuf(pHashObj->pBuf); + for(int32_t i = 0; i < pHashObj->numOfBuckets; ++i) { + taosArrayDestroy(pHashObj->pBucket[i]->pPageIdList); + tfree(pHashObj->pBucket[i]); + } + + tfree(pHashObj->pBucket); + tfree(pHashObj); + return NULL; +} + +static void doCopyObject(char* p, const void* key, int32_t keyLen, const void* data, int32_t size) { + *(int32_t*) p = keyLen; + p += sizeof(int32_t); + *(int32_t*) p = size; + p += sizeof(int32_t); + + memcpy(p, key, keyLen); + p += keyLen; + + memcpy(p, data, size); +} + +static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t index, const void* key, int32_t keyLen, + const void* data, int32_t size) { + int32_t pageId = *(int32_t*)taosArrayGetLast(pBucket->pPageIdList); + + SFilePage* pPage = getBufPage(pHashObj->pBuf, pageId); + ASSERT (pPage != NULL); + + // put to current buf page + size_t nodeSize = sizeof(SLHashNode) + keyLen + size; + ASSERT(nodeSize <= getBufPageSize(pHashObj->pBuf)); + + if (pPage->num + nodeSize > getBufPageSize(pHashObj->pBuf)) { + releaseBufPage(pHashObj->pBuf, pPage); + + // allocate the overflow buffer page to hold this k/v. + int32_t newPageId = -1; + SFilePage* pNewPage = getNewBufPage(pHashObj->pBuf, 0, &newPageId); + if (pNewPage == 0) { + // TODO handle error + } + + taosArrayPush(pBucket->pPageIdList, &newPageId); + + doCopyObject(pNewPage->data, key, keyLen, data, size); + pNewPage->num = nodeSize; + + setBufPageDirty(pNewPage, true); + releaseBufPage(pHashObj->pBuf, pNewPage); + } else { + char* p = pPage->data + pPage->num; + doCopyObject(p, key, keyLen, data, size); + pPage->num += nodeSize; + setBufPageDirty(pPage, true); + releaseBufPage(pHashObj->pBuf, pPage); + } + + pBucket->size += 1; +// printf("===> add to bucket:0x%x, num:%d, key:%d\n", index, pBucket->size, *(int*) key); +} + +// TODO merge the fragments on multiple pages to recycle the empty disk page ASAP +static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket* pBucket) { + ASSERT(pPage != NULL && pNode != NULL); + + int32_t len = GET_LHASH_NODE_LEN(pNode); + char* p = (char*) pNode + len; + + char* pEnd = pPage->data + pPage->num; + memmove(pNode, p, (pEnd - p)); + + pPage->num -= len; + if (pPage->num == 0) { + // this page is empty, could be recycle in the future. + } + + setBufPageDirty(pPage, true); + pBucket->size -= 1; +} + +static int32_t doAddNewBucket(SLHashObj* pHashObj) { + if (pHashObj->numOfBuckets + 1 > pHashObj->numOfAlloc) { + int32_t newLen = pHashObj->numOfAlloc * 1.25; + if (newLen == pHashObj->numOfAlloc) { + newLen += 4; + } + + char* p = realloc(pHashObj->pBucket, POINTER_BYTES * newLen); + if (p == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + memset(p + POINTER_BYTES * pHashObj->numOfBuckets, 0, newLen - pHashObj->numOfBuckets); + pHashObj->pBucket = (SLHashBucket**) p; + pHashObj->numOfAlloc = newLen; + } + + SLHashBucket* pBucket = calloc(1, sizeof(SLHashBucket)); + pHashObj->pBucket[pHashObj->numOfBuckets] = pBucket; + + pBucket->pPageIdList = taosArrayInit(2, sizeof(int32_t)); + if (pBucket->pPageIdList == NULL || pBucket == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t pageId = -1; + SFilePage* p = getNewBufPage(pHashObj->pBuf, 0, &pageId); + releaseBufPage(pHashObj->pBuf, p); + + taosArrayPush(pBucket->pPageIdList, &pageId); + + pHashObj->numOfBuckets += 1; +// printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); + return TSDB_CODE_SUCCESS; +} + +int32_t tHashPut(SLHashObj* pHashObj, const void *key, size_t keyLen, void *data, size_t size) { + ASSERT(pHashObj != NULL && key != NULL); + + if (pHashObj->bits == 0) { + SLHashBucket* pBucket = pHashObj->pBucket[0]; + doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size); + } else { + int32_t hashVal = pHashObj->hashFn(key, keyLen); + int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits); + + if (pHashObj->numOfBuckets > v) { + SLHashBucket* pBucket = pHashObj->pBucket[v]; + + // TODO check return code + doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size); + } else { // no matched bucket exists, find the candidate bucket + int32_t bucketId = doGetAlternativeBucketId(v, pHashObj->bits, pHashObj->numOfBuckets); +// printf("bucketId: 0x%x not exists, put it into 0x%x instead\n", v, bucketId); + + SLHashBucket* pBucket = pHashObj->pBucket[bucketId]; + doAddToBucket(pHashObj, pBucket, bucketId, key, keyLen, data, size); + } + } + + pHashObj->size += 1; + + // Too many records, needs to bucket split + if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) < pHashObj->size) { + int32_t newBucketId = pHashObj->numOfBuckets; + + int32_t code = doAddNewBucket(pHashObj); + int32_t numOfBits = ceil(log(pHashObj->numOfBuckets) / log(2)); + if (numOfBits > pHashObj->bits) { +// printf("extend the bits from %d to %d, new bucket:%d\n", pHashObj->bits, numOfBits, newBucketId); + + ASSERT(numOfBits == pHashObj->bits + 1); + pHashObj->bits = numOfBits; + } + + int32_t splitBucketId = (1ul << (pHashObj->bits - 1)) ^ newBucketId; + + // load all data in this bucket and check if the data needs to relocated into the new bucket + SLHashBucket* pBucket = pHashObj->pBucket[splitBucketId]; +// printf("split %d items' bucket:0x%x to new bucket:0x%x\n", pBucket->size, splitBucketId, newBucketId); + + for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { + int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); + SFilePage* p = getBufPage(pHashObj->pBuf, pageId); + + char* pStart = p->data; + while (pStart - p->data < p->num) { + SLHashNode* pNode = (SLHashNode*)pStart; + + char* k = GET_LHASH_NODE_KEY(pNode); + int32_t hashv = pHashObj->hashFn(k, pNode->keyLen); + + int32_t v1 = hashv & ((1ul << (pHashObj->bits)) - 1); + if (v1 != splitBucketId) { // place it into the new bucket + ASSERT(v1 == newBucketId); +// printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); + + SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId]; + doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, + GET_LHASH_NODE_KEY(pNode), pNode->dataLen); + doRemoveFromBucket(p, pNode, pBucket); + } else { +// printf("check key:%d, located into: %d, skip it\n", *(int*) k, v1); + + int32_t nodeSize = GET_LHASH_NODE_LEN(pStart); + pStart += nodeSize; + } + } + releaseBufPage(pHashObj->pBuf, p); + } + } +} + +char* tHashGet(SLHashObj* pHashObj, const void *key, size_t keyLen) { + ASSERT(pHashObj != NULL && key != NULL && keyLen > 0); + int32_t hashv = pHashObj->hashFn(key, keyLen); + + int32_t bucketId = doGetBucketIdFromHashVal(hashv, pHashObj->bits); + if (bucketId >= pHashObj->numOfBuckets) { + bucketId = doGetAlternativeBucketId(bucketId, pHashObj->bits, pHashObj->numOfBuckets); + } + + SLHashBucket* pBucket = pHashObj->pBucket[bucketId]; + for (int32_t i = 0; i < taosArrayGetSize(pBucket->pPageIdList); ++i) { + int32_t pageId = *(int32_t*)taosArrayGet(pBucket->pPageIdList, i); + SFilePage* p = getBufPage(pHashObj->pBuf, pageId); + + char* pStart = p->data; + while (pStart - p->data < p->num) { + SLHashNode* pNode = (SLHashNode*)pStart; + + char* k = GET_LHASH_NODE_KEY(pNode); + if (pNode->keyLen == keyLen && (memcmp(key, k, keyLen) == 0)) { + releaseBufPage(pHashObj->pBuf, p); + return GET_LHASH_NODE_DATA(pNode); + } else { + pStart += GET_LHASH_NODE_LEN(pStart); + } + } + + releaseBufPage(pHashObj->pBuf, p); + } + + return NULL; +} + +int32_t tHashRemove(SLHashObj* pHashObj, const void *key, size_t keyLen) { + +} + +void tHashPrint(const SLHashObj* pHashObj, int32_t type) { + printf("==================== linear hash ====================\n"); + printf("total bucket:%d, size:%ld, ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO); + + dBufSetPrintInfo(pHashObj->pBuf); + + if (type == LINEAR_HASH_DATA) { + for (int32_t i = 0; i < pHashObj->numOfBuckets; ++i) { +// printf("bucket: 0x%x, obj:%d, page:%d\n", i, pHashObj->pBucket[i]->size, +// (int)taosArrayGetSize(pHashObj->pBucket[i]->pPageIdList)); + } + } else { + dBufPrintStatis(pHashObj->pBuf); + } +} \ No newline at end of file diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 33e95a19b5..6e7b6b4659 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -147,7 +147,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { if (pHandle->pBuf == NULL) { int32_t code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); - setPrintStatis(pHandle->pBuf); + dBufSetPrintInfo(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -162,7 +162,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { } int32_t pageId = -1; - SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId); + SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); if (pPage == NULL) { return terrno; } @@ -211,7 +211,7 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int // multi-pass internal merge sort is required if (pHandle->pBuf == NULL) { code = createDiskbasedBuf(&pHandle->pBuf, pHandle->pageSize, pHandle->numOfPages * pHandle->pageSize, 0, "/tmp"); - setPrintStatis(pHandle->pBuf); + dBufSetPrintInfo(pHandle->pBuf); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -462,7 +462,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { } int32_t pageId = -1; - SFilePage* pPage = getNewDataBuf(pHandle->pBuf, pHandle->sourceId, &pageId); + SFilePage* pPage = getNewBufPage(pHandle->pBuf, pHandle->sourceId, &pageId); if (pPage == NULL) { return terrno; } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index bf3f76b94e..c0fb899a2d 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -357,7 +357,7 @@ TEST(testCase, external_sort_Test) { taosArrayDestroy(pOrderVal); } -#endif + TEST(testCase, sorted_merge_Test) { srand(time(NULL)); @@ -430,4 +430,6 @@ TEST(testCase, sorted_merge_Test) { taosArrayDestroy(pExprInfo); taosArrayDestroy(pOrderVal); } + +#endif #pragma GCC diagnostic pop diff --git a/source/libs/executor/test/lhashTests.cpp b/source/libs/executor/test/lhashTests.cpp new file mode 100644 index 0000000000..be847d2959 --- /dev/null +++ b/source/libs/executor/test/lhashTests.cpp @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include "executorimpl.h" +#include "tlinearhash.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +TEST(testCase, linear_hash_Tests) { + srand(time(NULL)); + + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); +#if 1 + SLHashObj* pHashObj = tHashInit(100, 64 + 8, fn, 4); + for(int32_t i = 0; i < 5000; ++i) { + tHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); + } + + tHashPrint(pHashObj, LINEAR_HASH_STATIS); + + for(int32_t i = 0; i < 100; ++i) { + char* v = tHashGet(pHashObj, &i, sizeof(i)); + if (v != NULL) { +// printf("find value: %d, key:%d\n", *(int32_t*) v, i); + } else { + printf("failed to found key:%d in hash\n", i); + } + } + + tHashPrint(pHashObj, LINEAR_HASH_DATA); + tHashCleanup(pHashObj); +#endif + +#if 0 + SHashObj* pHashObj = taosHashInit(1000, fn, false, HASH_NO_LOCK); + for(int32_t i = 0; i < 500000; ++i) { + taosHashPut(pHashObj, &i, sizeof(i), &i, sizeof(i)); + } + + for(int32_t i = 0; i < 10000; ++i) { + void* v = taosHashGet(pHashObj, &i, sizeof(i)); + } + taosHashCleanup(pHashObj); +#endif + +} \ No newline at end of file diff --git a/source/libs/executor/test/executorUtilTests.cpp b/source/libs/executor/test/sortTests.cpp similarity index 100% rename from source/libs/executor/test/executorUtilTests.cpp rename to source/libs/executor/test/sortTests.cpp diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index c6ab125362..eab05ad039 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -221,7 +221,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, } pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; - pBucket->bufPageSize = DEFAULT_PAGE_SIZE * 4; // 4k per page + pBucket->bufPageSize = 16384 * 4; // 16k per page pBucket->type = dataType; pBucket->bytes = nElemSize; @@ -347,7 +347,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = NULL; } - pSlot->info.data = getNewDataBuf(pBucket->pBuffer, groupId, &pageId); + pSlot->info.data = getNewBufPage(pBucket->pBuffer, groupId, &pageId); pSlot->info.pageId = pageId; } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 212b3f3067..cc3d1747a1 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -18,7 +18,7 @@ typedef struct SPageDiskInfo { int32_t length; } SPageDiskInfo; -typedef struct SPageInfo { +struct SPageInfo { SListNode* pn; // point to list node void* pData; int64_t offset; @@ -26,9 +26,9 @@ typedef struct SPageInfo { int32_t length:30; bool used:1; // set current page is in used bool dirty:1; // set current buffer page is dirty or not -} SPageInfo; +}; -typedef struct SDiskbasedBuf { +struct SDiskbasedBuf { int32_t numOfPages; int64_t totalBufSize; uint64_t fileSize; // disk file size @@ -49,9 +49,7 @@ typedef struct SDiskbasedBuf { uint64_t qId; // for debug purpose bool printStatis; // Print statistics info when closing this buffer. SDiskbasedBufStatis statis; -} SDiskbasedBuf; - -static void printStatisData(const SDiskbasedBuf* pBuf); +}; int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { *pBuf = calloc(1, sizeof(SDiskbasedBuf)); @@ -121,7 +119,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba return data; } - *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize+sizeof(SFilePage), ONE_STAGE_COMP, NULL, 0); + *dst = tsDecompressString(data, srcSize, 1, pBuf->assistBuf, pBuf->pageSize, ONE_STAGE_COMP, NULL, 0); if (*dst > 0) { memcpy(data, pBuf->assistBuf, *dst); } @@ -151,14 +149,23 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { } } +/** + * +--------------------------+-------------------+--------------+ + * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| + * +--------------------------+-------------------+--------------+ + * @param pBuf + * @param pg + * @return + */ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { assert(!pg->used && pg->pData != NULL); int32_t size = -1; char* t = NULL; if (pg->offset == -1 || pg->dirty) { - SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg); - t = doCompressData(pPage->data, pBuf->pageSize, &size, pBuf); + void* payload = GET_DATA_PAYLOAD(pg); + t = doCompressData(payload, pBuf->pageSize, &size, pBuf); + assert(size >= 0); } // this page is flushed to disk for the first time @@ -217,10 +224,14 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { pBuf->statis.flushBytes += size; pBuf->statis.flushPages += 1; + } else { + size = pg->length; } + assert(size >= 0); + char* pDataBuf = pg->pData; - memset(pDataBuf, 0, pBuf->pageSize + sizeof(SFilePage)); + memset(pDataBuf, 0, pBuf->pageSize); pg->pData = NULL; // this means the data is not in buffer pg->length = size; @@ -251,8 +262,8 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return ret; } - SFilePage* pPage = (SFilePage*) GET_DATA_PAYLOAD(pg); - ret = (int32_t)fread(pPage->data, 1, pg->length, pBuf->file); + void* pPage = (void*) GET_DATA_PAYLOAD(pg); + ret = (int32_t)fread(pPage, 1, pg->length, pBuf->file); if (ret != pg->length) { ret = TAOS_SYSTEM_ERROR(errno); return ret; @@ -262,7 +273,7 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { pBuf->statis.loadPages += 1; int32_t fullSize = 0; - doDecompressData(pPage->data, pg->length, &fullSize, pBuf); + doDecompressData(pPage, pg->length, &fullSize, pBuf); return 0; } @@ -288,7 +299,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag pBuf->numOfPages += 1; - SPageInfo* ppi = malloc(sizeof(SPageInfo));//{ .info = PAGE_INFO_INITIALIZER, .pageId = pageId, .pn = NULL}; + SPageInfo* ppi = malloc(sizeof(SPageInfo)); ppi->pageId = pageId; ppi->pData = NULL; @@ -302,6 +313,7 @@ static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t pag static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { SListIter iter = {0}; + tdListInitIter(pBuf->lruList, &iter, TD_LIST_BACKWARD); SListNode* pn = NULL; @@ -313,6 +325,8 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { if (!pageInfo->used) { break; + } else { + printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); } } @@ -360,10 +374,10 @@ static void lruListMoveToFront(SList *pList, SPageInfo* pi) { } static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { - return pageSize + POINTER_BYTES + 2 + sizeof(SFilePage); + return pageSize + POINTER_BYTES + 2; } -SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { +void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) { pBuf->statis.getPages += 1; char* availablePage = NULL; @@ -379,6 +393,10 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) // register new id in this group *pageId = (++pBuf->allocateId); + if (*pageId == 11) { + printf("page is allocated, id:%d\n", *pageId); + } + // register page id info SPageInfo* pi = registerPage(pBuf, groupId, *pageId); @@ -404,7 +422,7 @@ SFilePage* getNewDataBuf(SDiskbasedBuf* pBuf, int32_t groupId, int32_t* pageId) return (void *)(GET_DATA_PAYLOAD(pi)); } -SFilePage* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { +void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { assert(pBuf != NULL && id >= 0); pBuf->statis.getPages += 1; @@ -493,7 +511,7 @@ void destroyDiskbasedBuf(SDiskbasedBuf* pBuf) { return; } - printStatisData(pBuf); + dBufPrintStatis(pBuf); if (pBuf->file != NULL) { uDebug("Paged buffer closed, total:%.2f Kb (%d Pages), inmem size:%.2f Kb (%d Pages), file size:%.2f Kb, page size:%.2f Kb, %"PRIx64"\n", @@ -561,7 +579,7 @@ bool isAllDataInMemBuf(const SDiskbasedBuf* pBuf) { return pBuf->fileSize == 0; } -void setBufPageDirty(SFilePage* pPage, bool dirty) { +void setBufPageDirty(void* pPage, bool dirty) { int32_t offset = offsetof(SPageInfo, pData); char* p = (char*)pPage - offset; @@ -569,7 +587,7 @@ void setBufPageDirty(SFilePage* pPage, bool dirty) { ppi->dirty = dirty; } -void setPrintStatis(SDiskbasedBuf* pBuf) { +void dBufSetPrintInfo(SDiskbasedBuf* pBuf) { pBuf->printStatis = true; } @@ -577,7 +595,7 @@ SDiskbasedBufStatis getDBufStatis(const SDiskbasedBuf* pBuf) { return pBuf->statis; } -void printStatisData(const SDiskbasedBuf* pBuf) { +void dBufPrintStatis(const SDiskbasedBuf* pBuf) { if (!pBuf->printStatis) { return; } diff --git a/source/util/test/pageBufferTest.cpp b/source/util/test/pageBufferTest.cpp index d310ff450b..e86fcf8653 100644 --- a/source/util/test/pageBufferTest.cpp +++ b/source/util/test/pageBufferTest.cpp @@ -18,7 +18,7 @@ void simpleTest() { int32_t pageId = 0; int32_t groupId = 0; - SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage = getNewBufPage(pResultBuf, groupId, &pageId); ASSERT_TRUE(pBufPage != NULL); ASSERT_EQ(getTotalBufSize(pResultBuf), 1024); @@ -29,24 +29,24 @@ void simpleTest() { releaseBufPage(pResultBuf, pBufPage); - SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage1 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t == pBufPage1); - SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage2 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t1 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t1 == pBufPage2); - SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage3 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t2 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t2 == pBufPage3); - SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage4 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t3 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t3 == pBufPage4); - SFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage5 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t4 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t4 == pBufPage5); @@ -62,29 +62,29 @@ void writeDownTest() { int32_t groupId = 0; int32_t nx = 12345; - SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage = getNewBufPage(pResultBuf, groupId, &pageId); ASSERT_TRUE(pBufPage != NULL); *(int32_t*)(pBufPage->data) = nx; writePageId = pageId; releaseBufPage(pResultBuf, pBufPage); - SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage1 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t1 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(pageId == 1); - SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage2 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t2 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(pageId == 2); - SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage3 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t3 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(pageId == 3); - SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage4 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t4 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(pageId == 4); @@ -109,32 +109,32 @@ void recyclePageTest() { int32_t groupId = 0; int32_t nx = 12345; - SFilePage* pBufPage = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage = getNewBufPage(pResultBuf, groupId, &pageId); ASSERT_TRUE(pBufPage != NULL); releaseBufPage(pResultBuf, pBufPage); - SFilePage* pBufPage1 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage1 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t1 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t1 == pBufPage1); ASSERT_TRUE(pageId == 1); - SFilePage* pBufPage2 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage2 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t2 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t2 == pBufPage2); ASSERT_TRUE(pageId == 2); - SFilePage* pBufPage3 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage3 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t3 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t3 == pBufPage3); ASSERT_TRUE(pageId == 3); - SFilePage* pBufPage4 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage4 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t4 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t4 == pBufPage4); ASSERT_TRUE(pageId == 4); releaseBufPage(pResultBuf, t4); - SFilePage* pBufPage5 = getNewDataBuf(pResultBuf, groupId, &pageId); + SFilePage* pBufPage5 = getNewBufPage(pResultBuf, groupId, &pageId); SFilePage* t5 = getBufPage(pResultBuf, pageId); ASSERT_TRUE(t5 == pBufPage5); ASSERT_TRUE(pageId == 5);