fix(stream): fix deadlock.

This commit is contained in:
Haojun Liao 2023-12-22 14:51:04 +08:00
parent 90898a16e0
commit e91afa7df2
3 changed files with 17 additions and 12 deletions

View File

@ -845,6 +845,7 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pKey);
int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded);
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta);

View File

@ -123,6 +123,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return 0;
}
// todo: createMsg to invoke this function in stream threads, to avoid blocking the syn thread
int32_t tqStopStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode);
@ -142,7 +143,7 @@ int32_t tqStopStreamTasks(STQ* pTq) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
SStreamTask* pTask = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) {
continue;
}

View File

@ -607,22 +607,23 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
return (int32_t)size;
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
streamMetaRLock(pMeta);
SStreamTask* streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
if (!streamTaskShouldStop(*ppTask)) {
if (ppTask == NULL || streamTaskShouldStop(*ppTask)) {
return NULL;
}
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
streamMetaRUnLock(pMeta);
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
return *ppTask;
}
}
SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
streamMetaRLock(pMeta);
SStreamTask* p = streamMetaAcquireTaskNoLock(pMeta, streamId, taskId);
streamMetaRUnLock(pMeta);
return NULL;
return p;
}
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
@ -1293,6 +1294,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
void streamMetaRLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-rlock", pMeta->vgId);
taosRLockLatch(&pMeta->lock);
ASSERT(pMeta->lock != 0x40000001);
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
@ -1303,6 +1305,7 @@ void streamMetaRUnLock(SStreamMeta* pMeta) {
void streamMetaWLock(SStreamMeta* pMeta) {
stTrace("vgId:%d meta-wlock", pMeta->vgId);
taosWLockLatch(&pMeta->lock);
ASSERT(pMeta->lock != 0x40000001);
stTrace("vgId:%d meta-wlock completed", pMeta->vgId);
}