diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index cea40f4785..db8644ecfe 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -190,7 +190,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, int32_t* capacity, const SColumnInfoData* pSource, uint32_t numOfRow2); int32_t colDataAssign(SColumnInfoData* pColumnInfoData, const SColumnInfoData* pSource, int32_t numOfRows); -int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock); +int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); void colDataTrim(SColumnInfoData* pColumnInfoData); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 736cb98549..6e1a9c5726 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -341,7 +341,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } -int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { +int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) { if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { return 0; } @@ -350,7 +350,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { return -1; } - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, 0); + int32_t index = (tsColumnIndex == -1)? 0:tsColumnIndex; + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index); if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { return 0; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 2020be54d6..7351339f13 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1903,7 +1903,7 @@ void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock, SArray* pColMatchIn filterFreeInfo(filter); extractQualifiedTupleByFilterResult(pBlock, rowRes, keep); - blockDataUpdateTsWindow(pBlock); + blockDataUpdateTsWindow(pBlock, 0); } void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowRes, bool keep) { @@ -2072,7 +2072,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI } qDebug("%s result generated, rows:%d, groupId:%"PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, pBlock->info.groupId); - blockDataUpdateTsWindow(pBlock); + blockDataUpdateTsWindow(pBlock, 0); return 0; } @@ -2797,7 +2797,9 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI } pRes->info.rows = numOfRows; - blockDataUpdateTsWindow(pRes); + + // todo move this to time window aggregator, since the primary timestamp may not be known by exchange operator. + blockDataUpdateTsWindow(pRes, 0); int64_t el = taosGetTimestampUs() - startTs; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 16dcf30f39..7606374cdb 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -553,7 +553,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { pInfo->pageIndex += 1; - blockDataUpdateTsWindow(pInfo->binfo.pRes); + blockDataUpdateTsWindow(pInfo->binfo.pRes, 0); pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId; return pInfo->binfo.pRes; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8b6ab96b6d..4e27b6ac4f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -689,7 +689,7 @@ static SSDataBlock* getUpdateDataBlock(SStreamBlockScanInfo* pInfo, bool inverti } pDataBlock->info.rows = size; pDataBlock->info.type = STREAM_REPROCESS; - blockDataUpdateTsWindow(pDataBlock); + blockDataUpdateTsWindow(pDataBlock, 0); taosArrayClear(pInfo->tsArray); return pDataBlock; } @@ -899,7 +899,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } rows = pBlockInfo->rows; doFilter(pInfo->pCondition, pInfo->pRes, NULL); - blockDataUpdateTsWindow(pInfo->pRes); + blockDataUpdateTsWindow(pInfo->pRes, 0); break; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c3a447c802..8abebf184b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -790,6 +790,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } getTableScanInfo(pOperator, &pInfo->order, &scanFlag); + blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, pInfo->order, scanFlag, true); @@ -938,8 +939,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } - int32_t order = TSDB_ORDER_ASC; - STimeWindow win = pTaskInfo->window; + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { @@ -952,6 +952,8 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { } setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true); + blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); + doStateWindowAggImpl(pOperator, pInfo, pBlock); } @@ -1429,6 +1431,8 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order, MAIN_SCAN, true); + blockDataUpdateTsWindow(pBlock, pInfo->tsSlotId); + doSessionWindowAggImpl(pOperator, pInfo, pBlock); } diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index c7ec0eece2..880ac6c78e 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -189,7 +189,7 @@ SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { pInfo->current += 1; - blockDataUpdateTsWindow(pBlock); + blockDataUpdateTsWindow(pBlock, 0); return pBlock; }