refactor(stream): track the msgId for each upstream tasks.
This commit is contained in:
parent
3223cc0287
commit
737138856c
|
@ -41,6 +41,7 @@ typedef struct SStreamUpstreamEpInfo {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
|
bool dataAllowed; // denote if the data from this upstream task is allowed to put into inputQ, not serialize it
|
||||||
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
|
int64_t stage; // upstream task stage value, to denote if the upstream node has restart/replica changed/transfer
|
||||||
|
int64_t lastMsgId;
|
||||||
} SStreamUpstreamEpInfo;
|
} SStreamUpstreamEpInfo;
|
||||||
|
|
||||||
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo);
|
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo);
|
||||||
|
|
|
@ -1842,6 +1842,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s lastMsgId:%"PRId64 " for upstream taskId:0x%x(vgId:%d)", id, pInfo->lastMsgId, pReq->upstreamTaskId,
|
||||||
|
pReq->upstreamNodeId);
|
||||||
|
|
||||||
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
if (pMeta->role == NODE_ROLE_FOLLOWER) {
|
||||||
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
|
stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
|
||||||
status = TASK_INPUT_STATUS__REFUSED;
|
status = TASK_INPUT_STATUS__REFUSED;
|
||||||
|
@ -1866,7 +1869,14 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
|
stDebug("s-task:%s recv trans-state msgId:%d from upstream:0x%x", id, pReq->msgId, pReq->upstreamTaskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
status = streamTaskAppendInputBlocks(pTask, pReq);
|
if (pReq->msgId > pInfo->lastMsgId) {
|
||||||
|
status = streamTaskAppendInputBlocks(pTask, pReq);
|
||||||
|
pInfo->lastMsgId = pReq->msgId;
|
||||||
|
} else {
|
||||||
|
stWarn("s-task:%s duplicate msgId:%d from upstream:0x%x, from vgId:%d already recv msgId:%" PRId64, id,
|
||||||
|
pReq->msgId, pReq->upstreamTaskId, pReq->srcVgId, pInfo->lastMsgId);
|
||||||
|
status = TASK_INPUT_STATUS__NORMAL; // still return success
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -100,6 +100,7 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
|
||||||
pEpInfo->nodeId = pTask->info.nodeId;
|
pEpInfo->nodeId = pTask->info.nodeId;
|
||||||
pEpInfo->taskId = pTask->id.taskId;
|
pEpInfo->taskId = pTask->id.taskId;
|
||||||
pEpInfo->stage = -1;
|
pEpInfo->stage = -1;
|
||||||
|
pEpInfo->lastMsgId = -1;
|
||||||
|
|
||||||
return pEpInfo;
|
return pEpInfo;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue