mid agg operator

This commit is contained in:
54liuyao 2024-02-21 14:48:16 +08:00
parent db30163b17
commit 53a4294e4c
3 changed files with 58 additions and 25 deletions

View File

@ -600,6 +600,8 @@ typedef struct SStreamIntervalOperatorInfo {
bool recvPullover;
SSDataBlock* pMidPulloverRes;
bool clearState;
SArray* pMidPullDatas;
int32_t midDelIndex;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {

View File

@ -221,7 +221,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SSHashObj* pUpdatedMap) {
SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
@ -255,10 +255,15 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (chIds) {
int32_t childId = getChildIndex(pBlock);
if (pInvalidWins) {
qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
}
SArray* chArray = *(void**)chIds;
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
if (index != -1) {
qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId);
qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId);
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
continue;
}
@ -413,6 +418,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
blockDataDestroy(pInfo->pMidRetriveRes);
blockDataDestroy(pInfo->pMidPulloverRes);
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
taosArrayDestroy(pInfo->pMidPullDatas);
if (pInfo->pState->dump == 1) {
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
@ -642,9 +648,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
.calWin.skey = nextWin.skey,
.calWin.ekey = nextWin.skey};
// add pull data request
if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
if (IS_MID_INTERVAL_OP(pOperator)) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
taosArrayPush(pInfo->pMidPullDatas, &winRes);
} else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
addPullWindow(pMap, &winRes, numOfCh);
qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh);
}
}
}
@ -1191,11 +1200,6 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
}
if (pInfo->recvPullover) {
pInfo->recvPullover = false;
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidPulloverRes;
}
return NULL;
}
@ -1301,7 +1305,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
if (IS_FINAL_INTERVAL_OP(pOperator)) {
int32_t chId = getChildIndex(pBlock);
addRetriveWindow(delWins, pInfo, chId);
@ -1337,7 +1341,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->recvRetrive = true;
copyDataBlock(pInfo->pMidRetriveRes, pBlock);
pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE;
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap, NULL);
break;
}
continue;
@ -1567,6 +1571,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->clearState = false;
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -3973,7 +3978,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap, NULL);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
@ -4266,6 +4271,34 @@ static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t num
}
}
static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
uint16_t opType = pOperator->operatorType;
if (pInfo->recvPullover) {
pInfo->recvPullover = false;
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidPulloverRes;
}
qDebug("===stream=== build mid interval result");
doBuildDeleteResult(pInfo, pInfo->pMidPullDatas, &pInfo->midDelIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
}
return NULL;
}
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4294,10 +4327,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
return resBlock;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->clearState) {
@ -4345,7 +4377,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, pInfo->pFinalPullDataMap);
removeResults(delWins, pInfo->pUpdatedMap);
taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins);
@ -4381,7 +4413,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_MID_RETRIEVE) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild);
taosArrayDestroy(delWins);
pInfo->recvRetrive = true;
@ -4426,10 +4458,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
return resBlock;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->clearState) {

View File

@ -259,7 +259,7 @@ sql insert into ts4 values(1648791213001,1,12,3,1.0);
$loop_count = 0
loop3:
loop4:
$loop_count = $loop_count + 1
if $loop_count == 20 then
@ -276,7 +276,7 @@ if $rows != 1 then
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
goto loop4
endi
print 2 select * from streamt5;
@ -287,7 +287,7 @@ if $rows != 1 then
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
goto loop4
endi
print 3 select * from streamt3;