fix(stream): handle disorder node Update trans Id.
This commit is contained in:
parent
59a3e8ca40
commit
77961ea791
|
@ -271,7 +271,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch
|
||||||
int32_t tsTransPullupInterval = 2;
|
int32_t tsTransPullupInterval = 2;
|
||||||
int32_t tsCompactPullupInterval = 10;
|
int32_t tsCompactPullupInterval = 10;
|
||||||
int32_t tsMqRebalanceInterval = 2;
|
int32_t tsMqRebalanceInterval = 2;
|
||||||
int32_t tsStreamCheckpointInterval = 60;
|
int32_t tsStreamCheckpointInterval = 300;
|
||||||
float tsSinkDataRate = 2.0;
|
float tsSinkDataRate = 2.0;
|
||||||
int32_t tsStreamNodeCheckInterval = 16;
|
int32_t tsStreamNodeCheckInterval = 16;
|
||||||
int32_t tsTtlUnit = 86400;
|
int32_t tsTtlUnit = 86400;
|
||||||
|
|
|
@ -195,13 +195,22 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
const char* idstr = pTask->id.idStr;
|
const char* idstr = pTask->id.idStr;
|
||||||
|
|
||||||
if (pMeta->updateInfo.transId != req.transId) {
|
if (pMeta->updateInfo.transId != req.transId) {
|
||||||
ASSERT(req.transId > pMeta->updateInfo.transId);
|
if (req.transId < pMeta->updateInfo.transId) {
|
||||||
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
|
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
|
||||||
vgId, req.transId, pMeta->updateInfo.transId);
|
pMeta->updateInfo.transId, req.transId);
|
||||||
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
// info needs to be kept till the new trans to update the nodeEp arrived.
|
taosArrayDestroy(req.pNodeList);
|
||||||
taosHashClear(pMeta->updateInfo.pTasks);
|
return rsp.code;
|
||||||
pMeta->updateInfo.transId = req.transId;
|
} else {
|
||||||
|
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
|
||||||
|
vgId, req.transId, pMeta->updateInfo.transId);
|
||||||
|
|
||||||
|
// info needs to be kept till the new trans to update the nodeEp arrived.
|
||||||
|
taosHashClear(pMeta->updateInfo.pTasks);
|
||||||
|
pMeta->updateInfo.transId = req.transId;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
|
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -432,7 +432,6 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
STaskId* pId = &pTask->hTaskInfo.id;
|
STaskId* pId = &pTask->hTaskInfo.id;
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||||
ASSERT(left > 0);
|
ASSERT(left > 0);
|
||||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||||
|
|
Loading…
Reference in New Issue