From ea18b8a7dcb2a0b229db64f804751360d459239d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 9 May 2022 17:48:23 +0800 Subject: [PATCH] feature(query): bottom function is available. --- source/libs/function/inc/builtinsimpl.h | 1 + source/libs/function/src/builtins.c | 11 +- source/libs/function/src/builtinsimpl.c | 135 ++++++++++++------------ 3 files changed, 76 insertions(+), 71 deletions(-) diff --git a/source/libs/function/inc/builtinsimpl.h b/source/libs/function/inc/builtinsimpl.h index 97832d007e..ce2f0b0651 100644 --- a/source/libs/function/inc/builtinsimpl.h +++ b/source/libs/function/inc/builtinsimpl.h @@ -70,6 +70,7 @@ int32_t lastFunction(SqlFunctionCtx *pCtx); bool getTopBotFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv); int32_t topFunction(SqlFunctionCtx *pCtx); +int32_t bottomFunction(SqlFunctionCtx *pCtx); int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock); bool getSpreadFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index a7b2607778..2be2682bc9 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -207,7 +207,8 @@ static int32_t translateTop(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { } static int32_t translateBottom(SFunctionNode* pFunc, char* pErrBuf, int32_t len) { - // todo + SDataType* pType = &((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType; + pFunc->node.resType = (SDataType){.bytes = pType->bytes, .type = pType->type}; return TSDB_CODE_SUCCESS; } @@ -569,7 +570,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "top", .type = FUNCTION_TYPE_TOP, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, .translateFunc = translateTop, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, @@ -579,12 +580,12 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "bottom", .type = FUNCTION_TYPE_BOTTOM, - .classification = FUNC_MGT_AGG_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_SELECT_FUNC, .translateFunc = translateBottom, .getEnvFunc = getTopBotFuncEnv, .initFunc = functionSetup, - .processFunc = maxFunction, - .finalizeFunc = functionFinalize + .processFunc = bottomFunction, + .finalizeFunc = topBotFinalize }, { .name = "spread", diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 3d77b7b218..adacea8efd 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1104,22 +1104,32 @@ int32_t maxFunction(SqlFunctionCtx* pCtx) { return TSDB_CODE_SUCCESS; } +static void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex); + int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SResultRowEntryInfo* pEntryInfo = GET_RES_INFO(pCtx); SMinmaxResInfo* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); + int32_t type = pCtx->input.pData[0]->info.type; int32_t slotId = pCtx->pExpr->base.resSchema.slotId; SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); // todo assign the tag value int32_t currentRow = pBlock->info.rows; - colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); - int32_t pageId = pRes->tuplePos.pageId; - int32_t offset = pRes->tuplePos.offset; - if (pRes->tuplePos.pageId != -1) { + if (type) + colDataAppend(pCol, currentRow, (const char*)&pRes->v, false); + setSelectivityValue(pCtx, pBlock, &pRes->tuplePos, currentRow); + + return pEntryInfo->numOfRes; +} + +void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuplePos *pTuplePos, int32_t rowIndex) { + int32_t pageId = pTuplePos->pageId; + int32_t offset = pTuplePos->offset; + if (pTuplePos->pageId != -1) { SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); bool* nullList = (bool*)((char*)pPage + offset); @@ -1141,14 +1151,12 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); if (nullList[srcSlotId]) { - colDataAppendNULL(pDstCol, currentRow); + colDataAppendNULL(pDstCol, rowIndex); } else { - colDataAppend(pDstCol, currentRow, (pStart + ps), false); + colDataAppend(pDstCol, rowIndex, (pStart + ps), false); } } } - - return pEntryInfo->numOfRes; } bool getStddevFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) { @@ -1880,32 +1888,49 @@ static STopBotRes* getTopBotOutputInfo(SqlFunctionCtx* pCtx) { } static void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo); + uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery); int32_t topFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = 0; SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - // if ((void *)pRes->res[0] != (void *)((char *)pRes + sizeof(STopBotRes) + POINTER_BYTES * pCtx->param[0].i)) { - // buildTopBotStruct(pRes, pCtx); - // } - SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pCol = pInput->pData[0]; int32_t type = pInput->pData[0]->info.type; int32_t start = pInput->startRowIndex; - int32_t numOfRows = pInput->numOfRows; - - for (int32_t i = start; i < numOfRows + start; ++i) { + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { continue; } - numOfElems++; + numOfElems++; char* data = colDataGetData(pCol, i); - doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, true); + } + + return TSDB_CODE_SUCCESS; +} + +int32_t bottomFunction(SqlFunctionCtx* pCtx) { + int32_t numOfElems = 0; + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + + SInputColumnInfoData* pInput = &pCtx->input; + SColumnInfoData* pCol = pInput->pData[0]; + + int32_t type = pInput->pData[0]->info.type; + + int32_t start = pInput->startRowIndex; + for (int32_t i = start; i < pInput->numOfRows + start; ++i) { + if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { + continue; + } + + numOfElems++; + char* data = colDataGetData(pCol, i); + doAddIntoResult(pCtx, data, i, pCtx->pSrcBlock, type, pInput->uid, pResInfo, false); } return TSDB_CODE_SUCCESS; @@ -1939,7 +1964,7 @@ static int32_t topBotResComparFn(const void* p1, const void* p2, const void* par } void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSDataBlock* pSrcBlock, uint16_t type, - uint64_t uid, SResultRowEntryInfo* pEntryInfo) { + uint64_t uid, SResultRowEntryInfo* pEntryInfo, bool isTopQuery) { STopBotRes* pRes = getTopBotOutputInfo(pCtx); int32_t maxSize = pCtx->param[1].param.i; @@ -1961,10 +1986,17 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // allocate the buffer and keep the data of this row into the new allocated buffer pEntryInfo->numOfRes++; taosheapsort((void*)pItems, sizeof(STopBotResItem), pEntryInfo->numOfRes, (const void*)&type, topBotResComparFn, - false); + !isTopQuery); } else { // replace the minimum value in the result - if ((IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || - (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d)) { + if ((isTopQuery && ( + (IS_SIGNED_NUMERIC_TYPE(type) && val.i > pItems[0].v.i) || + (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u > pItems[0].v.u) || + (IS_FLOAT_TYPE(type) && val.d > pItems[0].v.d))) + || (!isTopQuery && ( + (IS_SIGNED_NUMERIC_TYPE(type) && val.i < pItems[0].v.i) || + (IS_UNSIGNED_NUMERIC_TYPE(type) && val.u < pItems[0].v.u) || + (IS_FLOAT_TYPE(type) && val.d < pItems[0].v.d)) + )) { // replace the old data and the coresponding tuple data STopBotResItem* pItem = &pItems[0]; pItem->v = val; @@ -1973,7 +2005,7 @@ void doAddIntoResult(SqlFunctionCtx* pCtx, void* pData, int32_t rowIndex, SSData // save the data of this tuple by over writing the old data copyTupleData(pCtx, rowIndex, pSrcBlock, &pItem->tuplePos); taosheapadjust((void*)pItems, sizeof(STopBotResItem), 0, pEntryInfo->numOfRes - 1, (const void*)&type, - topBotResComparFn, NULL, false); + topBotResComparFn, NULL, !isTopQuery); } } } @@ -2005,6 +2037,7 @@ void saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pS bool isNull = colDataIsNull_s(pCol, rowIndex); if (isNull) { nullList[i] = true; + offset += pCol->info.bytes; continue; } @@ -2057,54 +2090,24 @@ int32_t topBotFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { STopBotRes* pRes = GET_ROWCELL_INTERBUF(pEntryInfo); pEntryInfo->complete = true; - int32_t type = pCtx->input.pData[0]->info.type; - int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + int32_t type = pCtx->input.pData[0]->info.type; + int32_t slotId = pCtx->pExpr->base.resSchema.slotId; + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); // todo assign the tag value and the corresponding row data int32_t currentRow = pBlock->info.rows; - switch (type) { - case TSDB_DATA_TYPE_INT: { - for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { - STopBotResItem* pItem = &pRes->pItems[i]; - colDataAppendInt32(pCol, currentRow, (int32_t*)&pItem->v.i); - - int32_t pageId = pItem->tuplePos.pageId; - int32_t offset = pItem->tuplePos.offset; - if (pItem->tuplePos.pageId != -1) { - SFilePage* pPage = getBufPage(pCtx->pBuf, pageId); - - bool* nullList = (bool*)((char*)pPage + offset); - char* pStart = (char*)(nullList + pCtx->pSrcBlock->info.numOfCols * sizeof(bool)); - - // todo set the offset value to optimize the performance. - for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { - SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; - - SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; - int32_t srcSlotId = pFuncParam->pCol->slotId; - int32_t dstSlotId = pCtx->pExpr->base.resSchema.slotId; - - int32_t ps = 0; - for (int32_t k = 0; k < srcSlotId; ++k) { - SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, k); - ps += pSrcCol->info.bytes; - } - - SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); - if (nullList[srcSlotId]) { - colDataAppendNULL(pDstCol, currentRow); - } else { - colDataAppend(pDstCol, currentRow, (pStart + ps), false); - } - } - } - - currentRow += 1; - } - - break; + for (int32_t i = 0; i < pEntryInfo->numOfRes; ++i) { + STopBotResItem* pItem = &pRes->pItems[i]; + if (type == TSDB_DATA_TYPE_FLOAT) { + float v = pItem->v.d; + colDataAppend(pCol, currentRow, (const char*)&v, false); + } else { + colDataAppend(pCol, currentRow, (const char*)&pItem->v.i, false); } + + setSelectivityValue(pCtx, pBlock, &pRes->pItems[i].tuplePos, currentRow); + currentRow += 1; } return pEntryInfo->numOfRes;