fix(stream): fix invalid read.

This commit is contained in:
Haojun Liao 2023-09-17 13:59:06 +08:00
parent 0459a4628c
commit 9d6199adf7
1 changed files with 10 additions and 10 deletions

View File

@ -533,6 +533,16 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
taosWLockLatch(&pMeta->lock);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
// it is an fill-history task, remove the related stream task's id that points to it
if ((*ppTask)->info.fillHistory == 1) {
STaskId streamTaskId = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &streamTaskId, sizeof(streamTaskId));
if (ppStreamTask != NULL) {
(*ppStreamTask)->historyTaskId.taskId = 0;
(*ppStreamTask)->historyTaskId.streamId = 0;
}
}
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__DROPPING);
@ -546,16 +556,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
streamMetaReleaseTask(pMeta, pTask);
}
// it is an fill-history task, remove the related stream task's id that points to it
if ((*ppTask)->info.fillHistory == 1) {
STaskId id1 = {.streamId = (*ppTask)->streamTaskId.streamId, .taskId = (*ppTask)->streamTaskId.taskId};
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id1, sizeof(id1));
if (ppStreamTask != NULL) {
(*ppStreamTask)->historyTaskId.taskId = 0;
(*ppStreamTask)->historyTaskId.streamId = 0;
}
}
streamMetaRemoveTask(pMeta, &id);
streamMetaReleaseTask(pMeta, pTask);
} else {