fix(stream): add fill-history task id in msg.

This commit is contained in:
Haojun Liao 2024-06-12 15:27:19 +08:00
parent dc2497791a
commit d06e1549f1
4 changed files with 8 additions and 8 deletions

View File

@ -3472,6 +3472,8 @@ typedef struct SVUpdateCheckpointInfoReq {
int64_t checkpointVer;
int64_t checkpointTs;
int32_t transId;
int64_t hStreamId; // add encode/decode
int64_t hTaskId;
int8_t dropRelHTask;
} SVUpdateCheckpointInfoReq;

View File

@ -716,6 +716,8 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
pReq->checkpointTs = pInfo->ts;
pReq->dropRelHTask = pInfo->dropHTask;
pReq->transId = pInfo->transId;
pReq->hStreamId = pTask->hTaskInfo.id.streamId;
pReq->hTaskId = pTask->hTaskInfo.id.taskId;
}
}

View File

@ -646,8 +646,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
}
streamMetaWUnLock(pMeta);
// tqStreamRemoveTaskBackend(pMeta, &id);
return 0;
}

View File

@ -418,7 +418,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
int32_t code = 0;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
STaskId hTaskId = {0};
taosThreadMutexLock(&pTask->lock);
@ -434,7 +433,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
// drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId);
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks);
@ -476,9 +475,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
}
if (pReq->dropRelHTask) {
hTaskId = pTask->hTaskInfo.id;
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
pReq->taskId, vgId, hTaskId.taskId);
pReq->taskId, vgId, pReq->hTaskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
}
@ -499,10 +497,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
// drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
(int32_t)hTaskId.taskId, numOfTasks);
(int32_t)pReq->hTaskId, numOfTasks);
}
streamMetaWLock(pMeta);