From b902ab0d6184f193bd384c6037b783f876281cf3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Tue, 8 Nov 2022 18:48:22 +0800 Subject: [PATCH] refactor code --- source/libs/executor/src/timewindowoperator.c | 97 +++++++------------ 1 file changed, 36 insertions(+), 61 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 75d7010eb1..461a4fbe43 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2121,6 +2121,40 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp return hasInterp; } +static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, + SSDataBlock* pSrcBlock, int32_t index) { + blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1); + for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; + + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); + + if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { + colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); + } else { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + + if (colDataIsNull_s(pSrc, index)) { + if (pSliceInfo->fillType != TSDB_FILL_LINEAR) { + colDataAppendNULL(pDst, pResBlock->info.rows); + continue; + } else { + return; + } + } + + char* v = colDataGetData(pSrc, index); + colDataAppend(pDst, pResBlock->info.rows, v, false); + } + } + + pResBlock->info.rows += 1; + return; +} + + static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { if (pInfo->pPrevRow != NULL) { return TSDB_CODE_SUCCESS; @@ -2263,37 +2297,8 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } if (ts == pSliceInfo->current) { - blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1); - bool isNullRow = false; - for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - int32_t dstSlot = pExprInfo->base.resSchema.slotId; - SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); - } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); - - if (colDataIsNull_s(pSrc, i)) { - if (pSliceInfo->fillType != TSDB_FILL_LINEAR) { - colDataAppendNULL(pDst, pResBlock->info.rows); - continue; - } else { - isNullRow = true; - } - } - - char* v = colDataGetData(pSrc, i); - colDataAppend(pDst, pResBlock->info.rows, v, false); - } - } - - if (!isNullRow) { - pResBlock->info.rows += 1; - } doKeepPrevRows(pSliceInfo, pBlock, i); doKeepLinearInfo(pSliceInfo, pBlock, i); @@ -2348,37 +2353,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { // add current row if timestamp match if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { - blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1); - bool isNullRow = false; - for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { - SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[j]; - - int32_t dstSlot = pExprInfo->base.resSchema.slotId; - SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); - } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, srcSlot); - - if (colDataIsNull_s(pSrc, i)) { - if (pSliceInfo->fillType != TSDB_FILL_LINEAR) { - colDataAppendNULL(pDst, pResBlock->info.rows); - continue; - } else { - isNullRow = true; - } - } - - char* v = colDataGetData(pSrc, i); - colDataAppend(pDst, pResBlock->info.rows, v, false); - } - } - - if (!isNullRow) { - pResBlock->info.rows += 1; - } + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); doKeepPrevRows(pSliceInfo, pBlock, i); pSliceInfo->current =