Merge pull request #28068 from taosdata/fix/TD-32265

fix(stream):rebuild stream event window
This commit is contained in:
Haojun Liao 2024-09-25 10:49:11 +08:00 committed by GitHub
commit c2ff609c5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 4 additions and 4 deletions

View File

@ -3485,7 +3485,7 @@ FETCH_NEXT_BLOCK:
return code; return code;
} }
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
lino); __LINE__);
blockDataCleanup(pInfo->pUpdateDataRes); blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; } break;
@ -3499,7 +3499,7 @@ FETCH_NEXT_BLOCK:
return code; return code;
} }
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
lino); __LINE__);
blockDataCleanup(pInfo->pUpdateDataRes); blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; } break;

View File

@ -179,7 +179,7 @@ _end:
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
} }
pAggSup->stateStore.streamStateFreeCur(pCur); pAggSup->stateStore.streamStateFreeCur(pCur);
qDebug("===stream===set event next win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, qDebug("===stream===set event cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
pCurWin->winInfo.sessionWin.win.ekey); pCurWin->winInfo.sessionWin.win.ekey);
_error: _error:
@ -233,7 +233,7 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
pWinInfo->pWinFlag->endFlag = ends[i]; pWinInfo->pWinFlag->endFlag = ends[i];
} else if (pWin->ekey == pTsData[i]) { } else if (pWin->ekey == pTsData[i]) {
pWinInfo->pWinFlag->endFlag |= ends[i]; pWinInfo->pWinFlag->endFlag |= ends[i];
} else { } else if (ends[i] && !pWinInfo->pWinFlag->endFlag) {
*pRebuild = true; *pRebuild = true;
pWinInfo->pWinFlag->endFlag |= ends[i]; pWinInfo->pWinFlag->endFlag |= ends[i];
(*pWinRow) = i + 1 - start; (*pWinRow) = i + 1 - start;