diff --git a/include/libs/function/function.h b/include/libs/function/function.h index 8cb48cc9f0..8fa63bbd45 100644 --- a/include/libs/function/function.h +++ b/include/libs/function/function.h @@ -143,6 +143,7 @@ typedef struct SqlFunctionCtx { struct SExprInfo *pExpr; struct SDiskbasedBuf *pBuf; struct SSDataBlock *pSrcBlock; + struct SSDataBlock *pDstBlock; // used by indifinite rows function to set selectivity int32_t curBufPage; bool increase; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7bac828a53..9dc6e7898e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -651,6 +651,11 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc pfCtx->pTsOutput = (SColumnInfoData*)pCtx[*outputColIndex].pOutput; } + // link pDstBlock to set selectivity value + if (pfCtx->subsidiaries.num > 0) { + pfCtx->pDstBlock = pResult; + } + numOfRows = pfCtx->fpSet.process(pfCtx); } else if (fmIsAggFunc(pfCtx->functionId)) { // _group_key function for "partition by tbname" + csum(col_name) query diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index 324a17320e..78c65d40f5 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2425,7 +2425,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "diff", .type = FUNCTION_TYPE_DIFF, - .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, + .classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_FORBID_STREAM_FUNC, .translateFunc = translateDiff, .getEnvFunc = getDiffFuncEnv, .initFunc = diffFunctionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index b6fe5b9998..2d3f649739 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -1624,6 +1624,10 @@ int32_t minmaxFunctionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { } void setNullSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t rowIndex) { + if (pCtx->subsidiaries.num <= 0) { + return; + } + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; @@ -1655,8 +1659,6 @@ void setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STuple SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; - int32_t ps = 0; - SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId); ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); if (nullList[j]) { @@ -1678,6 +1680,39 @@ void releaseSource(STuplePos* pPos) { // Todo(liuyao) relase row } +// This function append the selectivity to subsidiaries function context directly, without fetching data +// from intermediate disk based buf page +void appendSelectivityValue(SqlFunctionCtx* pCtx, int32_t rowIndex) { + if (pCtx->subsidiaries.num <= 0) { + return; + } + + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + + // get data from source col + SFunctParam* pFuncParam = &pc->pExpr->base.pParam[0]; + int32_t srcSlotId = pFuncParam->pCol->slotId; + + SColumnInfoData* pSrcCol = taosArrayGet(pCtx->pSrcBlock->pDataBlock, srcSlotId); + + char* pData = colDataGetData(pSrcCol, rowIndex); + + // append to dest col + int32_t dstSlotId = pc->pExpr->base.resSchema.slotId; + + SColumnInfoData* pDstCol = taosArrayGet(pCtx->pDstBlock->pDataBlock, dstSlotId); + ASSERT(pc->pExpr->base.resSchema.bytes == pDstCol->info.bytes); + + if (colDataIsNull_s(pSrcCol, rowIndex) == true) { + colDataAppendNULL(pDstCol, rowIndex); + } else { + colDataAppend(pDstCol, rowIndex, pData, false); + } + } + +} + void replaceTupleData(STuplePos* pDestPos, STuplePos* pSourcePos) { releaseSource(pDestPos); *pDestPos = *pSourcePos; @@ -3154,6 +3189,7 @@ static void doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SCo colDataAppendInt64(pOutput, pos, &delta); } pDiffInfo->prev.i64 = v; + break; } case TSDB_DATA_TYPE_BOOL: @@ -3247,6 +3283,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { if (pDiffInfo->hasPrev) { doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + // handle selectivity + appendSelectivityValue(pCtx, pos); numOfElems++; } else { @@ -3273,6 +3311,8 @@ int32_t diffFunction(SqlFunctionCtx* pCtx) { // there is a row of previous data block to be handled in the first place. if (pDiffInfo->hasPrev) { doHandleDiff(pDiffInfo, pInputCol->info.type, pv, pOutput, pos, pCtx->order); + // handle selectivity + appendSelectivityValue(pCtx, pos); numOfElems++; } else {