feat(query): interp support ignore null value opition

This commit is contained in:
Ganlin Zhao 2023-05-16 16:33:28 +08:00
parent 6879a784ae
commit 375aa2f26b
1 changed files with 47 additions and 3 deletions

View File

@ -209,6 +209,45 @@ static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
return (functionType == FUNCTION_TYPE_GROUP_KEY);
}
static bool getIgoreNullRes(SExprSupp* pExprSup) {
for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
if (isInterpFunc(pExprInfo)) {
for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
SFunctParam *pFuncParam = &pExprInfo->base.pParam[j];
if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
return pFuncParam->param.i ? true : false;
}
}
}
}
return false;
}
static bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) {
if (!ignoreNull) {
return false;
}
for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
if (isInterpFunc(pExprInfo)) {
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
if (colDataIsNull_s(pSrc, index)) {
return true;
}
}
}
return false;
}
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
SSDataBlock* pSrcBlock, int32_t index, bool beforeTs) {
int32_t rows = pResBlock->info.rows;
@ -590,7 +629,7 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
}
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
SExecTaskInfo* pTaskInfo) {
SExecTaskInfo* pTaskInfo, bool ignoreNull) {
SSDataBlock* pResBlock = pSliceInfo->pRes;
SInterval* pInterval = &pSliceInfo->interval;
@ -603,6 +642,10 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_FUNC_DUP_TIMESTAMP);
}
if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
continue;
}
if (pSliceInfo->current > pSliceInfo->win.ekey) {
break;
}
@ -732,6 +775,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
SSDataBlock* pResBlock = pSliceInfo->pRes;
SExprSupp* pSup = &pOperator->exprSupp;
bool ignoreNull = getIgoreNullRes(pSup);
int32_t order = TSDB_ORDER_ASC;
SInterval* pInterval = &pSliceInfo->interval;
@ -742,7 +786,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
while (1) {
if (pSliceInfo->pNextGroupRes != NULL) {
setInputDataBlock(pSup, pSliceInfo->pNextGroupRes, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo);
doTimesliceImpl(pOperator, pSliceInfo, pSliceInfo->pNextGroupRes, pTaskInfo, ignoreNull);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pSliceInfo->pNextGroupRes);
pSliceInfo->pNextGroupRes = NULL;
}
@ -776,7 +820,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo);
doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKey, pBlock);
}