fix(stream): fix deadlock

This commit is contained in:
Haojun Liao 2024-07-19 09:39:29 +08:00
parent 852dd833d0
commit 144855ef83
2 changed files with 11 additions and 9 deletions

View File

@ -202,6 +202,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
SStreamUpstreamEpInfo* pInfo = NULL; SStreamUpstreamEpInfo* pInfo = NULL;
streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo); streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
if (pInfo == NULL) { if (pInfo == NULL) {
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} }
@ -437,7 +438,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream
stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr, stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
numOfConfirmed, checkpointId); numOfConfirmed, checkpointId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -530,13 +531,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pTask->status.taskStatus = TASK_STATUS__READY; pTask->status.taskStatus = TASK_STATUS__READY;
code = streamMetaSaveTask(pMeta, pTask); code = streamMetaSaveTask(pMeta, pTask);
streamMutexUnlock(&pTask->lock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id, stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
vgId, pReq->checkpointId, terrstr()); vgId, pReq->checkpointId, terrstr());
return code; return code;
} }
streamMutexUnlock(&pTask->lock);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
// drop task should not in the meta-lock, and drop the related fill-history task now // drop task should not in the meta-lock, and drop the related fill-history task now
@ -981,7 +983,7 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
} }
} }
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pInfo->lock);
} }
int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
@ -995,7 +997,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
num++; num++;
} }
} }
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pInfo->lock);
return num; return num;
} }
@ -1017,7 +1019,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
} }
} }
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pInfo->lock);
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask);
int32_t total = streamTaskGetNumOfDownstream(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask);

View File

@ -41,11 +41,11 @@ tmr_h streamTimerGetInstance() {
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg) { const char* pMsg) {
while (1) { // while (1) {
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
if (ret) { if (ret) {
break; // break;
}
stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg);
} }
// stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg);
// }
} }