mid interval retrive

This commit is contained in:
54liuyao 2024-01-22 10:08:45 +08:00
parent 7246fedfef
commit 288b91a4c3
2 changed files with 31 additions and 9 deletions

View File

@ -556,6 +556,7 @@ typedef struct SStreamIntervalOperatorInfo {
bool reCkBlock; bool reCkBlock;
SSDataBlock* pCheckpointRes; SSDataBlock* pCheckpointRes;
struct SUpdateInfo* pUpdateInfo; struct SUpdateInfo* pUpdateInfo;
bool recvRetrive;
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {

View File

@ -1239,11 +1239,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return NULL; return NULL;
} else { } else {
if (!IS_FINAL_INTERVAL_OP(pOperator)) { if (!IS_FINAL_INTERVAL_OP(pOperator)) {
doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (pInfo->pDelRes->info.rows != 0) { if (resBlock != NULL) {
// process the rest of the data return resBlock;
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); }
return pInfo->pDelRes;
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pPullDataRes;
} }
} }
} }
@ -1317,9 +1321,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->recvGetAll = true; pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_INTERVAL_OP(pOperator)) { } else if (pBlock->info.type == STREAM_RETRIEVE) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); if(!IS_FINAL_INTERVAL_OP(pOperator)) {
if (taosArrayGetSize(pInfo->pUpdated) > 0) { pInfo->recvRetrive = true;
copyDataBlock(pInfo->pPullDataRes, pBlock);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
break; break;
} }
continue; continue;
@ -1362,7 +1368,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->pUpdated = NULL; pInfo->pUpdated = NULL;
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
return buildIntervalResult(pOperator); SSDataBlock* resBlock = buildIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pPullDataRes;
}
return NULL;
} }
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) { int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) {
@ -1532,6 +1549,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
pInfo->recvGetAll = false; pInfo->recvGetAll = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvRetrive = false;
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -4289,6 +4307,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
doStreamIntervalSaveCheckpoint(pOperator); doStreamIntervalSaveCheckpoint(pOperator);
copyDataBlock(pInfo->pCheckpointRes, pBlock); copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue; continue;
} else if (pBlock->info.type == STREAM_RETRIEVE) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
continue;
} else { } else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
} }