From f3c92ad39b3151a6909e0f463eab34d8a9b7efff Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 31 Aug 2020 16:40:41 +0800 Subject: [PATCH] [td-1206] --- src/client/src/tscFunctionImpl.c | 18 +- src/query/inc/qPercentile.h | 64 +- src/query/src/qPercentile.c | 1219 ++++++++++++------------------ 3 files changed, 521 insertions(+), 780 deletions(-) diff --git a/src/client/src/tscFunctionImpl.c b/src/client/src/tscFunctionImpl.c index bd4aa70ee4..7f6ce1ed0e 100644 --- a/src/client/src/tscFunctionImpl.c +++ b/src/client/src/tscFunctionImpl.c @@ -2422,24 +2422,14 @@ static void top_bottom_func_finalizer(SQLFunctionCtx *pCtx) { /////////////////////////////////////////////////////////////////////////////////////////////// static bool percentile_function_setup(SQLFunctionCtx *pCtx) { - const int32_t MAX_AVAILABLE_BUFFER_SIZE = 1 << 20; // 1MB - const int32_t NUMOFCOLS = 1; - if (!function_setup(pCtx)) { return false; } SResultInfo *pResInfo = GET_RES_INFO(pCtx); - SSchema field[1] = { { (uint8_t)pCtx->inputType, "dummyCol", 0, pCtx->inputBytes } }; - - SColumnModel *pModel = createColumnModel(field, 1, 1000); - int32_t orderIdx = 0; - - // tOrderDesc object - tOrderDescriptor *pDesc = tOrderDesCreate(&orderIdx, NUMOFCOLS, pModel, TSDB_ORDER_DESC); - + ((SPercentileInfo *)(pResInfo->interResultBuf))->pMemBucket = - tMemBucketCreate(1024, MAX_AVAILABLE_BUFFER_SIZE, pCtx->inputBytes, pCtx->inputType, pDesc); + tMemBucketCreate(pCtx->inputBytes, pCtx->inputType); return true; } @@ -2485,15 +2475,13 @@ static void percentile_finalizer(SQLFunctionCtx *pCtx) { SResultInfo *pResInfo = GET_RES_INFO(pCtx); tMemBucket * pMemBucket = ((SPercentileInfo *)pResInfo->interResultBuf)->pMemBucket; - if (pMemBucket->numOfElems > 0) { // check for null + if (pMemBucket->total > 0) { // check for null *(double *)pCtx->aOutputBuf = getPercentile(pMemBucket, v); } else { setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes); } - tOrderDescDestroy(pMemBucket->pOrderDesc); tMemBucketDestroy(pMemBucket); - doFinalizer(pCtx); } diff --git a/src/query/inc/qPercentile.h b/src/query/inc/qPercentile.h index 52f666c338..0a52d4f205 100644 --- a/src/query/inc/qPercentile.h +++ b/src/query/inc/qPercentile.h @@ -17,6 +17,8 @@ #define TDENGINE_QPERCENTILE_H #include "qExtbuffer.h" +#include "qResultbuf.h" +#include "qTsbuf.h" typedef struct MinMaxEntry { union { @@ -31,47 +33,43 @@ typedef struct MinMaxEntry { }; } MinMaxEntry; -typedef struct tMemBucketSegment { - int32_t numOfSlots; - MinMaxEntry * pBoundingEntries; - tExtMemBuffer **pBuffer; -} tMemBucketSegment; +typedef struct { + int32_t size; + int32_t pageId; + tFilePage *data; +} SSlotInfo; + +typedef struct tMemBucketSlot { + SSlotInfo info; + MinMaxEntry range; +} tMemBucketSlot; + +struct tMemBucket; +typedef int32_t (*__perc_hash_func_t)(struct tMemBucket *pBucket, const void *value); typedef struct tMemBucket { - int16_t numOfSegs; - int16_t nTotalSlots; - int16_t nSlotsOfSeg; - int16_t dataType; - - int16_t nElemSize; - int32_t numOfElems; - - int32_t nTotalBufferSize; - int32_t maxElemsCapacity; - - int32_t pageSize; - int16_t numOfTotalPages; - int16_t numOfAvailPages; /* remain available buffer pages */ - - tMemBucketSegment *pSegs; - tOrderDescriptor * pOrderDesc; - - MinMaxEntry nRange; - - void (*HashFunc)(struct tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx); + int16_t numOfSlots; + int16_t type; + int16_t bytes; + int32_t total; + int32_t elemPerPage; // number of elements for each object + int32_t maxCapacity; // maximum allowed number of elements that can be sort directly to get the result + int32_t bufPageSize; // disk page size + MinMaxEntry range; // value range + int32_t times; // count that has been checked for deciding the correct data value buckets. + __compar_fn_t comparFn; + + tMemBucketSlot *pSlots; + SDiskbasedResultBuf *pBuffer; + __perc_hash_func_t hashFunc; } tMemBucket; -tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, - tOrderDescriptor *pDesc); +tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType); void tMemBucketDestroy(tMemBucket *pBucket); -void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows); +void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size); double getPercentile(tMemBucket *pMemBucket, double percent); -void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx); - -void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx); - #endif // TDENGINE_QPERCENTILE_H diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c index 19775075fc..c6eb836c61 100644 --- a/src/query/src/qPercentile.c +++ b/src/query/src/qPercentile.c @@ -14,310 +14,291 @@ */ #include "qPercentile.h" +#include "qResultbuf.h" #include "os.h" #include "queryLog.h" #include "taosdef.h" -#include "taosmsg.h" #include "tulog.h" +#include "tcompare.h" -tExtMemBuffer *releaseBucketsExceptFor(tMemBucket *pMemBucket, int16_t segIdx, int16_t slotIdx) { - tExtMemBuffer *pBuffer = NULL; - - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - if (i == segIdx && j == slotIdx) { - pBuffer = pSeg->pBuffer[j]; - } else { - if (pSeg->pBuffer && pSeg->pBuffer[j]) { - pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]); - } - } - } - } - - return pBuffer; +#define DEFAULT_NUM_OF_SLOT 1024 + +int32_t getGroupId(int32_t numOfSlots, int32_t slotIndex, int32_t times) { + return (times * numOfSlots) + slotIndex; } -static tFilePage *loadIntoBucketFromDisk(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx, - tOrderDescriptor *pDesc) { - // release all data in other slots - tExtMemBuffer *pMemBuffer = pMemBucket->pSegs[segIdx].pBuffer[slotIdx]; - tFilePage * buffer = (tFilePage *)calloc(1, pMemBuffer->nElemSize * pMemBuffer->numOfTotalElems + sizeof(tFilePage)); - int32_t oldCapacity = pDesc->pColumnModel->capacity; - pDesc->pColumnModel->capacity = pMemBuffer->numOfTotalElems; - - if (!tExtMemBufferIsAllDataInMem(pMemBuffer)) { - pMemBuffer = releaseBucketsExceptFor(pMemBucket, segIdx, slotIdx); - assert(pMemBuffer->numOfTotalElems > 0); - - // load data in disk to memory - tFilePage *pPage = (tFilePage *)calloc(1, pMemBuffer->pageSize); - - for (uint32_t i = 0; i < pMemBuffer->fileMeta.flushoutData.nLength; ++i) { - tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[i]; - - int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); - UNUSED(ret); - - for (uint32_t j = 0; j < pFlushInfo->numOfPages; ++j) { - ret = (int32_t)fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - UNUSED(ret); - assert(pPage->num > 0); - - tColModelAppend(pDesc->pColumnModel, buffer, pPage->data, 0, (int32_t)pPage->num, (int32_t)pPage->num); - printf("id: %d count: %" PRIu64 "\n", j, buffer->num); - } - } - taosTFree(pPage); - - assert(buffer->num == pMemBuffer->fileMeta.numOfElemsInFile); +static tFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) { + tFilePage *buffer = (tFilePage *)calloc(1, pMemBucket->bytes * pMemBucket->pSlots[slotIdx].info.size + sizeof(tFilePage)); + + int32_t groupId = getGroupId(pMemBucket->numOfSlots, slotIdx, pMemBucket->times); + SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + + int32_t offset = 0; + for(int32_t i = 0; i < list->size; ++i) { + SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i); + + tFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + memcpy(buffer->data + offset, pg->data, pg->num * pMemBucket->bytes); + + offset += pg->num * pMemBucket->bytes; } - - // load data in pMemBuffer to buffer - tFilePagesItem *pListItem = pMemBuffer->pHead; - while (pListItem != NULL) { - tColModelAppend(pDesc->pColumnModel, buffer, pListItem->item.data, 0, (int32_t)pListItem->item.num, - (int32_t)pListItem->item.num); - pListItem = pListItem->pNext; - } - - tColDataQSort(pDesc, (int32_t)buffer->num, 0, (int32_t)buffer->num - 1, buffer->data, TSDB_ORDER_ASC); - - pDesc->pColumnModel->capacity = oldCapacity; // restore value + + qsort(buffer->data, pMemBucket->pSlots[slotIdx].info.size, pMemBucket->bytes, pMemBucket->comparFn); return buffer; } -double findOnlyResult(tMemBucket *pMemBucket) { - assert(pMemBucket->numOfElems == 1); - - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - if (pSeg->pBuffer) { - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - tExtMemBuffer *pBuffer = pSeg->pBuffer[j]; - if (pBuffer) { - assert(pBuffer->numOfTotalElems == 1); - tFilePage *pPage = &pBuffer->pHead->item; - if (pBuffer->numOfElemsInBuffer == 1) { - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_INT: - return *(int32_t *)pPage->data; - case TSDB_DATA_TYPE_SMALLINT: - return *(int16_t *)pPage->data; - case TSDB_DATA_TYPE_TINYINT: - return *(int8_t *)pPage->data; - case TSDB_DATA_TYPE_BIGINT: - return (double)(*(int64_t *)pPage->data); - case TSDB_DATA_TYPE_DOUBLE: { - double dv = GET_DOUBLE_VAL(pPage->data); - //return *(double *)pPage->data; - return dv; - } - case TSDB_DATA_TYPE_FLOAT: { - float fv = GET_FLOAT_VAL(pPage->data); - //return *(float *)pPage->data; - return fv; - } - default: - return 0; - } - } - } - } - } - } - return 0; -} - -void tBucketBigIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { - int64_t v = *(int64_t *)value; - - if (pBucket->nRange.i64MaxVal == INT64_MIN) { - if (v >= 0) { - *segIdx = ((v >> (64 - 9)) >> 6) + 8; - *slotIdx = (v >> (64 - 9)) & 0x3F; - } else { // v<0 - *segIdx = ((-v) >> (64 - 9)) >> 6; - *slotIdx = ((-v) >> (64 - 9)) & 0x3F; - *segIdx = 7 - (*segIdx); - } - } else { - // todo hash for bigint and float and double - int64_t span = pBucket->nRange.i64MaxVal - pBucket->nRange.i64MinVal; - if (span < pBucket->nTotalSlots) { - int32_t delta = (int32_t)(v - pBucket->nRange.i64MinVal); - *segIdx = delta / pBucket->nSlotsOfSeg; - *slotIdx = delta % pBucket->nSlotsOfSeg; - } else { - double x = (double)span / pBucket->nTotalSlots; - double posx = (v - pBucket->nRange.i64MinVal) / x; - if (v == pBucket->nRange.i64MaxVal) { - posx -= 1; - } - - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } - } -} - -// todo refactor to more generic -void tBucketIntHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { - int32_t v = *(int32_t *)value; - - if (pBucket->nRange.iMaxVal == INT32_MIN) { - /* - * taking negative integer into consideration, - * there is only half of pBucket->segs available for non-negative integer - */ - // int32_t numOfSlots = pBucket->nTotalSlots>>1; - // int32_t bits = bitsOfNumber(numOfSlots)-1; - - if (v >= 0) { - *segIdx = ((v >> (32 - 9)) >> 6) + 8; - *slotIdx = (v >> (32 - 9)) & 0x3F; - } else { // v<0 - *segIdx = ((-v) >> (32 - 9)) >> 6; - *slotIdx = ((-v) >> (32 - 9)) & 0x3F; - *segIdx = 7 - (*segIdx); - } - } else { - // divide a range of [iMinVal, iMaxVal] into 1024 buckets - int32_t span = pBucket->nRange.iMaxVal - pBucket->nRange.iMinVal; - if (span < pBucket->nTotalSlots) { - int32_t delta = v - pBucket->nRange.iMinVal; - *segIdx = delta / pBucket->nSlotsOfSeg; - *slotIdx = delta % pBucket->nSlotsOfSeg; - } else { - double x = (double)span / pBucket->nTotalSlots; - double posx = (v - pBucket->nRange.iMinVal) / x; - if (v == pBucket->nRange.iMaxVal) { - posx -= 1; - } - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } - } -} - -void tBucketDoubleHash(tMemBucket *pBucket, void *value, int16_t *segIdx, int16_t *slotIdx) { - // double v = *(double *)value; - double v = GET_DOUBLE_VAL(value); - - if (pBucket->nRange.dMinVal == DBL_MAX) { - /* - * taking negative integer into consideration, - * there is only half of pBucket->segs available for non-negative integer - */ - double x = DBL_MAX / (pBucket->nTotalSlots >> 1); - double posx = (v + DBL_MAX) / x; - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } else { - // divide a range of [dMinVal, dMaxVal] into 1024 buckets - double span = pBucket->nRange.dMaxVal - pBucket->nRange.dMinVal; - if (span < pBucket->nTotalSlots) { - int32_t delta = (int32_t)(v - pBucket->nRange.dMinVal); - *segIdx = delta / pBucket->nSlotsOfSeg; - *slotIdx = delta % pBucket->nSlotsOfSeg; - } else { - double x = span / pBucket->nTotalSlots; - double posx = (v - pBucket->nRange.dMinVal) / x; - if (v == pBucket->nRange.dMaxVal) { - posx -= 1; - } - *segIdx = ((int32_t)posx) / pBucket->nSlotsOfSeg; - *slotIdx = ((int32_t)posx) % pBucket->nSlotsOfSeg; - } - - if (*segIdx < 0 || *segIdx > 16 || *slotIdx < 0 || *slotIdx > 64) { - uError("error in hash process. segment is: %d, slot id is: %d\n", *segIdx, *slotIdx); - } - } -} - -tMemBucket *tMemBucketCreate(int32_t totalSlots, int32_t nBufferSize, int16_t nElemSize, int16_t dataType, - tOrderDescriptor *pDesc) { - tMemBucket *pBucket = (tMemBucket *)malloc(sizeof(tMemBucket)); - pBucket->nTotalSlots = totalSlots; - pBucket->nSlotsOfSeg = 1 << 6; // 64 Segments, 16 slots each seg. - pBucket->dataType = dataType; - pBucket->nElemSize = nElemSize; - pBucket->pageSize = DEFAULT_PAGE_SIZE; - - pBucket->numOfElems = 0; - pBucket->numOfSegs = pBucket->nTotalSlots / pBucket->nSlotsOfSeg; - - pBucket->nTotalBufferSize = nBufferSize; - - pBucket->maxElemsCapacity = pBucket->nTotalBufferSize / pBucket->nElemSize; - - pBucket->numOfTotalPages = pBucket->nTotalBufferSize / pBucket->pageSize; - pBucket->numOfAvailPages = pBucket->numOfTotalPages; - - pBucket->pSegs = NULL; - pBucket->pOrderDesc = pDesc; - - switch (pBucket->dataType) { +static void resetBoundingBox(MinMaxEntry* range, int32_t type) { + switch (type) { + case TSDB_DATA_TYPE_BIGINT: { + range->i64MaxVal = INT64_MIN; + range->i64MinVal = INT64_MAX; + break; + }; case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_TINYINT: { - pBucket->nRange.iMinVal = INT32_MAX; - pBucket->nRange.iMaxVal = INT32_MIN; - pBucket->HashFunc = tBucketIntHash; + range->iMaxVal = INT32_MIN; + range->iMinVal = INT32_MAX; break; }; case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_FLOAT: { - pBucket->nRange.dMinVal = DBL_MAX; - pBucket->nRange.dMaxVal = -DBL_MAX; - pBucket->HashFunc = tBucketDoubleHash; + range->dMaxVal = -DBL_MAX; + range->dMinVal = DBL_MAX; break; - }; - case TSDB_DATA_TYPE_BIGINT: { - pBucket->nRange.i64MinVal = INT64_MAX; - pBucket->nRange.i64MaxVal = INT64_MIN; - pBucket->HashFunc = tBucketBigIntHash; - break; - }; - default: { - uError("MemBucket:%p,not support data type %d,failed", pBucket, pBucket->dataType); - taosTFree(pBucket); - return NULL; + } + } +} + +static void resetPosInfo(SSlotInfo* pInfo) { + pInfo->size = 0; + pInfo->pageId = -1; + pInfo->data = NULL; +} + +double findOnlyResult(tMemBucket *pMemBucket) { + assert(pMemBucket->total == 1); + + for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { + tMemBucketSlot *pSlot = &pMemBucket->pSlots[i]; + if (pSlot->info.size == 0) { + continue; + } + + int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times); + SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + assert(list->size == 1); + + SPageInfo* pgInfo = (SPageInfo*) taosArrayGetP(list, 0); + tFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + assert(pPage->num == 1); + + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_INT: + return *(int32_t *)pPage->data; + case TSDB_DATA_TYPE_SMALLINT: + return *(int16_t *)pPage->data; + case TSDB_DATA_TYPE_TINYINT: + return *(int8_t *)pPage->data; + case TSDB_DATA_TYPE_BIGINT: + return (double)(*(int64_t *)pPage->data); + case TSDB_DATA_TYPE_DOUBLE: { + double dv = GET_DOUBLE_VAL(pPage->data); + return dv; + } + case TSDB_DATA_TYPE_FLOAT: { + float fv = GET_FLOAT_VAL(pPage->data); + return fv; + } + default: + return 0; } } - int32_t numOfCols = pDesc->pColumnModel->numOfCols; - if (numOfCols != 1) { - uError("MemBucket:%p,only consecutive data is allowed,invalid numOfCols:%d", pBucket, numOfCols); - taosTFree(pBucket); + return 0; +} + +int32_t tBucketBigIntHash(tMemBucket *pBucket, const void *value) { + int64_t v = *(int64_t *)value; + int32_t index = -1; + + int32_t halfSlot = pBucket->numOfSlots >> 1; +// int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1; + + if (pBucket->range.i64MaxVal == INT64_MIN) { + if (v >= 0) { + index = (v >> (64 - 9)) + halfSlot; + } else { // v<0 + index = ((-v) >> (64 - 9)); + index = -index + (halfSlot - 1); + } + + return index; + } else { + // todo hash for bigint and float and double + int64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal; + if (span < pBucket->numOfSlots) { + int32_t delta = (int32_t)(v - pBucket->range.i64MinVal); + index = delta % pBucket->numOfSlots; + } else { + double slotSpan = (double)span / pBucket->numOfSlots; + index = (v - pBucket->range.i64MinVal) / slotSpan; + if (v == pBucket->range.i64MaxVal) { + index -= 1; + } + } + + return index; + } +} + +// todo refactor to more generic +int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) { + int32_t v = *(int32_t *)value; + int32_t index = -1; + + if (pBucket->range.iMaxVal == INT32_MIN) { + /* + * taking negative integer into consideration, + * there is only half of pBucket->segs available for non-negative integer + */ + int32_t halfSlot = pBucket->numOfSlots >> 1; + int32_t bits = 32;//bitsOfNumber(pBucket->numOfSlots) - 1; + + if (v >= 0) { + index = (v >> (bits - 9)) + halfSlot; + } else { // v < 0 + index = ((-v) >> (32 - 9)); + index = -index + (halfSlot - 1); + } + + return index; + } else { + // divide a range of [iMinVal, iMaxVal] into 1024 buckets + int32_t span = pBucket->range.iMaxVal - pBucket->range.iMinVal; + if (span < pBucket->numOfSlots) { + int32_t delta = v - pBucket->range.iMinVal; + index = (delta % pBucket->numOfSlots); + } else { + double slotSpan = (double)span / pBucket->numOfSlots; + index = (v - pBucket->range.iMinVal) / slotSpan; + if (v == pBucket->range.iMaxVal) { + index -= 1; + } + } + + return index; + } +} + +int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) { + double v = GET_DOUBLE_VAL(value); + int32_t index = -1; + + if (pBucket->range.dMinVal == DBL_MAX) { + /* + * taking negative integer into consideration, + * there is only half of pBucket->segs available for non-negative integer + */ + double x = DBL_MAX / (pBucket->numOfSlots >> 1); + double posx = (v + DBL_MAX) / x; + return ((int32_t)posx) % pBucket->numOfSlots; + } else { + // divide a range of [dMinVal, dMaxVal] into 1024 buckets + double span = pBucket->range.dMaxVal - pBucket->range.dMinVal; + if (span < pBucket->numOfSlots) { + int32_t delta = (int32_t)(v - pBucket->range.dMinVal); + index = (delta % pBucket->numOfSlots); + } else { + double slotSpan = span / pBucket->numOfSlots; + index = (v - pBucket->range.dMinVal) / slotSpan; + if (v == pBucket->range.dMaxVal) { + index -= 1; + } + } + + if (index < 0 || index > pBucket->numOfSlots) { + uError("error in hash process. slot id: %d", index); + } + + return index; + } +} + +static __perc_hash_func_t getHashFunc(int32_t type) { + switch (type) { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: { + return tBucketIntHash; + }; + + case TSDB_DATA_TYPE_DOUBLE: + case TSDB_DATA_TYPE_FLOAT: { + return tBucketDoubleHash; + }; + + case TSDB_DATA_TYPE_BIGINT: { + return tBucketBigIntHash; + }; + + default: { + return NULL; + } + } +} + +static void resetSlotInfo(tMemBucket* pBucket) { + for (int32_t i = 0; i < pBucket->numOfSlots; ++i) { + tMemBucketSlot* pSlot = &pBucket->pSlots[i]; + + resetBoundingBox(&pSlot->range, pBucket->type); + resetPosInfo(&pSlot->info); + } +} + +tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType) { + tMemBucket *pBucket = (tMemBucket *)calloc(1, sizeof(tMemBucket)); + if (pBucket == NULL) { return NULL; } - SSchema* pSchema = getColumnModelSchema(pDesc->pColumnModel, 0); - if (pSchema->type != dataType) { - uError("MemBucket:%p,data type is not consistent,%d in schema, %d in param", pBucket, pSchema->type, dataType); - taosTFree(pBucket); + pBucket->numOfSlots = DEFAULT_NUM_OF_SLOT; + pBucket->bufPageSize = DEFAULT_PAGE_SIZE * 4; // 4k per page + + pBucket->type = dataType; + pBucket->bytes = nElemSize; + pBucket->total = 0; + pBucket->times = 1; + + pBucket->maxCapacity = 200000; + + pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes; + pBucket->comparFn = getKeyComparFunc(pBucket->type); + resetBoundingBox(&pBucket->range, pBucket->type); + + pBucket->hashFunc = getHashFunc(pBucket->type); + if (pBucket->hashFunc == NULL) { + uError("MemBucket:%p, not support data type %d, failed", pBucket, pBucket->type); + free(pBucket); return NULL; } - if (pBucket->numOfTotalPages < pBucket->nTotalSlots) { - uWarn("MemBucket:%p,total buffer pages %d are not enough for all slots", pBucket, pBucket->numOfTotalPages); + pBucket->pSlots = (tMemBucketSlot *)calloc(pBucket->numOfSlots, sizeof(tMemBucketSlot)); + if (pBucket->pSlots == NULL) { + free(pBucket); + return NULL; } - pBucket->pSegs = (tMemBucketSegment *)malloc(pBucket->numOfSegs * sizeof(tMemBucketSegment)); + resetSlotInfo(pBucket); - for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { - pBucket->pSegs[i].numOfSlots = pBucket->nSlotsOfSeg; - pBucket->pSegs[i].pBuffer = NULL; - pBucket->pSegs[i].pBoundingEntries = NULL; + int32_t ret = createDiskbasedResultBuffer(&pBucket->pBuffer, pBucket->bytes, pBucket->bufPageSize, pBucket->bufPageSize * 512, NULL); + if (ret != TSDB_CODE_SUCCESS) { + tMemBucketDestroy(pBucket); + return NULL; } - - uDebug("MemBucket:%p,created,buffer size:%ld,elem size:%d", pBucket, pBucket->numOfTotalPages * DEFAULT_PAGE_SIZE, - pBucket->nElemSize); - + + uDebug("MemBucket:%p, elem size:%d", pBucket, pBucket->bytes); return pBucket; } @@ -326,81 +307,11 @@ void tMemBucketDestroy(tMemBucket *pBucket) { return; } - if (pBucket->pSegs) { - for (int32_t i = 0; i < pBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &(pBucket->pSegs[i]); - taosTFree(pSeg->pBoundingEntries); - - if (pSeg->pBuffer == NULL || pSeg->numOfSlots == 0) { - continue; - } - - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - if (pSeg->pBuffer[j] != NULL) { - pSeg->pBuffer[j] = destoryExtMemBuffer(pSeg->pBuffer[j]); - } - } - taosTFree(pSeg->pBuffer); - } - } - - taosTFree(pBucket->pSegs); + destroyResultBuf(pBucket->pBuffer); + taosTFree(pBucket->pSlots); taosTFree(pBucket); } -/* - * find the slots which accounts for largest proportion of total in-memory buffer - */ -static void tBucketGetMaxMemSlot(tMemBucket *pBucket, int16_t *segIdx, int16_t *slotIdx) { - *segIdx = -1; - *slotIdx = -1; - - int32_t val = 0; - for (int32_t k = 0; k < pBucket->numOfSegs; ++k) { - tMemBucketSegment *pSeg = &pBucket->pSegs[k]; - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - if (pSeg->pBuffer == NULL || pSeg->pBuffer[i] == NULL) { - continue; - } - - if (val < pSeg->pBuffer[i]->numOfInMemPages) { - val = pSeg->pBuffer[i]->numOfInMemPages; - *segIdx = k; - *slotIdx = i; - } - } - } -} - -static void resetBoundingBox(tMemBucketSegment *pSeg, int32_t type) { - switch (type) { - case TSDB_DATA_TYPE_BIGINT: { - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - pSeg->pBoundingEntries[i].i64MaxVal = INT64_MIN; - pSeg->pBoundingEntries[i].i64MinVal = INT64_MAX; - } - break; - }; - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_TINYINT: { - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - pSeg->pBoundingEntries[i].iMaxVal = INT32_MIN; - pSeg->pBoundingEntries[i].iMinVal = INT32_MAX; - } - break; - }; - case TSDB_DATA_TYPE_DOUBLE: - case TSDB_DATA_TYPE_FLOAT: { - for (int32_t i = 0; i < pSeg->numOfSlots; ++i) { - pSeg->pBoundingEntries[i].dMaxVal = -DBL_MAX; - pSeg->pBoundingEntries[i].dMinVal = DBL_MAX; - } - break; - } - } -} - void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { switch (dataType) { case TSDB_DATA_TYPE_INT: { @@ -461,7 +372,6 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { break; }; case TSDB_DATA_TYPE_FLOAT: { - // double val = *(float *)data; double val = GET_FLOAT_VAL(data); if (r->dMinVal > val) { @@ -478,171 +388,95 @@ void tMemBucketUpdateBoundingBox(MinMaxEntry *r, char *data, int32_t dataType) { } /* - * in memory bucket, we only accept the simple data consecutive put in a row/column - * no column-model in this case. + * in memory bucket, we only accept data array list */ -void tMemBucketPut(tMemBucket *pBucket, void *data, int32_t numOfRows) { - pBucket->numOfElems += numOfRows; - int16_t segIdx = 0, slotIdx = 0; +void tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { + assert(pBucket != NULL && data != NULL && size > 0); + pBucket->total += size; - for (int32_t i = 0; i < numOfRows; ++i) { - char *d = (char *)data + i * tDataTypeDesc[pBucket->dataType].nSize; + int32_t bytes = pBucket->bytes; - switch (pBucket->dataType) { - case TSDB_DATA_TYPE_SMALLINT: { - int32_t val = *(int16_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_TINYINT: { - int32_t val = *(int8_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_INT: { - int32_t val = *(int32_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - int64_t val = *(int64_t *)d; - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - // double val = *(double *)d; - double val = GET_DOUBLE_VAL(d); - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - case TSDB_DATA_TYPE_FLOAT: { - // double val = *(float *)d; - double val = GET_FLOAT_VAL(d); - (pBucket->HashFunc)(pBucket, &val, &segIdx, &slotIdx); - break; - } - } + for (int32_t i = 0; i < size; ++i) { + char *d = (char *) data + i * bytes; - tMemBucketSegment *pSeg = &pBucket->pSegs[segIdx]; - if (pSeg->pBoundingEntries == NULL) { - pSeg->pBoundingEntries = (MinMaxEntry *)malloc(sizeof(MinMaxEntry) * pBucket->nSlotsOfSeg); - resetBoundingBox(pSeg, pBucket->dataType); - } + int32_t slotIdx = (pBucket->hashFunc)(pBucket, d); + assert(slotIdx >= 0); - if (pSeg->pBuffer == NULL) { - pSeg->pBuffer = (tExtMemBuffer **)calloc(pBucket->nSlotsOfSeg, sizeof(void *)); - } - - if (pSeg->pBuffer[slotIdx] == NULL) { - pSeg->pBuffer[slotIdx] = createExtMemBuffer(pBucket->numOfTotalPages * pBucket->pageSize, pBucket->nElemSize, - pBucket->pageSize, pBucket->pOrderDesc->pColumnModel); - pSeg->pBuffer[slotIdx]->flushModel = SINGLE_APPEND_MODEL; - pBucket->pOrderDesc->pColumnModel->capacity = pSeg->pBuffer[slotIdx]->numOfElemsPerPage; - } - - tMemBucketUpdateBoundingBox(&pSeg->pBoundingEntries[slotIdx], d, pBucket->dataType); + tMemBucketSlot *pSlot = &pBucket->pSlots[slotIdx]; + tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type); // ensure available memory pages to allocate - int16_t cseg = 0, cslot = 0; - if (pBucket->numOfAvailPages == 0) { - uDebug("MemBucket:%p,max avail size:%d, no avail memory pages,", pBucket, pBucket->numOfTotalPages); + int32_t groupId = getGroupId(pBucket->numOfSlots, slotIdx, pBucket->times); + int32_t pageId = -1; - tBucketGetMaxMemSlot(pBucket, &cseg, &cslot); - if (cseg == -1 || cslot == -1) { - uError("MemBucket:%p,failed to find appropriated avail buffer", pBucket); - return; + if (pSlot->info.data == NULL || pSlot->info.data->num >= pBucket->elemPerPage) { + if (pSlot->info.data != NULL) { + assert(pSlot->info.data->num >= pBucket->elemPerPage && pSlot->info.size > 0); + + // keep the pointer in memory + releaseResBufPage(pBucket->pBuffer, pSlot->info.data); + pSlot->info.data = NULL; } - if (cseg != segIdx || cslot != slotIdx) { - pBucket->numOfAvailPages += pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages; - - int32_t avail = pBucket->pSegs[cseg].pBuffer[cslot]->numOfInMemPages; - UNUSED(avail); - tExtMemBufferFlush(pBucket->pSegs[cseg].pBuffer[cslot]); - - uDebug("MemBucket:%p,seg:%d,slot:%d flushed to disk,new avail pages:%d", pBucket, cseg, cslot, - pBucket->numOfAvailPages); - } else { - uDebug("MemBucket:%p,failed to choose slot to flush to disk seg:%d,slot:%d", pBucket, cseg, cslot); - } + pSlot->info.data = getNewDataBuf(pBucket->pBuffer, groupId, &pageId); + pSlot->info.pageId = pageId; } - int16_t consumedPgs = pSeg->pBuffer[slotIdx]->numOfInMemPages; - int16_t newPgs = tExtMemBufferPut(pSeg->pBuffer[slotIdx], d, 1); - /* - * trigger 1. page re-allocation, to reduce the available pages - * 2. page flushout, to increase the available pages - */ - pBucket->numOfAvailPages += (consumedPgs - newPgs); + memcpy(pSlot->info.data->data + pSlot->info.data->num * pBucket->bytes, d, pBucket->bytes); + + pSlot->info.data->num += 1; + pSlot->info.size += 1; } } -void releaseBucket(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { - if (segIdx < 0 || segIdx > pMemBucket->numOfSegs || slotIdx < 0) { - return; - } - - tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; - if (slotIdx < 0 || slotIdx >= pSeg->numOfSlots || pSeg->pBuffer[slotIdx] == NULL) { - return; - } - - pSeg->pBuffer[slotIdx] = destoryExtMemBuffer(pSeg->pBuffer[slotIdx]); -} - //////////////////////////////////////////////////////////////////////////////////////////// static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minVal) { *minVal = DBL_MAX; *maxVal = -DBL_MAX; - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - if (pSeg->pBuffer == NULL) { + for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { + tMemBucketSlot *pSlot = &pMemBucket->pSlots[i]; + if (pSlot->info.size == 0) { continue; } - switch (pMemBucket->dataType) { + + switch (pMemBucket->type) { case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_TINYINT: { - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - double minv = pSeg->pBoundingEntries[j].iMinVal; - double maxv = pSeg->pBoundingEntries[j].iMaxVal; + double minv = pSlot->range.iMinVal; + double maxv = pSlot->range.iMaxVal; - if (*minVal > minv) { - *minVal = minv; - } - if (*maxVal < maxv) { - *maxVal = maxv; - } + if (*minVal > minv) { + *minVal = minv; + } + if (*maxVal < maxv) { + *maxVal = maxv; } break; } case TSDB_DATA_TYPE_DOUBLE: case TSDB_DATA_TYPE_FLOAT: { - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - double minv = pSeg->pBoundingEntries[j].dMinVal; - double maxv = pSeg->pBoundingEntries[j].dMaxVal; + double minv = pSlot->range.dMinVal; + double maxv = pSlot->range.dMaxVal; - if (*minVal > minv) { - *minVal = minv; - } - if (*maxVal < maxv) { - *maxVal = maxv; - } + if (*minVal > minv) { + *minVal = minv; + } + if (*maxVal < maxv) { + *maxVal = maxv; } break; } case TSDB_DATA_TYPE_BIGINT: { - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - double minv = (double)pSeg->pBoundingEntries[j].i64MinVal; - double maxv = (double)pSeg->pBoundingEntries[j].i64MaxVal; + double minv = (double)pSlot->range.i64MinVal; + double maxv = (double)pSlot->range.i64MaxVal; - if (*minVal > minv) { - *minVal = minv; - } - if (*maxVal < maxv) { - *maxVal = maxv; - } + if (*minVal > minv) { + *minVal = minv; + } + if (*maxVal < maxv) { + *maxVal = maxv; } break; } @@ -650,20 +484,6 @@ static void findMaxMinValue(tMemBucket *pMemBucket, double *maxVal, double *minV } } -static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBucket, int32_t segIdx) { - int32_t i = segIdx + 1; - while (i < pMemBucket->numOfSegs && pMemBucket->pSegs[i].numOfSlots == 0) ++i; - - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - assert(pMemBucket->numOfSegs > i && pMemBucket->pSegs[i].pBuffer != NULL); - - i = 0; - while (i < pMemBucket->nSlotsOfSeg && pSeg->pBuffer[i] == NULL) ++i; - - assert(i < pMemBucket->nSlotsOfSeg); - return pSeg->pBoundingEntries[i]; -} - /* * * now, we need to find the minimum value of the next slot for @@ -671,262 +491,198 @@ static MinMaxEntry getMinMaxEntryOfNearestSlotInNextSegment(tMemBucket *pMemBuck * j is the last slot of current segment, we need to get the first * slot of the next segment. */ -static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; - - MinMaxEntry next; - if (slotIdx == pSeg->numOfSlots - 1) { // find next segment with data - return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx); - } else { +static MinMaxEntry getMinMaxEntryOfNextSlotWithData(tMemBucket *pMemBucket, int32_t slotIdx) { int32_t j = slotIdx + 1; - for (; j < pMemBucket->nSlotsOfSeg && pMemBucket->pSegs[segIdx].pBuffer[j] == 0; ++j) { + while (j < pMemBucket->numOfSlots && (pMemBucket->pSlots[j].info.size == 0)) { + ++j; + } + + assert(j < pMemBucket->numOfSlots); + return pMemBucket->pSlots[j].range; +} + +static bool isIdenticalData(tMemBucket *pMemBucket, int32_t index); +char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage); + +static double getIdenticalDataVal(tMemBucket* pMemBucket, int32_t slotIndex) { + assert(isIdenticalData(pMemBucket, slotIndex)); + + tMemBucketSlot *pSlot = &pMemBucket->pSlots[slotIndex]; + + double finalResult = 0.0; + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: + case TSDB_DATA_TYPE_INT: { + finalResult = pSlot->range.iMinVal; + break; + } + + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + finalResult = pSlot->range.dMinVal; + break; }; - if (j == pMemBucket->nSlotsOfSeg) { // current slot has no available - // slot,try next segment - return getMinMaxEntryOfNearestSlotInNextSegment(pMemBucket, segIdx); - } else { - next = pSeg->pBoundingEntries[slotIdx + 1]; - assert(pSeg->pBuffer[slotIdx + 1] != NULL); + case TSDB_DATA_TYPE_BIGINT: { + finalResult = pSlot->range.i64MinVal; + break; } } - return next; + return finalResult; } -bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx); -char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage); - double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) { int32_t num = 0; - for (int32_t i = 0; i < pMemBucket->numOfSegs; ++i) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[i]; - for (int32_t j = 0; j < pSeg->numOfSlots; ++j) { - if (pSeg->pBuffer == NULL || pSeg->pBuffer[j] == NULL) { - continue; - } - // required value in current slot - if (num < (count + 1) && num + pSeg->pBuffer[j]->numOfTotalElems >= (count + 1)) { - if (pSeg->pBuffer[j]->numOfTotalElems + num == (count + 1)) { - /* - * now, we need to find the minimum value of the next slot for interpolating the percentile value - * j is the last slot of current segment, we need to get the first slot of the next segment. - * - */ - MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i, j); + for (int32_t i = 0; i < pMemBucket->numOfSlots; ++i) { + tMemBucketSlot *pSlot = &pMemBucket->pSlots[i]; + if (pSlot->info.size == 0) { + continue; + } - double maxOfThisSlot = 0; - double minOfNextSlot = 0; - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_TINYINT: { - maxOfThisSlot = pSeg->pBoundingEntries[j].iMaxVal; - minOfNextSlot = next.iMinVal; - break; - }; - case TSDB_DATA_TYPE_FLOAT: - case TSDB_DATA_TYPE_DOUBLE: { - maxOfThisSlot = pSeg->pBoundingEntries[j].dMaxVal; - minOfNextSlot = next.dMinVal; - break; - }; - case TSDB_DATA_TYPE_BIGINT: { - maxOfThisSlot = (double)pSeg->pBoundingEntries[j].i64MaxVal; - minOfNextSlot = (double)next.i64MinVal; - break; - } + // required value in current slot + if (num < (count + 1) && num + pSlot->info.size >= (count + 1)) { + if (pSlot->info.size + num == (count + 1)) { + /* + * now, we need to find the minimum value of the next slot for interpolating the percentile value + * j is the last slot of current segment, we need to get the first slot of the next segment. + */ + MinMaxEntry next = getMinMaxEntryOfNextSlotWithData(pMemBucket, i); + + double maxOfThisSlot = 0; + double minOfNextSlot = 0; + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_INT: + case TSDB_DATA_TYPE_SMALLINT: + case TSDB_DATA_TYPE_TINYINT: { + maxOfThisSlot = pSlot->range.iMaxVal; + minOfNextSlot = next.iMinVal; + break; }; - - assert(minOfNextSlot > maxOfThisSlot); - - double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; - return val; - } - if (pSeg->pBuffer[j]->numOfTotalElems <= pMemBucket->maxElemsCapacity) { - // data in buffer and file are merged together to be processed. - tFilePage *buffer = loadIntoBucketFromDisk(pMemBucket, i, j, pMemBucket->pOrderDesc); - int32_t currentIdx = count - num; - - char * thisVal = buffer->data + pMemBucket->nElemSize * currentIdx; - char * nextVal = thisVal + pMemBucket->nElemSize; - double td = 1.0, nd = 1.0; - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_SMALLINT: { - td = *(int16_t *)thisVal; - nd = *(int16_t *)nextVal; - break; - } - case TSDB_DATA_TYPE_TINYINT: { - td = *(int8_t *)thisVal; - nd = *(int8_t *)nextVal; - break; - } - case TSDB_DATA_TYPE_INT: { - td = *(int32_t *)thisVal; - nd = *(int32_t *)nextVal; - break; - }; - case TSDB_DATA_TYPE_FLOAT: { - // td = *(float *)thisVal; - // nd = *(float *)nextVal; - td = GET_FLOAT_VAL(thisVal); - nd = GET_FLOAT_VAL(nextVal); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - // td = *(double *)thisVal; - td = GET_DOUBLE_VAL(thisVal); - // nd = *(double *)nextVal; - nd = GET_DOUBLE_VAL(nextVal); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - td = (double)*(int64_t *)thisVal; - nd = (double)*(int64_t *)nextVal; - break; - } + case TSDB_DATA_TYPE_FLOAT: + case TSDB_DATA_TYPE_DOUBLE: { + maxOfThisSlot = pSlot->range.dMaxVal; + minOfNextSlot = next.dMinVal; + break; + }; + case TSDB_DATA_TYPE_BIGINT: { + maxOfThisSlot = (double)pSlot->range.i64MaxVal; + minOfNextSlot = (double)next.i64MinVal; + break; } - double val = (1 - fraction) * td + fraction * nd; - taosTFree(buffer); + }; - return val; - } else { // incur a second round bucket split - if (isIdenticalData(pMemBucket, i, j)) { - tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j]; + assert(minOfNextSlot > maxOfThisSlot); - tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize); - - char *thisVal = getFirstElemOfMemBuffer(pSeg, j, pPage); - - double finalResult = 0.0; - - switch (pMemBucket->dataType) { - case TSDB_DATA_TYPE_SMALLINT: { - finalResult = *(int16_t *)thisVal; - break; - } - case TSDB_DATA_TYPE_TINYINT: { - finalResult = *(int8_t *)thisVal; - break; - } - case TSDB_DATA_TYPE_INT: { - finalResult = *(int32_t *)thisVal; - break; - }; - case TSDB_DATA_TYPE_FLOAT: { - // finalResult = *(float *)thisVal; - finalResult = GET_FLOAT_VAL(thisVal); - break; - } - case TSDB_DATA_TYPE_DOUBLE: { - // finalResult = *(double *)thisVal; - finalResult = GET_DOUBLE_VAL(thisVal); - break; - } - case TSDB_DATA_TYPE_BIGINT: { - finalResult = (double)(*(int64_t *)thisVal); - break; - } - } - - free(pPage); - return finalResult; - } - - uDebug("MemBucket:%p,start second round bucketing", pMemBucket); - - if (pSeg->pBuffer[j]->numOfElemsInBuffer != 0) { - uDebug("MemBucket:%p,flush %d pages to disk, clear status", pMemBucket, pSeg->pBuffer[j]->numOfInMemPages); - - pMemBucket->numOfAvailPages += pSeg->pBuffer[j]->numOfInMemPages; - tExtMemBufferFlush(pSeg->pBuffer[j]); - } - - tExtMemBuffer *pMemBuffer = pSeg->pBuffer[j]; - pSeg->pBuffer[j] = NULL; - - // release all - for (int32_t tt = 0; tt < pMemBucket->numOfSegs; ++tt) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt]; - for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) { - if (pSeg->pBuffer && pSeg->pBuffer[ttx]) { - pSeg->pBuffer[ttx] = destoryExtMemBuffer(pSeg->pBuffer[ttx]); - } - } - } - - pMemBucket->nRange.i64MaxVal = pSeg->pBoundingEntries->i64MaxVal; - pMemBucket->nRange.i64MinVal = pSeg->pBoundingEntries->i64MinVal; - pMemBucket->numOfElems = 0; - - for (int32_t tt = 0; tt < pMemBucket->numOfSegs; ++tt) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[tt]; - for (int32_t ttx = 0; ttx < pSeg->numOfSlots; ++ttx) { - if (pSeg->pBoundingEntries) { - resetBoundingBox(pSeg, pMemBucket->dataType); - } - } - } - - tFilePage *pPage = (tFilePage *)malloc(pMemBuffer->pageSize); - - tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; - assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); - - int32_t ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); - UNUSED(ret); - - for (uint32_t jx = 0; jx < pFlushInfo->numOfPages; ++jx) { - size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - if (sz != pMemBuffer->pageSize) { - uError("MemBucket:%p, read tmp file %s failed", pMemBucket, pMemBuffer->path); - } else { - tMemBucketPut(pMemBucket, pPage->data, (int32_t)pPage->num); - } - } - - fclose(pMemBuffer->file); - if (unlink(pMemBuffer->path) != 0) { - uError("MemBucket:%p, remove tmp file %s failed", pMemBucket, pMemBuffer->path); - } - taosTFree(pMemBuffer); - taosTFree(pPage); - - return getPercentileImpl(pMemBucket, count - num, fraction); - } - } else { - num += pSeg->pBuffer[j]->numOfTotalElems; + double val = (1 - fraction) * maxOfThisSlot + fraction * minOfNextSlot; + return val; } + + if (pSlot->info.size <= pMemBucket->maxCapacity) { + // data in buffer and file are merged together to be processed. + tFilePage *buffer = loadDataFromFilePage(pMemBucket, i); + int32_t currentIdx = count - num; + + char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; + char *nextVal = thisVal + pMemBucket->bytes; + + double td = 1.0, nd = 1.0; + switch (pMemBucket->type) { + case TSDB_DATA_TYPE_SMALLINT: { + td = *(int16_t *)thisVal; + nd = *(int16_t *)nextVal; + break; + } + case TSDB_DATA_TYPE_TINYINT: { + td = *(int8_t *)thisVal; + nd = *(int8_t *)nextVal; + break; + } + case TSDB_DATA_TYPE_INT: { + td = *(int32_t *)thisVal; + nd = *(int32_t *)nextVal; + break; + }; + case TSDB_DATA_TYPE_FLOAT: { + td = GET_FLOAT_VAL(thisVal); + nd = GET_FLOAT_VAL(nextVal); + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + td = GET_DOUBLE_VAL(thisVal); + nd = GET_DOUBLE_VAL(nextVal); + break; + } + case TSDB_DATA_TYPE_BIGINT: { + td = (double)*(int64_t *)thisVal; + nd = (double)*(int64_t *)nextVal; + break; + } + } + + double val = (1 - fraction) * td + fraction * nd; + taosTFree(buffer); + + return val; + } else { // incur a second round bucket split + if (isIdenticalData(pMemBucket, i)) { + return getIdenticalDataVal(pMemBucket, i); + } + + // try next round + pMemBucket->times += 1; + uDebug("MemBucket:%p, start next round data bucketing, time:%d", pMemBucket, pMemBucket->times); + + pMemBucket->range = pSlot->range; + pMemBucket->total = 0; + + resetSlotInfo(pMemBucket); + + int32_t groupId = getGroupId(pMemBucket->numOfSlots, i, pMemBucket->times - 1); + SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); + assert(list->size > 0); + + for (int32_t f = 0; f < list->size; ++f) { + SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); + tFilePage *pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + + tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); + releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo); + } + + return getPercentileImpl(pMemBucket, count - num, fraction); + } + } else { + num += pSlot->info.size; } } + return 0; } double getPercentile(tMemBucket *pMemBucket, double percent) { - if (pMemBucket->numOfElems == 0) { + if (pMemBucket->total == 0) { return 0.0; } - if (pMemBucket->numOfElems == 1) { // return the only element + // if only one elements exists, return it + if (pMemBucket->total == 1) { return findOnlyResult(pMemBucket); } percent = fabs(percent); - // validate the parameters + // find the min/max value, no need to scan all data in bucket if (fabs(percent - 100.0) < DBL_EPSILON || (percent < DBL_EPSILON)) { double minx = 0, maxx = 0; - /* - * find the min/max value, no need to scan all data in bucket - */ findMaxMinValue(pMemBucket, &maxx, &minx); return fabs(percent - 100) < DBL_EPSILON ? maxx : minx; } - double percentVal = (percent * (pMemBucket->numOfElems - 1)) / ((double)100.0); + double percentVal = (percent * (pMemBucket->total - 1)) / ((double)100.0); int32_t orderIdx = (int32_t)percentVal; // do put data by using buckets @@ -934,19 +690,18 @@ double getPercentile(tMemBucket *pMemBucket, double percent) { } /* - * check if data in one slot are all identical - * only need to compare with the bounding box + * check if data in one slot are all identical only need to compare with the bounding box */ -bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { - tMemBucketSegment *pSeg = &pMemBucket->pSegs[segIdx]; +bool isIdenticalData(tMemBucket *pMemBucket, int32_t index) { + tMemBucketSlot *pSeg = &pMemBucket->pSlots[index]; - if (pMemBucket->dataType == TSDB_DATA_TYPE_INT || pMemBucket->dataType == TSDB_DATA_TYPE_BIGINT || - pMemBucket->dataType == TSDB_DATA_TYPE_SMALLINT || pMemBucket->dataType == TSDB_DATA_TYPE_TINYINT) { - return pSeg->pBoundingEntries[slotIdx].i64MinVal == pSeg->pBoundingEntries[slotIdx].i64MaxVal; + if (pMemBucket->type == TSDB_DATA_TYPE_INT || pMemBucket->type == TSDB_DATA_TYPE_BIGINT || + pMemBucket->type == TSDB_DATA_TYPE_SMALLINT || pMemBucket->type == TSDB_DATA_TYPE_TINYINT) { + return pSeg->range.i64MinVal == pSeg->range.i64MaxVal; } - if (pMemBucket->dataType == TSDB_DATA_TYPE_FLOAT || pMemBucket->dataType == TSDB_DATA_TYPE_DOUBLE) { - return fabs(pSeg->pBoundingEntries[slotIdx].dMaxVal - pSeg->pBoundingEntries[slotIdx].dMinVal) < DBL_EPSILON; + if (pMemBucket->type == TSDB_DATA_TYPE_FLOAT || pMemBucket->type == TSDB_DATA_TYPE_DOUBLE) { + return fabs(pSeg->range.dMaxVal - pSeg->range.dMinVal) < DBL_EPSILON; } return false; @@ -956,24 +711,24 @@ bool isIdenticalData(tMemBucket *pMemBucket, int32_t segIdx, int32_t slotIdx) { * get the first element of one slot into memory. * if no data of current slot in memory, load it from disk */ -char *getFirstElemOfMemBuffer(tMemBucketSegment *pSeg, int32_t slotIdx, tFilePage *pPage) { - tExtMemBuffer *pMemBuffer = pSeg->pBuffer[slotIdx]; - char * thisVal = NULL; +char *getFirstElemOfMemBuffer(tMemBucketSlot *pSeg, int32_t slotIdx, tFilePage *pPage) { +// STSBuf *pMemBuffer = pSeg->pBuffer[slotIdx]; + char *thisVal = NULL; - if (pSeg->pBuffer[slotIdx]->numOfElemsInBuffer != 0) { - thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data; - } else { - /* - * no data in memory, load one page into memory - */ - tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; - assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); - int32_t ret; - ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); - UNUSED(ret); - size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); - UNUSED(sz); - thisVal = pPage->data; - } +// if (pSeg->pBuffer[slotIdx]->numOfTotal != 0) { +//// thisVal = pSeg->pBuffer[slotIdx]->pHead->item.data; +// } else { +// /* +// * no data in memory, load one page into memory +// */ +// tFlushoutInfo *pFlushInfo = &pMemBuffer->fileMeta.flushoutData.pFlushoutInfo[0]; +// assert(pFlushInfo->numOfPages == pMemBuffer->fileMeta.nFileSize); +// int32_t ret; +// ret = fseek(pMemBuffer->file, pFlushInfo->startPageId * pMemBuffer->pageSize, SEEK_SET); +// UNUSED(ret); +// size_t sz = fread(pPage, pMemBuffer->pageSize, 1, pMemBuffer->file); +// UNUSED(sz); +// thisVal = pPage->data; +// } return thisVal; }