fix(stream): set initial update ep transId.
This commit is contained in:
parent
025437df0c
commit
f6e32529fc
|
@ -190,7 +190,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
SStreamTask* pTask = *ppTask;
|
SStreamTask* pTask = *ppTask;
|
||||||
const char* idstr = pTask->id.idStr;
|
const char* idstr = pTask->id.idStr;
|
||||||
|
|
||||||
if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) {
|
if (pMeta->updateInfo.transId == -1) { // info needs to be kept till the new trans to update the nodeEp arrived.
|
||||||
|
streamMetaInitUpdateTaskList(pMeta, req.transId);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMeta->updateInfo.transId != req.transId) {
|
||||||
if (req.transId < pMeta->updateInfo.transId) {
|
if (req.transId < pMeta->updateInfo.transId) {
|
||||||
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
|
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
|
||||||
pMeta->updateInfo.transId, req.transId);
|
pMeta->updateInfo.transId, req.transId);
|
||||||
|
@ -206,7 +210,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
streamMetaInitUpdateTaskList(pMeta, req.transId);
|
streamMetaInitUpdateTaskList(pMeta, req.transId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
|
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d, recorded update transId:%d", idstr,
|
||||||
|
vgId, req.transId, pMeta->updateInfo.transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// duplicate update epset msg received, discard this redundant message
|
// duplicate update epset msg received, discard this redundant message
|
||||||
|
@ -1161,7 +1166,9 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
|
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
|
||||||
pTask->chkInfo.checkpointId, req.checkpointId);
|
pTask->chkInfo.checkpointId, req.checkpointId);
|
||||||
pTask->chkInfo.checkpointId = req.checkpointId;
|
pTask->chkInfo.checkpointId = req.checkpointId;
|
||||||
|
tqSetRestoreVersionInfo(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
|
|
Loading…
Reference in New Issue