diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d8fe5fe6b8..db07e214ad 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -794,6 +794,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; + int32_t vgId = pMeta->vgId; // pre-delete operation streamMetaWLock(pMeta); @@ -806,19 +807,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t // desc the paused task counter if (streamTaskShouldPause(pTask)) { int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1); - stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num); + stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num); } // handle the dropping event (void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL); } else { - stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId); + stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId); streamMetaWUnLock(pMeta); return 0; } streamMetaWUnLock(pMeta); - stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId); + stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId); while (1) { int32_t timerActive = 0; @@ -859,13 +860,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap); int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList); if (sizeInList != size) { - stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size); + stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size); } streamMetaWUnLock(pMeta); int32_t numOfTmr = pTask->status.timerActive; if (numOfTmr != 0) { - stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, numOfTmr); + stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, vgId, numOfTmr); } if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { @@ -877,7 +878,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t streamMetaReleaseTask(pMeta, pTask); } else { - stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId); + stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId); streamMetaWUnLock(pMeta); }