add mid operator retrive

This commit is contained in:
54liuyao 2024-01-31 18:09:13 +08:00
parent 288b91a4c3
commit 0afed5263e
5 changed files with 66 additions and 23 deletions

View File

@ -171,6 +171,7 @@ typedef enum EStreamType {
STREAM_CHECKPOINT,
STREAM_CREATE_CHILD_TABLE,
STREAM_TRANS_STATE,
STREAM_MID_RETRIEVE,
} EStreamType;
#pragma pack(push, 1)

View File

@ -559,6 +559,7 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) {
end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end;
for(int i = begin; i < end; i++){
SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i);
pUpTask->info.selfChildId = i - begin;
streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask);
streamTaskSetUpstreamInfo(*pDownTask, pUpTask);
}

View File

@ -557,6 +557,7 @@ typedef struct SStreamIntervalOperatorInfo {
SSDataBlock* pCheckpointRes;
struct SUpdateInfo* pUpdateInfo;
bool recvRetrive;
SSDataBlock* pMidRetriveRes;
} SStreamIntervalOperatorInfo;
typedef struct SDataGroupInfo {

View File

@ -490,7 +490,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
int32_t children = 0;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL == type) {
int32_t children = 0;
int32_t children = pHandle->numOfVgroups;
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
int32_t children = pHandle->numOfVgroups;

View File

@ -410,6 +410,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
blockDataDestroy(pInfo->pPullDataRes);
taosArrayDestroy(pInfo->pDelWins);
blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pMidRetriveRes);
pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
if (pInfo->pState->dump == 1) {
@ -1246,8 +1247,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pPullDataRes;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
}
}
}
@ -1324,7 +1325,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
} else if (pBlock->info.type == STREAM_RETRIEVE) {
if(!IS_FINAL_INTERVAL_OP(pOperator)) {
pInfo->recvRetrive = true;
copyDataBlock(pInfo->pPullDataRes, pBlock);
copyDataBlock(pInfo->pMidRetriveRes, pBlock);
pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE;
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
break;
}
@ -1340,6 +1342,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
doStreamIntervalSaveCheckpoint(pOperator);
copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue;
} else if (IS_FINAL_INTERVAL_OP(pOperator) && pBlock->info.type == STREAM_MID_RETRIEVE) {
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");
}
@ -1375,8 +1379,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->recvRetrive) {
pInfo->recvRetrive = false;
printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pPullDataRes;
printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pMidRetriveRes;
}
return NULL;
@ -1550,6 +1554,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
pInfo->recvGetAll = false;
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvRetrive = false;
pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE);
pOperator->operatorType = pPhyNode->type;
if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) {
@ -4142,16 +4147,15 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info;
pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version);
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
SRowBuffPos* pResPos = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 1;
uint64_t groupId = pSDataBlock->info.id.groupId;
SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo);
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
int32_t numOfOutput = pSup->numOfExprs;
int32_t step = 1;
SRowBuffPos* pResPos = NULL;
SResultRow* pResult = NULL;
int32_t forwardRows = 1;
uint64_t groupId = pSDataBlock->info.id.groupId;
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
TSKEY* tsCol = (int64_t*)pColDataInfo->pData;
@ -4160,6 +4164,27 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
STimeWindow nextWin = getFinalTimeWindow(ts, &pInfo->interval);
while (1) {
SWinKey key = {
.ts = nextWin.skey,
.groupId = groupId,
};
void* chIds = taosHashGet(pInfo->pPullDataMap, &key, sizeof(SWinKey));
int32_t index = -1;
SArray* chArray = NULL;
int32_t chId = 0;
if (chIds) {
chArray = *(void**)chIds;
chId = getChildIndex(pSDataBlock);
index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ);
}
if (!(index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA)) {
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCol, startPos);
if (startPos < 0) {
break;
}
continue;
}
if (!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) {
startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCol, startPos, nextWin.ekey, &nextWin);
if (startPos < 0) {
@ -4175,10 +4200,6 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY);
}
SWinKey key = {
.ts = pResult->win.skey,
.groupId = groupId,
};
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) {
saveWinResult(&key, pResPos, pUpdatedMap);
}
@ -4215,6 +4236,18 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS
}
}
static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t numOfChild) {
int32_t size = taosArrayGetSize(wins);
for (int32_t i = 0; i < size; i++) {
SWinKey* winKey = taosArrayGet(wins, i);
void* chIds = taosHashGet(pMidPullMap, winKey, sizeof(SWinKey));
if (!chIds) {
addPullWindow(pMidPullMap, winKey, numOfChild);
qDebug("===stream===prepare mid operator retrive for delete %" PRId64 ", size:%d", winKey->ts, numOfChild);
}
}
}
static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4300,15 +4333,22 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pDelRes;
}
break;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_PULL_OVER) {
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
return pBlock;
} else if (pBlock->info.type == STREAM_PULL_OVER) {
processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins,
pInfo->numOfChild, pOperator);
return pBlock;
} else if (pBlock->info.type == STREAM_CHECKPOINT) {
pAPI->stateStore.streamStateCommit(pInfo->pState);
doStreamIntervalSaveCheckpoint(pOperator);
copyDataBlock(pInfo->pCheckpointRes, pBlock);
continue;
} else if (pBlock->info.type == STREAM_RETRIEVE) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap);
} else if (pBlock->info.type == STREAM_MID_RETRIEVE) {
SArray* delWins = taosArrayInit(8, sizeof(SWinKey));
doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap);
addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild);
taosArrayDestroy(delWins);
continue;
} else {
ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");