fix(stream): set the correct updated nodeId.
This commit is contained in:
parent
233777b6ac
commit
1f9a58361d
|
@ -952,19 +952,20 @@ void metaHbToMnode(void* param, void* tmrId) {
|
|||
taosThreadMutexLock(&(*pTask)->lock);
|
||||
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
|
||||
for (int j = 0; j < num; ++j) {
|
||||
int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
|
||||
SDownstreamTaskEpset* pTaskEpset = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
|
||||
|
||||
bool exist = false;
|
||||
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
|
||||
for (int k = 0; k < numOfExisted; ++k) {
|
||||
if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
|
||||
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
|
||||
exist = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!exist) {
|
||||
taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
|
||||
taosArrayPush(hbMsg.pUpdateNodes, &pTaskEpset->nodeId);
|
||||
stDebug("vgId:%d nodeId:%d added into the update list", pMeta->vgId, pTaskEpset->nodeId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue