diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b302641c94..ff44520e6c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -657,7 +657,6 @@ typedef struct SStreamFillOperatorInfo { SSDataBlock* pRes; SSDataBlock* pSrcBlock; int32_t srcRowIndex; - SSDataBlock* pPrevSrcBlock; SSDataBlock* pSrcDelBlock; int32_t srcDelRowIndex; SSDataBlock* pDelRes; diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 8d5af64777..f0bc79e1b2 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -470,7 +470,6 @@ static void destroyStreamFillOperatorInfo(void* param) { pInfo->pFillSup = destroyStreamFillSupporter(pInfo->pFillSup); pInfo->pRes = blockDataDestroy(pInfo->pRes); pInfo->pSrcBlock = blockDataDestroy(pInfo->pSrcBlock); - pInfo->pPrevSrcBlock = blockDataDestroy(pInfo->pPrevSrcBlock); pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes); pInfo->matchInfo.pList = taosArrayDestroy(pInfo->matchInfo.pList); taosMemoryFree(pInfo); @@ -992,12 +991,6 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { if (pInfo->srcRowIndex == 0) { keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize); - SSDataBlock* preBlock = pInfo->pPrevSrcBlock; - if (preBlock->info.rows > 0) { - int preRowId = preBlock->info.rows - 1; - SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol); - doFillResults(pOperator, pFillSup, pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId, pRes); - } pInfo->srcRowIndex++; } @@ -1011,9 +1004,8 @@ static void doStreamFillImpl(SOperatorInfo* pOperator) { } pInfo->srcRowIndex++; } + doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes); blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol); - blockDataCleanup(pInfo->pPrevSrcBlock); - copyDataBlock(pInfo->pPrevSrcBlock, pInfo->pSrcBlock); blockDataCleanup(pInfo->pSrcBlock); } @@ -1173,7 +1165,6 @@ static void doDeleteFillResult(SOperatorInfo* pOperator) { } static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) { - blockDataCleanup(pInfo->pPrevSrcBlock); tSimpleHashClear(pInfo->pFillSup->pResMap); pInfo->pFillSup->hasDelete = false; taosArrayClear(pInfo->pFillInfo->delRanges); @@ -1231,13 +1222,6 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; - SSDataBlock* preBlock = pInfo->pPrevSrcBlock; - if (preBlock->info.rows > 0) { - int preRowId = preBlock->info.rows - 1; - SColumnInfoData* pPreTsCol = taosArrayGet(preBlock->pDataBlock, pInfo->primaryTsCol); - doFillResults(pOperator, pInfo->pFillSup, pInfo->pFillInfo, preBlock, (TSKEY*)pPreTsCol->pData, preRowId, - pInfo->pRes); - } pInfo->pFillInfo->preRowKey = INT64_MIN; if (pInfo->pRes->info.rows > 0) { printDataBlock(pInfo->pRes, "stream fill"); @@ -1411,10 +1395,8 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi initResultSizeInfo(&pOperator->resultInfo, 4096); pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); - pInfo->pPrevSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity); - blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity); pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes); if (!pInfo->pFillInfo) { diff --git a/tests/script/tsim/stream/fillIntervalPartitionBy.sim b/tests/script/tsim/stream/fillIntervalPartitionBy.sim index 384aa2c8e4..8c06ca8bbd 100644 --- a/tests/script/tsim/stream/fillIntervalPartitionBy.sim +++ b/tests/script/tsim/stream/fillIntervalPartitionBy.sim @@ -127,7 +127,114 @@ if $rows != 13 then goto loop3 endi +sql insert into t2 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee') t1 values(1648791217000,11,11,11,11.0,'eee') (1648791219000,11,11,11,11.0,'eee'); +$loop_count = 0 + +loop4: +sleep 200 + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + + +sql select * from streamt1 order by group_id, ts; + +if $rows != 20 then + print ====streamt1=rows1=$rows + goto loop4 +endi + +if $data04 == 0 then + print ====streamt1=data04=$data04 + return -1 +endi + +sql select group_id,count(*) from streamt1 group by group_id; + +if $rows != 2 then + print ====streamt1=rows2=$rows + goto loop4 +endi + +sql select * from streamt2 order by group_id, ts; + +if $rows != 20 then + print ====streamt2=rows2=$rows + goto loop4 +endi + +if $data04 == 0 then + print ====streamt2=data04=$data04 + return -1 +endi + +sql select group_id,count(*) from streamt2 group by group_id; + +if $rows != 2 then + print ====streamt2=rows2=$rows + goto loop4 +endi + +sql select * from streamt3 order by group_id, ts; + +if $rows != 20 then + print ====streamt3=rows3=$rows + goto loop4 +endi + +if $data04 == 0 then + print ====streamt3=data04=$data04 + return -1 +endi + +sql select group_id,count(*) from streamt3 group by group_id; + +if $rows != 2 then + print ====streamt3=rows2=$rows + goto loop4 +endi + + +sql select * from streamt4 order by group_id, ts; + +if $rows != 20 then + print ====streamt4=rows4=$rows + goto loop4 +endi + +if $data04 == 0 then + print ====streamt4=data04=$data04 + return -1 +endi + +sql select group_id,count(*) from streamt4 group by group_id; + +if $rows != 2 then + print ====streamt4=rows2=$rows + goto loop4 +endi + +sql select * from streamt5 order by group_id, ts; + +if $rows != 20 then + print ====streamt5=rows5=$rows + goto loop4 +endi + +if $data04 == 0 then + print ====streamt5=data04=$data04 + return -1 +endi + +sql select group_id,count(*) from streamt5 group by group_id; + +if $rows != 2 then + print ====streamt5=rows2=$rows + goto loop4 +endi