refactor:

This commit is contained in:
Haojun Liao 2023-11-10 15:30:21 +08:00
parent 0b36081158
commit d0307e5876
3 changed files with 41 additions and 28 deletions

View File

@ -2610,16 +2610,18 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
for (int k = 0; k < num; ++k) {
int32_t* pVgId = taosArrayGet(pNodeList, k);
mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
for (int i = 0; i < numOfNodes; ++i) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
if (pNodeEntry->nodeId == *pVgId) {
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
pNodeEntry->stageUpdated = true;
break;
}
@ -2670,8 +2672,11 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
doExtractTasksFromStream(pMnode);
}
mDebug("%d stream nodes needs updated", (int32_t) taosArrayGetSize(req.pUpdateNodes));
setNodeEpsetExpiredFlag(req.pUpdateNodes);
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) {
mDebug("%d stream nodes needs updated from tasks' report", (int32_t)taosArrayGetSize(req.pUpdateNodes));
setNodeEpsetExpiredFlag(req.pUpdateNodes);
}
for (int32_t i = 0; i < req.numOfTasks; ++i) {
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);

View File

@ -854,6 +854,37 @@ static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
taosArrayDestroy(pIdList);
}
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) {
return true;
}
}
return false;
}
static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
SStreamMeta* pMeta = pTask->pMeta;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
for (int j = 0; j < num; ++j) {
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j);
bool exist = existInHbMsg(pMsg, pTaskEpset);
if (!exist) {
taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
(int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
}
}
taosArrayClear(pTask->outputInfo.pDownstreamUpdateList);
taosThreadMutexUnlock(&pTask->lock);
}
void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
@ -949,30 +980,7 @@ void metaHbToMnode(void* param, void* tmrId) {
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
}
taosThreadMutexLock(&(*pTask)->lock);
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
for (int j = 0; j < num; ++j) {
SDownstreamTaskEpset* pTaskEpset = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
bool exist = false;
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
exist = true;
break;
}
}
if (!exist) {
taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId);
stDebug("vgId:%d nodeId:%d added into the update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
(int32_t)taosArrayGetSize(hbMsg.pUpdateNodes));
}
}
taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList);
taosThreadMutexUnlock(&(*pTask)->lock);
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);

View File

@ -1083,7 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
pStartInfo->readyTs = pTask->execInfo.start;
pStartInfo->readyTs = taosGetTimestampMs();
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64