mid pull over

This commit is contained in:
54liuyao 2024-02-07 11:30:13 +08:00
parent 4f2fb15a56
commit 5473c22585
2 changed files with 20 additions and 3 deletions

View File

@ -559,6 +559,8 @@ typedef struct SStreamIntervalOperatorInfo {
struct SUpdateInfo* pUpdateInfo; struct SUpdateInfo* pUpdateInfo;
bool recvRetrive; bool recvRetrive;
SSDataBlock* pMidRetriveRes; SSDataBlock* pMidRetriveRes;
bool recvPullover;
SSDataBlock* pMidPulloverRes;
} SStreamIntervalOperatorInfo; } SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo { typedef struct SDataGroupInfo {

View File

@ -411,6 +411,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
taosArrayDestroy(pInfo->pDelWins); taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidRetriveRes);
blockDataDestroy(pInfo->pMidPulloverRes);
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
if (pInfo->pState->dump == 1) { if (pInfo->pState->dump == 1) {
@ -603,7 +604,7 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
} }
void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins,
int32_t numOfCh, SOperatorInfo* pOperator) { int32_t numOfCh, SOperatorInfo* pOperator) {
SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
TSKEY* tsData = (TSKEY*)pStartCol->pData; TSKEY* tsData = (TSKEY*)pStartCol->pData;
@ -612,6 +613,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; uint64_t* groupIdData = (uint64_t*)pGroupCol->pData;
int32_t chId = getChildIndex(pBlock); int32_t chId = getChildIndex(pBlock);
bool res = false;
for (int32_t i = 0; i < pBlock->info.rows; i++) { for (int32_t i = 0; i < pBlock->info.rows; i++) {
TSKEY winTs = tsData[i]; TSKEY winTs = tsData[i];
while (winTs <= tsEndData[i]) { while (winTs <= tsEndData[i]) {
@ -627,6 +629,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
// pull data is over // pull data is over
taosArrayDestroy(chArray); taosArrayDestroy(chArray);
taosHashRemove(pMap, &winRes, sizeof(SWinKey)); taosHashRemove(pMap, &winRes, sizeof(SWinKey));
res =true;
qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts); qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts);
void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey)); void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey));
@ -650,6 +653,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S
winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
} }
} }
return res;
} }
static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, int32_t childId) { static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, int32_t childId) {
@ -1186,6 +1190,12 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->binfo.pRes; return pInfo->binfo.pRes;
} }
if (pInfo->recvPullover) {
pInfo->recvPullover = false;
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidPulloverRes;
}
return NULL; return NULL;
} }
@ -1555,6 +1565,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvRetrive = false; pInfo->recvRetrive = false;
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pOperator->operatorType = pPhyNode->type; pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -4336,8 +4347,12 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock; return pBlock;
} else if (pBlock->info.type == STREAM_PULL_OVER) { } else if (pBlock->info.type == STREAM_PULL_OVER) {
processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->recvPullover = processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval,
pInfo->numOfChild, pOperator); pInfo->pPullWins, pInfo->numOfChild, pOperator);
if (pInfo->recvPullover) {
copyDataBlock(pInfo->pMidPulloverRes, pBlock);
break;
}
continue; continue;
} else if (pBlock->info.type == STREAM_CHECKPOINT) { } else if (pBlock->info.type == STREAM_CHECKPOINT) {
pAPI->stateStore.streamStateCommit(pInfo->pState); pAPI->stateStore.streamStateCommit(pInfo->pState);