Merge branch 'fix/newCheckpoint' into enh/triggerCheckPoint2
This commit is contained in:
commit
2834c4f56c
|
@ -2819,10 +2819,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
doStreamIntervalSaveCheckpoint(pOperator);
|
doStreamIntervalSaveCheckpoint(pOperator);
|
||||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
|
||||||
qDebug("===stream===return data:%s. recv datablock num:%" PRIu64,
|
|
||||||
IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack);
|
|
||||||
pInfo->numOfDatapack = 0;
|
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
|
@ -4248,7 +4244,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
|
||||||
doStreamSessionSaveCheckpoint(pOperator);
|
doStreamSessionSaveCheckpoint(pOperator);
|
||||||
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
pAggSup->stateStore.streamStateCommit(pAggSup->pState);
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
|
@ -5539,8 +5534,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
pAPI->stateStore.streamStateCommit(pInfo->pState);
|
||||||
pInfo->reCkBlock = true;
|
pInfo->reCkBlock = true;
|
||||||
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
copyDataBlock(pInfo->pCheckpointRes, pBlock);
|
||||||
qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack);
|
|
||||||
pInfo->numOfDatapack = 0;
|
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
|
||||||
|
|
Loading…
Reference in New Issue