From c56be8f8710aca92aee0b0bddca5f731f57393f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 14 Jun 2022 14:45:17 +0800 Subject: [PATCH] enh(query): add interp function. --- source/libs/executor/inc/executorimpl.h | 5 +- source/libs/executor/inc/tfill.h | 2 - source/libs/executor/src/executorimpl.c | 36 +---- source/libs/executor/src/groupoperator.c | 7 +- source/libs/executor/src/timewindowoperator.c | 153 +++++++++++++++--- 5 files changed, 134 insertions(+), 69 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 92e2fd278a..557dfbab5e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -639,9 +639,10 @@ typedef struct STimeSliceOperatorInfo { STimeWindow win; SInterval interval; int64_t current; - SGroupResInfo groupResInfo; // multiple results build supporter SArray* pPrevRow; // SArray SArray* pCols; // SArray + int32_t fillType; // fill type + struct SFillColInfo* pFillColInfo; // fill column info } STimeSliceOperatorInfo; typedef struct SStateWindowOperatorInfo { @@ -841,7 +842,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); + SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tfill.h b/source/libs/executor/inc/tfill.h index a1f45fd665..3b80b262ca 100644 --- a/source/libs/executor/inc/tfill.h +++ b/source/libs/executor/inc/tfill.h @@ -28,8 +28,6 @@ struct SSDataBlock; typedef struct 
SFillColInfo { SExprInfo *pExpr; -// SResSchema schema; -// int16_t functionId; // sql function id int16_t flag; // column flag: TAG COLUMN|NORMAL COLUMN int16_t tagIndex; // index of current tag in SFillTagColInfo array list SVariant fillVal; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3219ac061f..a743e79ce7 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1128,40 +1128,6 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, pCtx->increase = false; pCtx->param = pFunct->pParam; - // for (int32_t j = 0; j < pCtx->numOfParams; ++j) { - // // set the order information for top/bottom query - // int32_t functionId = pCtx->functionId; - // if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { - // int32_t f = getExprFunctionId(&pExpr[0]); - // assert(f == FUNCTION_TS || f == FUNCTION_TS_DUMMY); - // - // // pCtx->param[2].i = pQueryAttr->order.order; - // // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - // // pCtx->param[3].i = functionId; - // // pCtx->param[3].nType = TSDB_DATA_TYPE_BIGINT; - // - // // pCtx->param[1].i = pQueryAttr->order.col.info.colId; - // } else if (functionId == FUNCTION_INTERP) { - // // pCtx->param[2].i = (int8_t)pQueryAttr->fillType; - // // if (pQueryAttr->fillVal != NULL) { - // // if (isNull((const char *)&pQueryAttr->fillVal[i], pCtx->inputType)) { - // // pCtx->param[1].nType = TSDB_DATA_TYPE_NULL; - // // } else { // todo refactor, taosVariantCreateFromBinary should handle the NULL value - // // if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) { - // // taosVariantCreateFromBinary(&pCtx->param[1], (char *)&pQueryAttr->fillVal[i], - // pCtx->inputBytes, pCtx->inputType); - // // } - // // } - // // } - // } else if (functionId == FUNCTION_TWA) { - // // pCtx->param[1].i = pQueryAttr->window.skey; - // // 
pCtx->param[1].nType = TSDB_DATA_TYPE_BIGINT; - // // pCtx->param[2].i = pQueryAttr->window.ekey; - // // pCtx->param[2].nType = TSDB_DATA_TYPE_BIGINT; - // } else if (functionId == FUNCTION_ARITHM) { - // // pCtx->param[1].pz = (char*) getScalarFuncSupport(pRuntimeEnv->scalarSup, i); - // } - // } } for (int32_t i = 1; i < numOfOutput; ++i) { @@ -4317,6 +4283,8 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES); if (pInfo->pFillInfo == NULL || pInfo->p == NULL) { + taosMemoryFree(pInfo->pFillInfo); + taosMemoryFree(pInfo->p); return TSDB_CODE_OUT_OF_MEMORY; } else { return TSDB_CODE_SUCCESS; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 2aa98e099c..d29f98de09 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -332,12 +332,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - // if (!stableQuery) { // finalize include the update of result rows - // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); - // } else { - // updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfExprs, &pInfo->binfo.resultRowInfo, - // pInfo->binfo.rowCellInfoOffset); - // } + #if 0 if(pOperator->fpSet.encodeResultRow){ char *result = NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fcb8f329d6..adc3da77bd 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -17,6 +17,7 @@ #include "functionMgt.h" #include "tdatablock.h" #include "ttime.h" +#include "tfill.h" typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, @@ -1689,6 +1690,22 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (rows == 0) ? 
NULL : pBInfo->pRes;
 }
 
+// Cache the non-NULL cells of pBlock's last row in pSliceInfo->pPrevRow (entry i <-> column i); FILL(PREV) reads them back.
+static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock) {
+  const int32_t lastRow = pBlock->info.rows - 1;  // NOTE(review): the visible caller only invokes this on a block's last row - confirm
+  if (lastRow < 0) { return; }                    // empty block: there is no row to remember
+  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
+  for (int32_t i = 0; i < numOfCols; ++i) {
+    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
+    // a NULL cell is skipped (it cannot seed an interpolation), so the previously kept value of that column is left as is
+    if (!colDataIsNull_s(pColInfoData, lastRow)) {  // row index, not the column index i
+      SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
+      pkey->isNull = false;
+      memcpy(pkey->pData, colDataGetData(pColInfoData, lastRow), pkey->bytes);
+    }
+  }
+}
+
 static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
   if (pOperator->status == OP_EXEC_DONE) {
     return NULL;
@@ -1697,19 +1714,20 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
   STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
   SSDataBlock* pResBlock = pSliceInfo->binfo.pRes;
 
-  if (pOperator->status == OP_RES_TO_RETURN) {
-    // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
-    if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
-      doSetOperatorCompleted(pOperator);
-    }
-
-    return pResBlock;
-  }
+// if (pOperator->status == OP_RES_TO_RETURN) {
+// // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
+// if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
+// doSetOperatorCompleted(pOperator);
+// }
+//
+// return pResBlock;
+// }
 
   int32_t order = TSDB_ORDER_ASC;
   SInterval* pInterval = &pSliceInfo->interval;
   SOperatorInfo* downstream = pOperator->pDownstream[0];
 
+  int32_t numOfRows = 0;
   while (1) {
     SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
     if (pBlock == NULL) {
@@ -1724,7 +1742,19 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
       int64_t ts = *(int64_t*) colDataGetData(pTsCol, i);
 
       if (ts == pSliceInfo->current) {
-        // output the result
+        for(int32_t j = 0; j <
pOperator->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pOperator->pExpr[j]; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot); + + char* v = colDataGetData(pSrc, i); + colDataAppend(pDst, numOfRows, v, false); + } + + numOfRows += 1; pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); if (pSliceInfo->current > pSliceInfo->win.ekey) { @@ -1736,32 +1766,104 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { int64_t nextTs = *(int64_t*) colDataGetData(pTsCol, i + 1); if (nextTs > pSliceInfo->current) { // output the result + for (int32_t j = 0; j < pOperator->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pOperator->pExpr[j]; + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - pSliceInfo->current += taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { - doSetOperatorCompleted(pOperator); - break; + SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); + SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlot); + + switch (pSliceInfo->fillType) { + case TSDB_FILL_NULL: + colDataAppendNULL(pDst, numOfRows); + break; + + case TSDB_FILL_SET_VALUE: { + SVariant* pVar = &pSliceInfo->pFillColInfo[i].fillVal; + + if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + colDataAppend(pDst, numOfRows, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + colDataAppend(pDst, numOfRows, (char*)&v, false); + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + 
int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + colDataAppend(pDst, numOfRows, (char*)&v, false); + } + } + break; + + case TSDB_FILL_LINEAR: +#if 0 + if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs + || pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) { +// goto interp_exit; + } + + double v1 = -1, v2 = -1; + GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val); + GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val); + + SPoint point1 = {.key = ts, .val = &v1}; + SPoint point2 = {.key = nextTs, .val = &v2}; + SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput}; + + int32_t srcType = pCtx->inputType; + if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) { + setNull(pCtx->pOutput, srcType, pCtx->inputBytes); + } else { + bool exceedMax = false, exceedMin = false; + taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin); + if (exceedMax || exceedMin) { + __compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0); + if (func(&pCtx->start.val, &pCtx->end.val) <= 0) { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val); + } else { + COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? 
&pCtx->end.val : &pCtx->start.val); + } + } + } +#endif + break; + + case TSDB_FILL_PREV: { + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); + colDataAppend(pDst, numOfRows, pkey->pData, false); + } break; + + case TSDB_FILL_NEXT: { + } break; + + case TSDB_FILL_NONE: + default: + break; + } + + pSliceInfo->current += + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + doSetOperatorCompleted(pOperator); + break; + } } } else { - // keep current row + // ignore current row, and do nothing } } else { // it is the last row of current block - // keep current row + doKeepPrevRows(pSliceInfo, pBlock); } } } } // restore the value - pOperator->status = OP_RES_TO_RETURN; - closeAllResultRows(&pSliceInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - // finalizeQueryResult(pSliceInfo->binfo.pCtx, pOperator->numOfExprs); - - // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); - // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); - - if (pResBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { + if (pResBlock->info.rows == 0) { pOperator->status = OP_EXEC_DONE; } @@ -1796,7 +1898,7 @@ static int32_t initTimesliceInfo(STimeSliceOperatorInfo* pInfo, SqlFunctionCtx* } SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo) { + SSDataBlock* pResultBlock, const SNodeListNode* pValNode, SExecTaskInfo* pTaskInfo) { STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pOperator == NULL || pInfo == NULL) { @@ -1809,8 +1911,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* } 
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); + pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfCols, pValNode); - pInfo->binfo.pRes = pResultBlock; + pInfo->binfo.pRes = pResultBlock; pOperator->name = "TimeSliceOperator"; // pOperator->operatorType = OP_AllTimeWindow;