fix(stream):rebuild stream event window

This commit is contained in:
54liuyao 2024-09-24 14:33:53 +08:00
parent 5639fd0baf
commit 8eff35cd77
2 changed files with 4 additions and 4 deletions

View File

@ -3482,7 +3482,7 @@ FETCH_NEXT_BLOCK:
return code; return code;
} }
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
lino); __LINE__);
blockDataCleanup(pInfo->pUpdateDataRes); blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; } break;
@ -3496,7 +3496,7 @@ FETCH_NEXT_BLOCK:
return code; return code;
} }
qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__, qError("%s===stream=== %s failed at line %d since pInfo->pUpdateRes is empty", GET_TASKID(pTaskInfo), __func__,
lino); __LINE__);
blockDataCleanup(pInfo->pUpdateDataRes); blockDataCleanup(pInfo->pUpdateDataRes);
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
} break; } break;

View File

@ -176,7 +176,7 @@ _end:
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin);
} }
pAggSup->stateStore.streamStateFreeCur(pCur); pAggSup->stateStore.streamStateFreeCur(pCur);
qDebug("===stream===set event next win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey, qDebug("===stream===set event cur win buff. skey:%" PRId64 ", endkey:%" PRId64, pCurWin->winInfo.sessionWin.win.skey,
pCurWin->winInfo.sessionWin.win.ekey); pCurWin->winInfo.sessionWin.win.ekey);
_error: _error:
@ -230,7 +230,7 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW
pWinInfo->pWinFlag->endFlag = ends[i]; pWinInfo->pWinFlag->endFlag = ends[i];
} else if (pWin->ekey == pTsData[i]) { } else if (pWin->ekey == pTsData[i]) {
pWinInfo->pWinFlag->endFlag |= ends[i]; pWinInfo->pWinFlag->endFlag |= ends[i];
} else { } else if (ends[i] && !pWinInfo->pWinFlag->endFlag) {
*pRebuild = true; *pRebuild = true;
pWinInfo->pWinFlag->endFlag |= ends[i]; pWinInfo->pWinFlag->endFlag |= ends[i];
(*pWinRow) = i + 1 - start; (*pWinRow) = i + 1 - start;