Merge pull request #23642 from taosdata/fix/liaohj
fix(stream): set the correct updated nodeId.
This commit is contained in:
commit
3ed5e640cb
|
@ -461,7 +461,7 @@ typedef struct STaskStartInfo {
|
||||||
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
||||||
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
||||||
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
||||||
int32_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
} STaskStartInfo;
|
} STaskStartInfo;
|
||||||
|
|
||||||
typedef struct STaskUpdateInfo {
|
typedef struct STaskUpdateInfo {
|
||||||
|
|
|
@ -247,7 +247,7 @@ int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsStreamCheckpointInterval = 60;
|
int32_t tsStreamCheckpointInterval = 60;
|
||||||
float tsSinkDataRate = 2.0;
|
float tsSinkDataRate = 2.0;
|
||||||
int32_t tsStreamNodeCheckInterval = 30;
|
int32_t tsStreamNodeCheckInterval = 15;
|
||||||
int32_t tsTtlUnit = 86400;
|
int32_t tsTtlUnit = 86400;
|
||||||
int32_t tsTtlPushIntervalSec = 10;
|
int32_t tsTtlPushIntervalSec = 10;
|
||||||
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
|
int32_t tsTrimVDbIntervalSec = 60 * 60; // interval of trimming db in all vgroups
|
||||||
|
|
|
@ -43,7 +43,7 @@ typedef struct SNodeEntry {
|
||||||
} SNodeEntry;
|
} SNodeEntry;
|
||||||
|
|
||||||
typedef struct SStreamExecInfo {
|
typedef struct SStreamExecInfo {
|
||||||
SArray * pNodeEntryList;
|
SArray *pNodeList;
|
||||||
int64_t ts; // snapshot ts
|
int64_t ts; // snapshot ts
|
||||||
int64_t activeCheckpoint; // active check point id
|
int64_t activeCheckpoint; // active check point id
|
||||||
SHashObj * pTaskMap;
|
SHashObj * pTaskMap;
|
||||||
|
@ -850,7 +850,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
mDebug("register to stream task node list");
|
mDebug("stream tasks register into node list");
|
||||||
keepStreamTasksInBuf(&streamObj, &execInfo);
|
keepStreamTasksInBuf(&streamObj, &execInfo);
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
|
@ -1141,6 +1141,15 @@ static const char *mndGetStreamDB(SMnode *pMnode) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t initStreamNodeList(SMnode* pMnode) {
|
||||||
|
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
|
||||||
|
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
|
||||||
|
execInfo.pNodeList = extractNodeListFromStream(pMnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
return taosArrayGetSize(execInfo.pNodeList);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
SMnode * pMnode = pReq->info.node;
|
SMnode * pMnode = pReq->info.node;
|
||||||
SSdb * pSdb = pMnode->pSdb;
|
SSdb * pSdb = pMnode->pSdb;
|
||||||
|
@ -1151,22 +1160,18 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
{ // check if the node update happens or not
|
{ // check if the node update happens or not
|
||||||
int64_t ts = taosGetTimestampSec();
|
int64_t ts = taosGetTimestampSec();
|
||||||
|
|
||||||
if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) {
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
if (execInfo.pNodeEntryList != NULL) {
|
int32_t numOfNodes = initStreamNodeList(pMnode);
|
||||||
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
}
|
|
||||||
|
|
||||||
execInfo.pNodeEntryList = extractNodeListFromStream(pMnode);
|
if (numOfNodes == 0) {
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) {
|
|
||||||
mDebug("stream task node change checking done, no vgroups exist, do nothing");
|
mDebug("stream task node change checking done, no vgroups exist, do nothing");
|
||||||
execInfo.ts = ts;
|
execInfo.ts = ts;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
|
for(int32_t i = 0; i < numOfNodes; ++i) {
|
||||||
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
|
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
|
||||||
if (pNodeEntry->stageUpdated) {
|
if (pNodeEntry->stageUpdated) {
|
||||||
mDebug("stream task not ready due to node update detected, checkpoint not issued");
|
mDebug("stream task not ready due to node update detected, checkpoint not issued");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1180,7 +1185,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
||||||
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
||||||
taosHashCleanup(changeInfo.pDBMap);
|
taosHashCleanup(changeInfo.pDBMap);
|
||||||
|
@ -2095,20 +2100,21 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
SNodeEntry entry = {0};
|
SNodeEntry entry = {.nodeId = pVgroup->vgId, .hbTimestamp = pVgroup->updateTime};
|
||||||
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
entry.epset = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
entry.nodeId = pVgroup->vgId;
|
|
||||||
entry.hbTimestamp = pVgroup->updateTime;
|
|
||||||
|
|
||||||
|
// if not all ready till now, no need to check the remaining vgroups.
|
||||||
if (*allReady) {
|
if (*allReady) {
|
||||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
if (!pVgroup->vnodeGid[i].syncRestore) {
|
if (!pVgroup->vnodeGid[i].syncRestore) {
|
||||||
|
mInfo("vgId:%d not restored, not ready for checkpoint or other operations", pVgroup->vgId);
|
||||||
*allReady = false;
|
*allReady = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
ESyncState state = pVgroup->vnodeGid[i].syncState;
|
ESyncState state = pVgroup->vnodeGid[i].syncState;
|
||||||
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
|
if (state == TAOS_SYNC_STATE_OFFLINE || state == TAOS_SYNC_STATE_ERROR) {
|
||||||
|
mInfo("vgId:%d offline/err, not ready for checkpoint or other operations", pVgroup->vgId);
|
||||||
*allReady = false;
|
*allReady = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2314,9 +2320,9 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
||||||
(int32_t)taosArrayGetSize(execInfo.pTaskList));
|
(int32_t)taosArrayGetSize(execInfo.pTaskList));
|
||||||
|
|
||||||
int32_t size = taosArrayGetSize(pNodeSnapshot);
|
int32_t size = taosArrayGetSize(pNodeSnapshot);
|
||||||
SArray *pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
|
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeEntryList); ++i) {
|
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
|
||||||
SNodeEntry *p = taosArrayGet(execInfo.pNodeEntryList, i);
|
SNodeEntry* p = taosArrayGet(execInfo.pNodeList, i);
|
||||||
|
|
||||||
for (int32_t j = 0; j < size; ++j) {
|
for (int32_t j = 0; j < size; ++j) {
|
||||||
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
|
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
|
||||||
|
@ -2327,8 +2333,8 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
|
taosArrayDestroy(execInfo.pNodeList);
|
||||||
execInfo.pNodeEntryList = pValidNodeEntryList;
|
execInfo.pNodeList = pValidNodeEntryList;
|
||||||
|
|
||||||
mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList));
|
mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList));
|
||||||
taosArrayDestroy(pRemovedTasks);
|
taosArrayDestroy(pRemovedTasks);
|
||||||
|
@ -2338,6 +2344,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
|
||||||
// this function runs by only one thread, so it is not multi-thread safe
|
// this function runs by only one thread, so it is not multi-thread safe
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
|
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
|
||||||
if (old != 0) {
|
if (old != 0) {
|
||||||
mDebug("still in checking node change");
|
mDebug("still in checking node change");
|
||||||
|
@ -2348,23 +2355,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
int64_t ts = taosGetTimestampSec();
|
int64_t ts = taosGetTimestampSec();
|
||||||
|
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
if (execInfo.pNodeEntryList == NULL || (taosArrayGetSize(execInfo.pNodeEntryList) == 0)) {
|
|
||||||
if (execInfo.pNodeEntryList != NULL) {
|
|
||||||
execInfo.pNodeEntryList = taosArrayDestroy(execInfo.pNodeEntryList);
|
|
||||||
}
|
|
||||||
execInfo.pNodeEntryList = extractNodeListFromStream(pMnode);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (taosArrayGetSize(execInfo.pNodeEntryList) == 0) {
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
int32_t numOfNodes = initStreamNodeList(pMnode);
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
|
if (numOfNodes == 0) {
|
||||||
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
|
mDebug("end to do stream task node change checking, no vgroup exists, do nothing");
|
||||||
execInfo.ts = ts;
|
execInfo.ts = ts;
|
||||||
atomic_store_32(&mndNodeCheckSentinel, 0);
|
atomic_store_32(&mndNodeCheckSentinel, 0);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool allVnodeReady = true;
|
bool allVgroupsReady = true;
|
||||||
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVnodeReady);
|
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allVgroupsReady);
|
||||||
if (!allVnodeReady) {
|
if (!allVgroupsReady) {
|
||||||
taosArrayDestroy(pNodeSnapshot);
|
taosArrayDestroy(pNodeSnapshot);
|
||||||
atomic_store_32(&mndNodeCheckSentinel, 0);
|
atomic_store_32(&mndNodeCheckSentinel, 0);
|
||||||
mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
|
mWarn("not all vnodes are ready, ignore the exec nodeUpdate check");
|
||||||
|
@ -2374,7 +2379,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
removeExpirednodeEntryAndTask(pNodeSnapshot);
|
removeExpirednodeEntryAndTask(pNodeSnapshot);
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeEntryList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
||||||
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
||||||
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
||||||
doKillActiveCheckpointTrans(pMnode);
|
doKillActiveCheckpointTrans(pMnode);
|
||||||
|
@ -2383,8 +2388,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
// keep the new vnode snapshot
|
// keep the new vnode snapshot
|
||||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mDebug("create trans successfully, update cached node list");
|
mDebug("create trans successfully, update cached node list");
|
||||||
taosArrayDestroy(execInfo.pNodeEntryList);
|
taosArrayDestroy(execInfo.pNodeList);
|
||||||
execInfo.pNodeEntryList = pNodeSnapshot;
|
execInfo.pNodeList = pNodeSnapshot;
|
||||||
execInfo.ts = ts;
|
execInfo.ts = ts;
|
||||||
} else {
|
} else {
|
||||||
mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code));
|
mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code));
|
||||||
|
@ -2623,16 +2628,18 @@ int32_t mndResetFromCheckpoint(SMnode *pMnode) {
|
||||||
|
|
||||||
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
||||||
int32_t num = taosArrayGetSize(pNodeList);
|
int32_t num = taosArrayGetSize(pNodeList);
|
||||||
|
mInfo("set node expired for %d nodes", num);
|
||||||
|
|
||||||
for (int k = 0; k < num; ++k) {
|
for (int k = 0; k < num; ++k) {
|
||||||
int32_t *pVgId = taosArrayGet(pNodeList, k);
|
int32_t* pVgId = taosArrayGet(pNodeList, k);
|
||||||
|
mInfo("set node expired for nodeId:%d, total:%d", *pVgId, num);
|
||||||
|
|
||||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
|
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
|
||||||
for (int i = 0; i < numOfNodes; ++i) {
|
for (int i = 0; i < numOfNodes; ++i) {
|
||||||
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, i);
|
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i);
|
||||||
|
|
||||||
if (pNodeEntry->nodeId == *pVgId) {
|
if (pNodeEntry->nodeId == *pVgId) {
|
||||||
mInfo("vgId:%d expired in stream task, needs update nodeEp", *pVgId);
|
mInfo("vgId:%d expired for some stream tasks, needs update nodeEp", *pVgId);
|
||||||
pNodeEntry->stageUpdated = true;
|
pNodeEntry->stageUpdated = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2642,10 +2649,10 @@ int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int32_t stage) {
|
static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
|
||||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeEntryList);
|
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
|
||||||
for (int32_t j = 0; j < numOfNodes; ++j) {
|
for(int32_t j = 0; j < numOfNodes; ++j) {
|
||||||
SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeEntryList, j);
|
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
|
||||||
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
|
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
|
||||||
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
|
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
|
||||||
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
|
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
|
||||||
|
@ -2677,12 +2684,20 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
|
||||||
|
|
||||||
taosThreadMutexLock(&execInfo.lock);
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
|
// extract stream task list
|
||||||
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
|
int32_t numOfExisted = taosHashGetSize(execInfo.pTaskMap);
|
||||||
if (numOfExisted == 0) {
|
if (numOfExisted == 0) {
|
||||||
doExtractTasksFromStream(pMnode);
|
doExtractTasksFromStream(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
initStreamNodeList(pMnode);
|
||||||
|
|
||||||
|
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
||||||
|
if (numOfUpdated > 0) {
|
||||||
|
mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
||||||
|
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
for (int32_t i = 0; i < req.numOfTasks; ++i) {
|
||||||
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
STaskStatusEntry *p = taosArrayGet(req.pTaskStatus, i);
|
||||||
|
|
|
@ -854,6 +854,37 @@ static void clearHbMsg(SStreamHbMsg* pMsg, SArray* pIdList) {
|
||||||
taosArrayDestroy(pIdList);
|
taosArrayDestroy(pIdList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
|
||||||
|
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
|
||||||
|
for (int k = 0; k < numOfExisted; ++k) {
|
||||||
|
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
|
||||||
|
SStreamMeta* pMeta = pTask->pMeta;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
||||||
|
int32_t num = taosArrayGetSize(pTask->outputInfo.pDownstreamUpdateList);
|
||||||
|
for (int j = 0; j < num; ++j) {
|
||||||
|
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pDownstreamUpdateList, j);
|
||||||
|
|
||||||
|
bool exist = existInHbMsg(pMsg, pTaskEpset);
|
||||||
|
if (!exist) {
|
||||||
|
taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
|
||||||
|
stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
|
||||||
|
(int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayClear(pTask->outputInfo.pDownstreamUpdateList);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
}
|
||||||
|
|
||||||
void metaHbToMnode(void* param, void* tmrId) {
|
void metaHbToMnode(void* param, void* tmrId) {
|
||||||
int64_t rid = *(int64_t*)param;
|
int64_t rid = *(int64_t*)param;
|
||||||
|
|
||||||
|
@ -949,28 +980,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
|
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verStart, &entry.verEnd);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&(*pTask)->lock);
|
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
|
||||||
int32_t num = taosArrayGetSize((*pTask)->outputInfo.pDownstreamUpdateList);
|
|
||||||
for (int j = 0; j < num; ++j) {
|
|
||||||
int32_t* pNodeId = taosArrayGet((*pTask)->outputInfo.pDownstreamUpdateList, j);
|
|
||||||
|
|
||||||
bool exist = false;
|
|
||||||
int32_t numOfExisted = taosArrayGetSize(hbMsg.pUpdateNodes);
|
|
||||||
for (int k = 0; k < numOfExisted; ++k) {
|
|
||||||
if (*pNodeId == *(int32_t*)taosArrayGet(hbMsg.pUpdateNodes, k)) {
|
|
||||||
exist = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!exist) {
|
|
||||||
taosArrayPush(hbMsg.pUpdateNodes, pNodeId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayClear((*pTask)->outputInfo.pDownstreamUpdateList);
|
|
||||||
taosThreadMutexUnlock(&(*pTask)->lock);
|
|
||||||
|
|
||||||
taosArrayPush(hbMsg.pTaskStatus, &entry);
|
taosArrayPush(hbMsg.pTaskStatus, &entry);
|
||||||
if (!hasMnodeEpset) {
|
if (!hasMnodeEpset) {
|
||||||
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
|
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
|
||||||
|
@ -1010,7 +1020,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
|
|
||||||
pMeta->pHbInfo->hbCount += 1;
|
pMeta->pHbInfo->hbCount += 1;
|
||||||
|
|
||||||
stDebug("vgId:%d, build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
|
stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
|
||||||
pMeta->pHbInfo->hbCount);
|
pMeta->pHbInfo->hbCount);
|
||||||
tmsgSendReq(&epset, &msg);
|
tmsgSendReq(&epset, &msg);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -1083,7 +1083,7 @@ int32_t streamMetaUpdateTaskDownstreamStatus(SStreamTask* pTask, int64_t startTs
|
||||||
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
|
||||||
|
|
||||||
if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
|
if (taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet) == numOfTotal) {
|
||||||
pStartInfo->readyTs = pTask->execInfo.start;
|
pStartInfo->readyTs = taosGetTimestampMs();
|
||||||
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
pStartInfo->elapsedTime = (pStartInfo->startTs != 0) ? pStartInfo->readyTs - pStartInfo->startTs : 0;
|
||||||
|
|
||||||
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
|
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:%s level:%d, startTs:%" PRId64
|
||||||
|
|
Loading…
Reference in New Issue