From e9fa00185a88f2f42e039117ace9312835b53633 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 21 Mar 2023 16:15:26 +0800 Subject: [PATCH] enh(query): improve mode function page buffer allocation --- source/libs/function/src/builtinsimpl.c | 96 ++++++++++++++++--------- 1 file changed, 62 insertions(+), 34 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index fbc60041b2..c0f28b3e38 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -244,12 +244,11 @@ typedef struct SUniqueInfo { typedef struct SModeItem { int64_t count; + STuplePos dataPos; STuplePos tuplePos; - char data[]; } SModeItem; typedef struct SModeInfo { - int32_t numOfPoints; uint8_t colType; int16_t colBytes; SHashObj* pHash; @@ -257,7 +256,7 @@ typedef struct SModeInfo { STuplePos nullTuplePos; bool nullTupleSaved; - char pItems[]; + char* buf; // serialize data buffer } SModeInfo; typedef struct SDerivInfo { @@ -3113,7 +3112,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid return buf; } -static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey key, +static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key, STuplePos* pPos) { STuplePos p = {0}; if (pHandle->pBuf != NULL) { @@ -3149,8 +3148,8 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, releaseBufPage(pHandle->pBuf, pPage); } else { // other tuple save policy - if (streamStateFuncPut(pHandle->pState, &key, pBuf, length) >= 0) { - p.streamTupleKey = key; + if (streamStateFuncPut(pHandle->pState, key, pBuf, length) >= 0) { + p.streamTupleKey = *key; } } @@ -3174,7 +3173,7 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* } char* buf = serializeTupleData(pSrcBlock, rowIndex, &pCtx->subsidiaries, pCtx->subsidiaries.buf); - return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, key, pPos); + return doSaveTupleData(&pCtx->saveHandle, buf, pCtx->subsidiaries.rowLen, &key, pPos); } static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STuplePos* pPos) { @@ -4949,7 +4948,7 @@ int32_t uniqueFunction(SqlFunctionCtx* pCtx) { } bool getModeFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { - pEnv->calcMemSize = sizeof(SModeInfo) + MODE_MAX_RESULT_SIZE; + pEnv->calcMemSize = sizeof(SModeInfo); return true; } @@ -4959,7 +4958,6 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { } SModeInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo); - pInfo->numOfPoints = 0; pInfo->colType = pCtx->resDataInfo.type; pInfo->colBytes = pCtx->resDataInfo.bytes; if (pInfo->pHash != NULL) { @@ -4970,38 +4968,60 @@ bool modeFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) { pInfo->nullTupleSaved = false; pInfo->nullTuplePos.pageId = -1; + pInfo->buf = taosMemoryMalloc(pInfo->colBytes); + return true; } +static void modeFunctionCleanup(SModeInfo * pInfo) { + taosHashCleanup(pInfo->pHash); + taosMemoryFreeClear(pInfo->buf); +} + +static int32_t saveModeTupleData(SqlFunctionCtx* pCtx, char* data, SModeInfo *pInfo, STuplePos* pPos) { + if (IS_VAR_DATA_TYPE(pInfo->colType)) { + memcpy(pInfo->buf, data, varDataTLen(data)); + } else { + memcpy(pInfo->buf, data, pInfo->colBytes); + } + + return doSaveTupleData(&pCtx->saveHandle, pInfo->buf, pInfo->colBytes, NULL, pPos); +} + static int32_t doModeAdd(SModeInfo* pInfo, int32_t rowIndex, SqlFunctionCtx* pCtx, char* data) { - int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; - SModeItem** pHashItem = taosHashGet(pInfo->pHash, data, hashKeyBytes); + int32_t code = TSDB_CODE_SUCCESS; + int32_t hashKeyBytes = IS_STR_DATA_TYPE(pInfo->colType) ? varDataTLen(data) : pInfo->colBytes; + + SModeItem* pHashItem = (SModeItem *)taosHashGet(pInfo->pHash, data, hashKeyBytes); if (pHashItem == NULL) { - int32_t size = sizeof(SModeItem) + pInfo->colBytes; - SModeItem* pItem = (SModeItem*)(pInfo->pItems + pInfo->numOfPoints * size); - memcpy(pItem->data, data, hashKeyBytes); - pItem->count += 1; + int32_t size = sizeof(SModeItem); + SModeItem item = {0}; + + item.count += 1; + code = saveModeTupleData(pCtx, data, pInfo, &item.dataPos); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pCtx->subsidiaries.num > 0) { - int32_t code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pItem->tuplePos); + code = saveTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &item.tuplePos); if (code != TSDB_CODE_SUCCESS) { return code; } } - taosHashPut(pInfo->pHash, data, hashKeyBytes, &pItem, sizeof(SModeItem*)); - pInfo->numOfPoints++; + taosHashPut(pInfo->pHash, data, hashKeyBytes, &item, sizeof(SModeItem)); } else { - (*pHashItem)->count += 1; + pHashItem->count += 1; if (pCtx->subsidiaries.num > 0) { - int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &((*pHashItem)->tuplePos)); + int32_t code = updateTupleData(pCtx, rowIndex, pCtx->pSrcBlock, &pHashItem->tuplePos); if (code != TSDB_CODE_SUCCESS) { return code; } } } - return TSDB_CODE_SUCCESS; + return code; } int32_t modeFunction(SqlFunctionCtx* pCtx) { @@ -5024,18 +5044,15 @@ int32_t modeFunction(SqlFunctionCtx* pCtx) { char* data = colDataGetData(pInputCol, i); int32_t code = doModeAdd(pInfo, i, pCtx, data); if (code != TSDB_CODE_SUCCESS) { + modeFunctionCleanup(pInfo); return code; } - - if (sizeof(SModeInfo) + pInfo->numOfPoints * (sizeof(SModeItem) + pInfo->colBytes) >= MODE_MAX_RESULT_SIZE) { - taosHashCleanup(pInfo->pHash); - return TSDB_CODE_OUT_OF_MEMORY; - } } if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pInfo->nullTupleSaved) { int32_t code = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, &pInfo->nullTuplePos); if (code != TSDB_CODE_SUCCESS) { + modeFunctionCleanup(pInfo); return code; } pInfo->nullTupleSaved = true; @@ -5054,26 +5071,37 @@ int32_t modeFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); int32_t currentRow = pBlock->info.rows; - int32_t resIndex = -1; + STuplePos resDataPos, resTuplePos; int32_t maxCount = 0; - for (int32_t i = 0; i < pInfo->numOfPoints; ++i) { - SModeItem* pItem = (SModeItem*)(pInfo->pItems + i * (sizeof(SModeItem) + pInfo->colBytes)); + + void *pIter = taosHashIterate(pInfo->pHash, NULL); + while (pIter != NULL) { + SModeItem *pItem = (SModeItem *)pIter; if (pItem->count >= maxCount) { maxCount = pItem->count; - resIndex = i; + resDataPos = pItem->dataPos; + resTuplePos = pItem->tuplePos; } + + pIter = taosHashIterate(pInfo->pHash, pIter); } if (maxCount != 0) { - SModeItem* pResItem = (SModeItem*)(pInfo->pItems + resIndex * (sizeof(SModeItem) + pInfo->colBytes)); - colDataSetVal(pCol, currentRow, pResItem->data, false); - code = setSelectivityValue(pCtx, pBlock, &pResItem->tuplePos, currentRow); + const char* pData = loadTupleData(pCtx, &resDataPos); + if (pData == NULL) { + code = TSDB_CODE_NO_AVAIL_DISK; + modeFunctionCleanup(pInfo); + return code; + } + + colDataSetVal(pCol, currentRow, pData, false); + code = setSelectivityValue(pCtx, pBlock, &resTuplePos, currentRow); } else { colDataSetNULL(pCol, currentRow); code = setSelectivityValue(pCtx, pBlock, &pInfo->nullTuplePos, currentRow); } - taosHashCleanup(pInfo->pHash); + modeFunctionCleanup(pInfo); return code; }