adjust history task trigger mode
This commit is contained in:
parent
a72c11c8de
commit
5d21f50340
|
@ -3737,7 +3737,18 @@ void streamSessionReloadState(SOperatorInfo* pOperator) {
|
|||
int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true);
|
||||
if (winNum > 0) {
|
||||
saveSessionOutputBuf(pAggSup, &winInfo);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
saveResult(winInfo, pInfo->pStUpdated);
|
||||
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
if (!isCloseWindow(&winInfo.sessionWin.win, &pInfo->twAggSup)) {
|
||||
saveDeleteRes(pInfo->pStDeleted, winInfo.sessionWin);
|
||||
}
|
||||
SSessionKey key = {0};
|
||||
getSessionHashKey(&winInfo.sessionWin, &key);
|
||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
} else {
|
||||
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)winInfo.pOutputBuf, &pAggSup->stateStore);
|
||||
}
|
||||
}
|
||||
taosMemoryFree(pBuf);
|
||||
|
@ -4388,7 +4399,16 @@ void streamStateReloadState(SOperatorInfo* pOperator) {
|
|||
if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) {
|
||||
compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated);
|
||||
saveSessionOutputBuf(pAggSup, &curInfo.winInfo);
|
||||
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
|
||||
saveResult(curInfo.winInfo, pInfo->pSeUpdated);
|
||||
} else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
|
||||
if (!isCloseWindow(&curInfo.winInfo.sessionWin.win, &pInfo->twAggSup)) {
|
||||
saveDeleteRes(pInfo->pSeDeleted, curInfo.winInfo.sessionWin);
|
||||
}
|
||||
SSessionKey key = {0};
|
||||
getSessionHashKey(&curInfo.winInfo.sessionWin, &key);
|
||||
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curInfo.winInfo, sizeof(SResultWindowInfo));
|
||||
}
|
||||
}
|
||||
|
||||
if (IS_VALID_SESSION_WIN(curInfo.winInfo)) {
|
||||
|
|
|
@ -62,7 +62,9 @@ const char* streamGetTaskStatusStr(int32_t status) {
|
|||
|
||||
static int32_t doLaunchScanHistoryTask(SStreamTask* pTask) {
|
||||
SVersionRange* pRange = &pTask->dataRange.range;
|
||||
if (pTask->info.fillHistory) {
|
||||
streamSetParamForScanHistory(pTask);
|
||||
}
|
||||
streamSetParamForStreamScannerStep1(pTask, pRange, &pTask->dataRange.window);
|
||||
|
||||
int32_t code = streamStartRecoverTask(pTask, 0);
|
||||
|
@ -80,7 +82,9 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
|
|||
walReaderGetCurrentVer(pTask->exec.pWalReader));
|
||||
}
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||
if (pTask->info.fillHistory) {
|
||||
streamSetParamForScanHistory(pTask);
|
||||
}
|
||||
streamTaskScanHistoryPrepare(pTask);
|
||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||
qDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
|
||||
|
@ -434,7 +438,7 @@ int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
|
|||
|
||||
int32_t streamAggUpstreamScanHistoryFinish(SStreamTask* pTask) {
|
||||
void* exec = pTask->exec.pExecutor;
|
||||
if (qRestoreStreamOperatorOption(exec) < 0) {
|
||||
if (pTask->info.fillHistory && qRestoreStreamOperatorOption(exec) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -625,10 +629,13 @@ int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
// restore param
|
||||
int32_t code = streamRestoreParam(pTask);
|
||||
int32_t code = 0;
|
||||
if (pTask->info.fillHistory) {
|
||||
code = streamRestoreParam(pTask);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// dispatch recover finish req to all related downstream task
|
||||
code = streamDispatchScanHistoryFinishMsg(pTask);
|
||||
|
|
Loading…
Reference in New Issue