This commit is contained in:
factosea 2024-07-04 11:04:59 +08:00
parent 431545d540
commit 5f0ac46029
9 changed files with 249 additions and 89 deletions

View File

@ -41,6 +41,7 @@ typedef int32_t (*FExecProcess)(struct SqlFunctionCtx *pCtx);
typedef int32_t (*FExecFinalize)(struct SqlFunctionCtx *pCtx, SSDataBlock *pBlock);
typedef int32_t (*FScalarExecProcess)(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
typedef int32_t (*FExecCombine)(struct SqlFunctionCtx *pDestCtx, struct SqlFunctionCtx *pSourceCtx);
typedef int32_t (*processFuncByRow)(SArray* pCtx); // array of SqlFunctionCtx
typedef struct SScalarFuncExecFuncs {
FExecGetEnv getEnv;
@ -48,11 +49,12 @@ typedef struct SScalarFuncExecFuncs {
} SScalarFuncExecFuncs;
typedef struct SFuncExecFuncs {
FExecGetEnv getEnv;
FExecInit init;
FExecProcess process;
FExecFinalize finalize;
FExecCombine combine;
FExecGetEnv getEnv;
FExecInit init;
FExecProcess process;
FExecFinalize finalize;
FExecCombine combine;
processFuncByRow processFuncByRow;
} SFuncExecFuncs;
#define MAX_INTERVAL_TIME_WINDOW 10000000 // maximum allowed time windows in final results

View File

@ -255,6 +255,7 @@ bool fmIsIgnoreNullFunc(int32_t funcId);
bool fmIsConstantResFunc(SFunctionNode* pFunc);
bool fmIsSkipScanCheckFunc(int32_t funcId);
bool fmIsPrimaryKeyFunc(int32_t funcId);
bool fmIsProcessByRowFunc(int32_t funcId);
void getLastCacheDataType(SDataType* pType, int32_t pkBytes);
SFunctionNode* createFunction(const char* pName, SNodeList* pParameterList);

View File

@ -724,9 +724,12 @@ static void setPseudoOutputColInfo(SSDataBlock* pResult, SqlFunctionCtx* pCtx, S
int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx,
int32_t numOfOutput, SArray* pPseudoList) {
int32_t code = TSDB_CODE_SUCCESS;
setPseudoOutputColInfo(pResult, pCtx, pPseudoList);
pResult->info.dataLoad = 1;
SArray* diffFunctionCtx = NULL;
if (pSrcBlock == NULL) {
for (int32_t k = 0; k < numOfOutput; ++k) {
int32_t outputSlotId = pExpr[k].base.resSchema.slotId;
@ -743,7 +746,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
}
pResult->info.rows = 1;
return TSDB_CODE_SUCCESS;
goto _exit;
}
if (pResult != pSrcBlock) {
@ -816,10 +819,10 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
SColumnInfoData idata = {.info = pResColData->info, .hasNull = true};
SScalarParam dest = {.columnData = &idata};
int32_t code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
code = scalarCalculate(pExpr[k].pExpr->_optrRoot.pRootNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList);
return code;
goto _exit;
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
@ -852,11 +855,21 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
pfCtx->pDstBlock = pResult;
}
int32_t code = pfCtx->fpSet.process(pfCtx);
code = pfCtx->fpSet.process(pfCtx);
if (code != TSDB_CODE_SUCCESS) {
return code;
goto _exit;
}
numOfRows = pResInfo->numOfRes;
if (fmIsProcessByRowFunc(pfCtx->functionId)) {
if (NULL == diffFunctionCtx) {
diffFunctionCtx = taosArrayInit(1, sizeof(SqlFunctionCtx*));
if (!diffFunctionCtx) {
code = terrno;
goto _exit;
}
}
taosArrayPush(diffFunctionCtx, &pfCtx);
}
} else if (fmIsAggFunc(pfCtx->functionId)) {
// selective value output should be set during corresponding function execution
if (fmIsSelectValueFunc(pfCtx->functionId)) {
@ -889,7 +902,7 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
int32_t code = scalarCalculate((SNode*)pExpr[k].pExpr->_function.pFunctNode, pBlockList, &dest);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pBlockList);
return code;
goto _exit;
}
int32_t startOffset = createNewColModel ? 0 : pResult->info.rows;
@ -905,9 +918,18 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
}
}
if (diffFunctionCtx && taosArrayGetSize(diffFunctionCtx) > 0){
SqlFunctionCtx** pfCtx = taosArrayGet(diffFunctionCtx, 0);
(*pfCtx)->fpSet.processFuncByRow(diffFunctionCtx);
numOfRows = (*pfCtx)->resultInfo->numOfRes;
}
if (!createNewColModel) {
pResult->info.rows += numOfRows;
}
return TSDB_CODE_SUCCESS;
_exit:
if(diffFunctionCtx) {
taosArrayDestroy(diffFunctionCtx);
diffFunctionCtx = NULL;
}
return code;
}

View File

@ -50,6 +50,7 @@ typedef struct SBuiltinFuncDefinition {
const char* pStateFunc;
FCreateMergeFuncParameters createMergeParaFuc;
FEstimateReturnRows estimateReturnRowsFunc;
processFuncByRow processFuncByRow;
} SBuiltinFuncDefinition;
extern const SBuiltinFuncDefinition funcMgtBuiltins[];

View File

@ -133,6 +133,7 @@ int32_t getApercentileMaxSize();
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);
int32_t diffFunction(SqlFunctionCtx* pCtx);
int32_t diffFunctionByRow(SArray* pCtx);
bool getDerivativeFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
bool derivativeFuncSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo);

View File

@ -57,6 +57,7 @@ extern "C" {
#define FUNC_MGT_PRIMARY_KEY_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(28)
#define FUNC_MGT_TSMA_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(29)
#define FUNC_MGT_COUNT_LIKE_FUNC FUNC_MGT_FUNC_CLASSIFICATION_MASK(30) // funcs that should also return 0 when no rows found
#define FUNC_MGT_PROCESS_BY_ROW FUNC_MGT_FUNC_CLASSIFICATION_MASK(31)
#define FUNC_MGT_TEST_MASK(val, mask) (((val) & (mask)) != 0)

View File

@ -1965,9 +1965,9 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
SValueNode* pValue = (SValueNode*)pParamNode1;
if (pValue->datum.i != 0 && pValue->datum.i != 1) {
if (pValue->datum.i < 0 || pValue->datum.i > 3) {
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
"Second parameter of DIFF function should be only 0 or 1");
"Second parameter of DIFF function should be a number between 0 and 3.");
}
pValue->notReserved = true;
@ -1986,11 +1986,7 @@ static int32_t translateDiff(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
}
static EFuncReturnRows diffEstReturnRows(SFunctionNode* pFunc) {
if (1 == LIST_LENGTH(pFunc->pParameterList)) {
return FUNC_RETURN_ROWS_N_MINUS_1;
}
return 1 == ((SValueNode*)nodesListGetNode(pFunc->pParameterList, 1))->datum.i ? FUNC_RETURN_ROWS_INDEFINITE
: FUNC_RETURN_ROWS_N_MINUS_1;
return FUNC_RETURN_ROWS_N_MINUS_1;
}
static int32_t translateLength(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
@ -3206,7 +3202,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
{
.name = "diff",
.type = FUNCTION_TYPE_DIFF,
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC |
.classification = FUNC_MGT_INDEFINITE_ROWS_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC | FUNC_MGT_PROCESS_BY_ROW |
FUNC_MGT_KEEP_ORDER_FUNC | FUNC_MGT_FORBID_STREAM_FUNC | FUNC_MGT_CUMULATIVE_FUNC | FUNC_MGT_FORBID_SYSTABLE_FUNC | FUNC_MGT_PRIMARY_KEY_FUNC,
.translateFunc = translateDiff,
.getEnvFunc = getDiffFuncEnv,
@ -3215,6 +3211,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
.sprocessFunc = diffScalarFunction,
.finalizeFunc = functionFinalize,
.estimateReturnRowsFunc = diffEstReturnRows,
.processFuncByRow = diffFunctionByRow,
},
{
.name = "statecount",

View File

@ -110,10 +110,9 @@ typedef enum {
} EAPerctAlgoType;
typedef struct SDiffInfo {
bool hasPrev;
bool includeNull;
bool ignoreNegative; // replace the ignore with case when
bool firstOutput;
bool hasPrev;
bool isFirstRow;
int8_t ignoreOption; // replace the ignore with case when
union {
int64_t i64;
double d64;
@ -122,6 +121,12 @@ typedef struct SDiffInfo {
int64_t prevTs;
} SDiffInfo;
bool ignoreNegative(int8_t ignoreOption){
return (ignoreOption & 0x1) == 0x1;
}
bool ignoreNull(int8_t ignoreOption){
return (ignoreOption & 0x2) == 0x2;
}
typedef struct SSpreadInfo {
double result;
bool hasResult;
@ -3100,15 +3105,14 @@ bool diffFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResInfo) {
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->hasPrev = false;
pDiffInfo->isFirstRow = true;
pDiffInfo->prev.i64 = 0;
pDiffInfo->prevTs = -1;
if (pCtx->numOfParams > 1) {
pDiffInfo->ignoreNegative = pCtx->param[1].param.i; // TODO set correct param
pDiffInfo->ignoreOption = pCtx->param[1].param.i; // TODO set correct param
} else {
pDiffInfo->ignoreNegative = false;
pDiffInfo->ignoreOption = 0;
}
pDiffInfo->includeNull = true;
pDiffInfo->firstOutput = false;
return true;
}
@ -3148,6 +3152,45 @@ static int32_t doSetPrevVal(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
return TSDB_CODE_SUCCESS;
}
static int32_t diffIsNegtive(SDiffInfo* pDiffInfo, int32_t type, const char* pv) {
switch (type) {
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)pv;
return v < pDiffInfo->prev.i64;
}
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_UTINYINT:
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = *(int8_t*)pv;
return v < pDiffInfo->prev.i64;
}
case TSDB_DATA_TYPE_USMALLINT:
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)pv;
return v < pDiffInfo->prev.i64;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)pv;
return v < pDiffInfo->prev.i64;
}
case TSDB_DATA_TYPE_FLOAT: {
float v = *(float*)pv;
return v < pDiffInfo->prev.d64;
}
case TSDB_DATA_TYPE_DOUBLE: {
double v = *(double*)pv;
return v < pDiffInfo->prev.d64;
}
default:
return false;
}
return false;
}
static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv, SColumnInfoData* pOutput, int32_t pos,
int64_t ts) {
pDiffInfo->prevTs = ts;
@ -3156,8 +3199,9 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case TSDB_DATA_TYPE_INT: {
int32_t v = *(int32_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) {
colDataSetNull_f_s(pOutput, pos);
pOutput->hasNull = true;
} else {
colDataSetInt64(pOutput, pos, &delta);
}
@ -3170,7 +3214,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case TSDB_DATA_TYPE_TINYINT: {
int8_t v = *(int8_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) {
colDataSetNull_f_s(pOutput, pos);
} else {
colDataSetInt64(pOutput, pos, &delta);
@ -3182,7 +3226,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case TSDB_DATA_TYPE_SMALLINT: {
int16_t v = *(int16_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) {
colDataSetNull_f_s(pOutput, pos);
} else {
colDataSetInt64(pOutput, pos, &delta);
@ -3195,7 +3239,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case TSDB_DATA_TYPE_BIGINT: {
int64_t v = *(int64_t*)pv;
int64_t delta = v - pDiffInfo->prev.i64; // direct previous may be null
if (delta < 0 && pDiffInfo->ignoreNegative) {
if (delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) {
colDataSetNull_f_s(pOutput, pos);
} else {
colDataSetInt64(pOutput, pos, &delta);
@ -3206,7 +3250,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case TSDB_DATA_TYPE_FLOAT: {
float v = *(float*)pv;
double delta = v - pDiffInfo->prev.d64; // direct previous may be null
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow
colDataSetNull_f_s(pOutput, pos);
} else {
colDataSetDouble(pOutput, pos, &delta);
@ -3217,7 +3261,7 @@ static int32_t doHandleDiff(SDiffInfo* pDiffInfo, int32_t type, const char* pv,
case TSDB_DATA_TYPE_DOUBLE: {
double v = *(double*)pv;
double delta = v - pDiffInfo->prev.d64; // direct previous may be null
if ((delta < 0 && pDiffInfo->ignoreNegative) || isinf(delta) || isnan(delta)) { // check for overflow
if ((delta < 0 && ignoreNegative(pDiffInfo->ignoreOption)) || isinf(delta) || isnan(delta)) { // check for overflow
colDataSetNull_f_s(pOutput, pos);
} else {
colDataSetDouble(pOutput, pos, &delta);
@ -3271,71 +3315,159 @@ bool funcInputGetNextRowIndex(SInputColumnInfoData* pInput, int32_t from, bool f
}
}
int32_t diffFunction(SqlFunctionCtx* pCtx) {
int32_t diffResultIsNull(SqlFunctionCtx* pCtx, SFuncInputRow* pRow){
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (pRow->isDataNull || !pDiffInfo->hasPrev ) {
return true;
} else if (ignoreNegative(pDiffInfo->ignoreOption)){
return diffIsNegtive(pDiffInfo, pCtx->input.pData[0]->info.type, pRow->pData);
}
return false;
}
bool isFirstRow(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
return pDiffInfo->isFirstRow;
}
int32_t setPreVal(SqlFunctionCtx* pCtx, SFuncInputRow* pRow) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDiffInfo->isFirstRow = false;
if (pRow->isDataNull) {
return TSDB_CODE_SUCCESS;
}
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int8_t inputType = pInputCol->info.type;
char* pv = pRow->pData;
int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, pRow->ts);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pDiffInfo->hasPrev = true;
return TSDB_CODE_SUCCESS;
}
int32_t doDiff(SqlFunctionCtx* pCtx, SFuncInputRow* pRow, int32_t pos) {
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
SInputColumnInfoData* pInput = &pCtx->input;
SColumnInfoData* pInputCol = pInput->pData[0];
int8_t inputType = pInputCol->info.type;
SColumnInfoData* pInputCol = pInput->pData[0];
int8_t inputType = pInputCol->info.type;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
TSKEY* tsList = (int64_t*)pInput->pPTS->pData;
if (pRow->isDataNull) {
colDataSetNull_f_s(pOutput, pos);
pOutput->hasNull = true;
int32_t numOfElems = 0;
int32_t startOffset = pCtx->offset;
SColumnInfoData* pOutput = (SColumnInfoData*)pCtx->pOutput;
funcInputUpdate(pCtx);
SFuncInputRow row = {0};
while (funcInputGetNextRow(pCtx, &row)) {
int32_t pos = startOffset + numOfElems;
if (row.isDataNull) {
if (pDiffInfo->includeNull) {
colDataSetNull_f_s(pOutput, pos);
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityCols(pCtx, row.block, row.rowIndex, pos);
}
numOfElems += 1;
}
continue;
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos);
}
char* pv = row.pData;
if (pDiffInfo->hasPrev) {
if (row.ts == pDiffInfo->prevTs) {
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
}
int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, row.ts);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityCols(pCtx, row.block, row.rowIndex, pos);
}
numOfElems++;
} else {
int32_t code = doSetPrevVal(pDiffInfo, inputType, pv, row.ts);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
pDiffInfo->hasPrev = true;
return TSDB_CODE_SUCCESS;
}
pResInfo->numOfRes = numOfElems;
char* pv = pRow->pData;
if (pRow->ts == pDiffInfo->prevTs) {
return TSDB_CODE_FUNC_DUP_TIMESTAMP;
}
int32_t code = doHandleDiff(pDiffInfo, inputType, pv, pOutput, pos, pRow->ts);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
// handle selectivity
if (pCtx->subsidiaries.num > 0) {
appendSelectivityCols(pCtx, pRow->block, pRow->rowIndex, pos);
}
pDiffInfo->hasPrev = true;
return TSDB_CODE_SUCCESS;
}
int32_t diffFunction(SqlFunctionCtx* pCtx) {
return TSDB_CODE_SUCCESS;
}
int32_t diffFunctionByRow(SArray* pCtxArray) {
int32_t code = TSDB_CODE_SUCCESS;
int diffColNum = pCtxArray->size;
if(diffColNum == 0) {
return TSDB_CODE_SUCCESS;
}
int32_t numOfElems = 0;
SArray* pRows = taosArrayInit_s(sizeof(SFuncInputRow), diffColNum);
bool keepNull = false;
for (int i = 0; i < diffColNum; ++i) {
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
funcInputUpdate(pCtx);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
SDiffInfo* pDiffInfo = GET_ROWCELL_INTERBUF(pResInfo);
if (!ignoreNull(pDiffInfo->ignoreOption)) {
keepNull = true;
}
}
SqlFunctionCtx* pCtx0 = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, 0);
SFuncInputRow* pRow0 = (SFuncInputRow*)taosArrayGet(pRows, 0);
int32_t startOffset = pCtx0->offset;
while (funcInputGetNextRow(pCtx0, pRow0)) {
bool hasNotNullValue = !diffResultIsNull(pCtx0, pRow0);
for (int i = 1; i < diffColNum; ++i) {
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i);
if(!funcInputGetNextRow(pCtx, pRow)) {
// rows are not equal
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
goto _exit;
}
hasNotNullValue = !diffResultIsNull(pCtx, pRow);
}
int32_t pos = startOffset + numOfElems;
bool newRow = false;
for (int i = 0; i < diffColNum; ++i) {
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
SFuncInputRow* pRow = (SFuncInputRow*)taosArrayGet(pRows, i);
if ((keepNull || hasNotNullValue) && !isFirstRow(pCtx, pRow)){
code = doDiff(pCtx, pRow, pos);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
newRow = true;
} else {
code = setPreVal(pCtx, pRow);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
}
}
if (newRow) ++numOfElems;
}
for (int i = 0; i < diffColNum; ++i) {
SqlFunctionCtx* pCtx = *(SqlFunctionCtx**)taosArrayGet(pCtxArray, i);
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
pResInfo->numOfRes = numOfElems;
}
_exit:
if (pRows) {
taosArrayDestroy(pRows);
pRows = NULL;
}
return code;
}
int32_t getTopBotInfoSize(int64_t numOfItems) { return sizeof(STopBotRes) + numOfItems * sizeof(STopBotResItem); }
bool getTopBotFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {

View File

@ -141,6 +141,7 @@ int32_t fmGetFuncExecFuncs(int32_t funcId, SFuncExecFuncs* pFpSet) {
pFpSet->process = funcMgtBuiltins[funcId].processFunc;
pFpSet->finalize = funcMgtBuiltins[funcId].finalizeFunc;
pFpSet->combine = funcMgtBuiltins[funcId].combineFunc;
pFpSet->processFuncByRow = funcMgtBuiltins[funcId].processFuncByRow;
return TSDB_CODE_SUCCESS;
}
@ -274,6 +275,8 @@ bool fmIsBlockDistFunc(int32_t funcId) {
return FUNCTION_TYPE_BLOCK_DIST == funcMgtBuiltins[funcId].type;
}
bool fmIsProcessByRowFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_PROCESS_BY_ROW); }
bool fmIsIgnoreNullFunc(int32_t funcId) { return isSpecificClassifyFunc(funcId, FUNC_MGT_IGNORE_NULL_FUNC); }
void fmFuncMgtDestroy() {