fix(stream): add check for checkpointId in retrieve-checkpoint id msg.

This commit is contained in:
Haojun Liao 2024-08-04 11:37:23 +08:00
parent 02b59d0b33
commit 170a074de8
1 changed files with 6 additions and 1 deletions

View File

@ -989,7 +989,12 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int64_t checkpointId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
ASSERT(checkpointId == pReq->checkpointId);
if (checkpointId != pReq->checkpointId) {
tqError("s-task:%s invalid checkpoint-trigger retrieve msg from %x, current checkpointId:%"PRId64" req:%"PRId64,
pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_INVALID_MSG;
}
if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
// re-send the lost checkpoint-trigger msg to downstream task