refactor: do some refactor.

This commit is contained in:
Haojun Liao 2023-08-08 09:11:41 +08:00
parent f8ee055641
commit af3a87560a
3 changed files with 31 additions and 30 deletions

View File

@ -623,6 +623,7 @@ int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir); int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskStop(SStreamTask* pTask); int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId); SRpcHandleInfo* pRpcInfo, int32_t taskId);

View File

@ -1845,35 +1845,6 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, int3
return mndPersistTransLog(pStream, pTrans); return mndPersistTransLog(pStream, pTrans);
} }
static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEpSet) {
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) {
pTask->info.epSet = *pEpSet;
continue;
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
}
}
}
return 0;
}
// todo: this process should be executed by the write queue worker of the mnode // todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
@ -1944,7 +1915,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// update the related upstream and downstream tasks, todo remove this, no need this function // update the related upstream and downstream tasks, todo remove this, no need this function
taosWLockLatch(&pStream->lock); taosWLockLatch(&pStream->lock);
updateTaskEpInfo(pStream, req.vgId, &req.epset); streamTaskUpdateEpInfo(pStream->tasks, req.vgId, &req.epset);
streamTaskUpdateEpInfo(pStream->pHTasksList, req.vgId, &req.epset);
taosWUnLockLatch(&pStream->lock); taosWUnLockLatch(&pStream->lock);
code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet); code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet);

View File

@ -481,3 +481,31 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
return 0; return 0;
} }
int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) {
int32_t numOfLevels = taosArrayGetSize(pTaskList);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pTaskList, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) {
pTask->info.epSet = *pEpSet;
continue;
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
}
}
}
return 0;
}