diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1b11941849..201bc5896d 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -560,6 +560,7 @@ typedef struct SFillOperatorInfo { SNode* pCondition; SArray* pColMatchColInfo; int32_t primaryTsCol; + uint64_t curGroupId; // current handled group id } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 851e3bc5bf..266567ec1b 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1637,8 +1637,6 @@ static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, int32_t doFillTimeIntervalGapsInResults(struct SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t capacity) { int32_t numOfRows = (int32_t)taosFillResultDataBlock(pFillInfo, pBlock, capacity - pBlock->info.rows); - pBlock->info.rows += numOfRows; - return pBlock->info.rows; } @@ -3344,14 +3342,15 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->existNewGroupBlock); doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, pResultInfo->capacity); + pInfo->curGroupId = pInfo->existNewGroupBlock->info.groupId; pInfo->existNewGroupBlock = NULL; - *newgroup = true; +// *newgroup = true; } static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInfo* pResultInfo, bool* newgroup, SExecTaskInfo* pTaskInfo) { if (taosFillHasMoreResults(pInfo->pFillInfo)) { - *newgroup = false; +// *newgroup = false; doFillTimeIntervalGapsInResults(pInfo->pFillInfo, pInfo->pRes, (int32_t)pResultInfo->capacity); if (pInfo->pRes->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult)) { return; @@ -3373,10 +3372,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { blockDataCleanup(pResBlock); - // todo handle different group data interpolation - bool n = false; - bool* newgroup = &n; - doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); + doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) { return pResBlock; } @@ -3384,31 +3380,29 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while (1) { SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); - if (*newgroup) { - assert(pBlock != NULL); - } + if (pBlock == NULL) { + if (pInfo->totalInputRows == 0) { + pOperator->status = OP_EXEC_DONE; + return NULL; + } - blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); - - if (*newgroup && pInfo->totalInputRows > 0) { // there are already processed current group data block - pInfo->existNewGroupBlock = pBlock; - *newgroup = false; - - // Fill the previous group data block, before handle the data block of new group. - // Close the fill operation for previous group data block taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey); } else { - if (pBlock == NULL) { - if (pInfo->totalInputRows == 0) { - pOperator->status = OP_EXEC_DONE; - return NULL; - } + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsCol); + + if (pInfo->curGroupId == 0 || pInfo->curGroupId == pBlock->info.groupId) { + pInfo->curGroupId = pBlock->info.groupId; // the first data block - taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey); - } else { pInfo->totalInputRows += pBlock->info.rows; + taosFillSetStartInfo(pInfo->pFillInfo, pBlock->info.rows, pBlock->info.window.ekey); taosFillSetInputDataBlock(pInfo->pFillInfo, pBlock); + } else if (pInfo->curGroupId != pBlock->info.groupId) { // the new group data block + pInfo->existNewGroupBlock = pBlock; + + // Fill the previous group data block, before handle the data block of new group. + // Close the fill operation for previous group data block + taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey); } } @@ -3419,17 +3413,17 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { if (pResBlock->info.rows > 0) { // 1. The result in current group not reach the threshold of output result, continue // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately - if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || (!pInfo->multigroupResult)) { + if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) { return pResBlock; } - doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); - if (pResBlock->info.rows > pOperator->resultInfo.threshold || pBlock == NULL) { + doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, NULL, pTaskInfo); + if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) { return pResBlock; } } else if (pInfo->existNewGroupBlock) { // try next group assert(pBlock != NULL); - doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, newgroup, pTaskInfo); + doHandleRemainBlockForNewGroupImpl(pInfo, pResultInfo, NULL, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold) { return pResBlock; } diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index e0bdcfdc3a..550938140e 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -72,7 +72,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order); // set the primary timestamp column value - int32_t index = pFillInfo->numOfCurrent; + int32_t index = pBlock->info.rows; // set the other values if (pFillInfo->type == TSDB_FILL_PREV) { @@ -191,6 +191,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* SInterval* pInterval = &pFillInfo->interval; pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); + pBlock->info.rows += 1; pFillInfo->numOfCurrent++; } @@ -273,6 +274,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t } } else { assert(pFillInfo->currentKey == ts); + int32_t index = pBlock->info.rows; if (pFillInfo->type == TSDB_FILL_NEXT && (pFillInfo->index + 1) < pFillInfo->numOfRows) { int32_t nextRowIndex = pFillInfo->index + 1; @@ -296,24 +298,24 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t if (i == 0 || (/*pCol->functionId != FUNCTION_COUNT &&*/ !colDataIsNull_s(pSrc, pFillInfo->index)) /*|| (pCol->functionId == FUNCTION_COUNT && GET_INT64_VAL(src) != 0)*/) { bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); - colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull); + colDataAppend(pDst, index, src, isNull); saveColData(pFillInfo->prev, i, src, isNull); } else { // i > 0 and data is null , do interpolation if (pFillInfo->type == TSDB_FILL_PREV) { SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i); - doSetVal(pDst, pFillInfo->numOfCurrent, pKey); + doSetVal(pDst, index, pKey); } else if (pFillInfo->type == TSDB_FILL_LINEAR) { bool isNull = colDataIsNull_s(pSrc, pFillInfo->index); - colDataAppend(pDst, pFillInfo->numOfCurrent, src, isNull); + colDataAppend(pDst, index, src, isNull); saveColData(pFillInfo->prev, i, src, isNull); } else if (pFillInfo->type == TSDB_FILL_NULL) { - colDataAppendNULL(pDst, pFillInfo->numOfCurrent); + colDataAppendNULL(pDst, index); } else if (pFillInfo->type == TSDB_FILL_NEXT) { SGroupKeys* pKey = taosArrayGet(pFillInfo->next, i); - doSetVal(pDst, pFillInfo->numOfCurrent, pKey); + doSetVal(pDst, index, pKey); } else { SVariant* pVar = &pFillInfo->pFillCol[i].fillVal; - colDataAppend(pDst, pFillInfo->numOfCurrent, (char*)&pVar->i, false); + colDataAppend(pDst, index, (char*)&pVar->i, false); } } } @@ -324,6 +326,7 @@ static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision); + pBlock->info.rows += 1; pFillInfo->index += 1; pFillInfo->numOfCurrent += 1; } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 6eb72b1717..0245379672 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6026,8 +6026,6 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { } pInfo->ts = cts; - pResInfo->numOfRes = 1; - if (pCtx->subsidiaries.num > 0) { STuplePos* pTuplePos = (STuplePos*)(pInfo->buf + bytes + sizeof(TSKEY)); if (!pInfo->hasResult) {