Merge pull request #24811 from taosdata/fix/TD-28627

mid agg operator
This commit is contained in:
Haojun Liao 2024-03-15 12:46:54 +08:00 committed by GitHub
commit f4d01bcab6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 60 additions and 26 deletions

View File

@ -268,7 +268,7 @@ bool tsDisableStream = false;
int64_t tsStreamBufferSize = 128 * 1024 * 1024;
bool tsFilterScalarMode = false;
int tsResolveFQDNRetryTime = 100; // seconds
int tsStreamAggCnt = 1000;
int tsStreamAggCnt = 100000;
char tsS3Endpoint[TSDB_FQDN_LEN] = "<endpoint>";
char tsS3AccessKey[TSDB_FQDN_LEN] = "<accesskey>";

View File

@ -602,6 +602,8 @@ typedef struct SStreamIntervalOperatorInfo {
bool recvPullover;
SSDataBlock* pMidPulloverRes;
bool clearState;
SArray* pMidPullDatas;
int32_t midDelIndex;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {

View File

@ -221,7 +221,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId)
static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; }
static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins,
SSHashObj* pUpdatedMap) {
SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData;
@ -255,10 +255,15 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa
void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey));
if (chIds) {
int32_t childId = getChildIndex(pBlock);
if (pInvalidWins) {
qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId);
taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0);
}
SArray* chArray = *(void**)chIds;
int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ);
if (index != -1) {
qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId);
qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId);
getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
continue;
}
@ -413,6 +418,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
blockDataDestroy(pInfo->pMidRetriveRes);
blockDataDestroy(pInfo->pMidPulloverRes);
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
taosArrayDestroy(pInfo->pMidPullDatas);
if (pInfo->pState->dump == 1) {
taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner);
@ -642,9 +648,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina
.calWin.skey = nextWin.skey,
.calWin.ekey = nextWin.skey};
// add pull data request
if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh);
if (IS_MID_INTERVAL_OP(pOperator)) {
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
taosArrayPush(pInfo->pMidPullDatas, &winRes);
} else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) {
addPullWindow(pMap, &winRes, numOfCh);
qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh);
}
}
}
@ -1191,11 +1200,6 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) {
return pInfo->binfo.pRes;
}
if (pInfo->recvPullover) {
pInfo->recvPullover = false;
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidPulloverRes;
}
return NULL;
}
@ -1293,7 +1297,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
if (IS_FINAL_INTERVAL_OP(pOperator)) {
int32_t chId = getChildIndex(pBlock);
addRetriveWindow(delWins, pInfo, chId);
@ -1329,7 +1333,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
pInfo->recvRetrive = true;
copyDataBlock(pInfo->pMidRetriveRes, pBlock);
pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE;
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap, NULL);
break;
}
continue;
@ -1557,8 +1561,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvRetrive = false;
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->recvPullover = false;
pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pInfo->clearState = false;
pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey));
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -3966,7 +3972,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap, NULL);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
@ -4259,6 +4265,34 @@ static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t num
}
}
static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
uint16_t opType = pOperator->operatorType;
if (pInfo->recvPullover) {
pInfo->recvPullover = false;
printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidPulloverRes;
}
qDebug("===stream=== build mid interval result");
doBuildDeleteResult(pInfo, pInfo->pMidPullDatas, &pInfo->midDelIndex, pInfo->pDelRes);
if (pInfo->pDelRes->info.rows != 0) {
// process the rest of the data
printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
return pInfo->pDelRes;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
}
return NULL;
}
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4287,10 +4321,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
return resBlock;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->clearState) {
@ -4330,7 +4363,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
pBlock->info.type == STREAM_CLEAR) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, pInfo->pFinalPullDataMap);
removeResults(delWins, pInfo->pUpdatedMap);
taosArrayAddAll(pInfo->pDelWins, delWins);
taosArrayDestroy(delWins);
@ -4366,7 +4399,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
continue;
} else if (pBlock->info.type == STREAM_MID_RETRIEVE) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL);
addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild);
taosArrayDestroy(delWins);
pInfo->recvRetrive = true;
@ -4411,10 +4444,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
return resBlock;
}
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
resBlock = buildMidIntervalResult(pOperator);
if (resBlock != NULL) {
return resBlock;
}
if (pInfo->clearState) {

View File

@ -274,7 +274,7 @@ sql insert into ts4 values(1648791213001,1,12,3,1.0);
$loop_count = 0
loop3:
loop4:
$loop_count = $loop_count + 1
if $loop_count == 20 then
@ -291,7 +291,7 @@ if $rows != 1 then
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
goto loop4
endi
print 2 select * from streamt5;
@ -302,7 +302,7 @@ if $rows != 1 then
print $data00 $data01 $data02
print $data10 $data11 $data12
print $data20 $data21 $data22
goto loop3
goto loop4
endi
print 3 select * from streamt3;