refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-02-21 09:42:23 +08:00
parent 8d848a5bf0
commit d6f5ae226b
2 changed files with 4 additions and 6 deletions

View File

@ -612,9 +612,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaWLock(pMeta);
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, false);
streamMetaWUnLock(pMeta);
streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true);
// drop the stream task now
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);

View File

@ -763,11 +763,11 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
SStreamMeta* pMeta = pTask->pMeta;
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
if (pTask->info.fillHistory == 0) {
return 0;
return TSDB_CODE_SUCCESS;
}
if (metaLock) {
streamMetaWLock(pTask->pMeta);
streamMetaWLock(pMeta);
}
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
@ -788,7 +788,7 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool
}
if (metaLock) {
streamMetaWUnLock(pTask->pMeta);
streamMetaWUnLock(pMeta);
}
return TSDB_CODE_SUCCESS;