enh(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-08-01 19:22:59 +08:00
parent ad4c5916e7
commit c016a58623
1 changed files with 4 additions and 27 deletions

View File

@ -41,7 +41,7 @@ typedef struct SNodeEntry {
} SNodeEntry;
typedef struct SStreamVnodeRevertIndex {
SHashObj* pVnodeMap;
// SHashObj* pVnodeMap;
SArray* pNodeEntryList;
} SStreamVnodeRevertIndex;
@ -1787,7 +1787,7 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3
mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid);
ASSERT(0);
// mndTransSetDbName(pTrans, "stream-task-update", "checkpoint");
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to build stream:0x%" PRIx64 " task DAG update, code:%s", pStream->uid,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
@ -1938,31 +1938,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// update the related upstream and downstream tasks
taosWLockLatch(&pStream->lock);
int32_t numOfLevels = taosArrayGetSize(pStream->tasks);
for (int32_t j = 0; j < numOfLevels; ++j) {
SArray *pLevel = taosArrayGetP(pStream->tasks, j);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
if (pTask->info.nodeId == nodeId) {
pTask->info.epSet = newEpSet;
continue;
}
// check for the dispath info and the upstream task info
int32_t level = pTask->info.taskLevel;
if (level == TASK_LEVEL__SOURCE) {
streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet);
} else if (level == TASK_LEVEL__AGG) {
streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet);
streamTaskUpdateDownstreamInfo(pTask, nodeId, &newEpSet);
} else { // TASK_LEVEL__SINK
streamTaskUpdateUpstreamInfo(pTask, nodeId, &newEpSet);
}
}
}
updateTaskEpInfo(pStream, req.vgId, &req.epset);
// write down
code = createStreamUpdateTrans(pMnode, pStream, nodeId, &newEpSet);
if (code != TSDB_CODE_SUCCESS) {