refactor: adjust the lock procedure for unregister stream tasks.

This commit is contained in:
Haojun Liao 2024-12-06 09:20:39 +08:00
parent 406e877700
commit 43ad35f911
4 changed files with 2 additions and 16 deletions

View File

@ -254,11 +254,12 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask
} }
static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) {
streamMetaWLock(pMeta);
int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId); int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId);
if (code != 0) { if (code != 0) {
smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code)); smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code));
} }
streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
if (streamMetaCommit(pMeta) < 0) { if (streamMetaCommit(pMeta) < 0) {
// persist to disk // persist to disk

View File

@ -718,8 +718,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
} }
} }
streamMetaWUnLock(pMeta);
// drop the related fill-history task firstly // drop the related fill-history task firstly
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
@ -737,7 +735,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
} }
// commit the update // commit the update
streamMetaWLock(pMeta);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks);

View File

@ -591,7 +591,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
{ // destroy the related fill-history tasks { // destroy the related fill-history tasks
// drop task should not in the meta-lock, and drop the related fill-history task now // drop task should not in the meta-lock, and drop the related fill-history task now
streamMetaWUnLock(pMeta);
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
@ -599,7 +598,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
id, vgId, pReq->taskId, numOfTasks); id, vgId, pReq->taskId, numOfTasks);
} }
streamMetaWLock(pMeta);
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
code = streamMetaCommit(pMeta); code = streamMetaCommit(pMeta);
} }
@ -675,8 +673,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
streamMetaWUnLock(pMeta);
// drop task should not in the meta-lock, and drop the related fill-history task now // drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
@ -685,9 +681,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
(int32_t)pReq->hTaskId, numOfTasks); (int32_t)pReq->hTaskId, numOfTasks);
} }
streamMetaWLock(pMeta);
code = streamMetaCommit(pMeta); code = streamMetaCommit(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -501,8 +501,6 @@ _err:
void streamMetaInitBackend(SStreamMeta* pMeta) { void streamMetaInitBackend(SStreamMeta* pMeta) {
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
if (pMeta->streamBackend == NULL) { if (pMeta->streamBackend == NULL) {
streamMetaWUnLock(pMeta);
while (1) { while (1) {
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId);
@ -908,8 +906,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
int32_t code = 0; int32_t code = 0;
STaskId id = {.streamId = streamId, .taskId = taskId}; STaskId id = {.streamId = streamId, .taskId = taskId};
streamMetaWLock(pMeta);
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) { if (code == 0) {
// desc the paused task counter // desc the paused task counter
@ -958,10 +954,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
} }
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
streamMetaWUnLock(pMeta);
} else { } else {
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId); stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
streamMetaWUnLock(pMeta);
} }
return 0; return 0;