diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2d2f377041..f11b083ad6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1883,7 +1883,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { if (pInfo->pRecoverRes != NULL) { pInfo->blockRecoverContiCnt++; calBlockTbName(pInfo, pInfo->pRecoverRes); - if (pInfo->pUpdateInfo) { + if (!pInfo->igCheckUpdate && pInfo->pUpdateInfo) { if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) { TSKEY maxTs = pAPI->stateStore.updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index aa0aa9799b..2cdf9bb15c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1623,7 +1623,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; - if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { + if (!pScanInfo->pUpdateInfo) { pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); } @@ -2150,28 +2150,29 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB } void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SInterval* pInterval) { - SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; - SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); TSKEY* tsEndData = (TSKEY*)pEndCol->pData; SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; int32_t chId = getChildIndex(pBlock); for (int32_t i = 0; i < pBlock->info.rows; i++) { TSKEY winTs = tsData[i]; - while (winTs < tsEndData[i]) { + while (winTs <= tsEndData[i]) { SWinKey winRes = {.ts = winTs, .groupId = groupIdData[i]}; void* chIds = taosHashGet(pMap, &winRes, sizeof(SWinKey)); if (chIds) { SArray* chArray = *(SArray**)chIds; int32_t index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); if (index != -1) { - qDebug("===stream===window %" PRId64 " delete child id %d", winRes.ts, chId); + qDebug("===stream===retrive window %" PRId64 " delete child id %d", winRes.ts, chId); taosArrayRemove(chArray, index); if (taosArrayGetSize(chArray) == 0) { // pull data is over taosArrayDestroy(chArray); taosHashRemove(pMap, &winRes, sizeof(SWinKey)); + qDebug("===stream===retrive pull data over.window %" PRId64 , winRes.ts); } } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ba24a581e9..71a21ac150 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1062,7 +1062,7 @@ _end: } int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { - qWarn("try to write to cf parname"); + qDebug("try to write to cf parname"); #ifdef USE_ROCKSDB if (tSimpleHashGetSize(pState->parNameMap) > MAX_TABLE_NAME_NUM) { if (tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t)) == NULL) {