diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 392f64f524..60019977cc 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -202,6 +202,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock SStreamUpstreamEpInfo* pInfo = NULL; streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo); if (pInfo == NULL) { + streamMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } @@ -437,7 +438,7 @@ int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstream stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr, numOfConfirmed, checkpointId); - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return TSDB_CODE_SUCCESS; } @@ -530,13 +531,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pTask->status.taskStatus = TASK_STATUS__READY; code = streamMetaSaveTask(pMeta, pTask); + streamMutexUnlock(&pTask->lock); + if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id, vgId, pReq->checkpointId, terrstr()); return code; } - streamMutexUnlock(&pTask->lock); streamMetaWUnLock(pMeta); // drop task should not in the meta-lock, and drop the related fill-history task now @@ -981,7 +983,7 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) { } } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); } int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { @@ -995,7 +997,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { num++; } } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); return num; } @@ -1017,7 +1019,7 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) { } } - streamMutexUnlock(&pTask->lock); + streamMutexUnlock(&pInfo->lock); int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask); diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 4a6c9b5c2f..931de397cc 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -41,11 +41,11 @@ tmr_h streamTimerGetInstance() { void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId, const char* pMsg) { - while (1) { +// while (1) { bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId); if (ret) { - break; +// break; } - stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg); - } +// stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg); +// } }