From 895a9a1f3d2c9823590c2b9b9710a3aa02085110 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Aug 2024 16:54:08 +0800 Subject: [PATCH 1/5] fix(stream): check status before start init, do some internal refactor. --- include/libs/stream/tstream.h | 13 +-- source/libs/stream/src/streamCheckpoint.c | 97 +++++++++++++---------- source/libs/stream/src/streamMeta.c | 29 +++++-- source/libs/stream/src/streamSched.c | 9 ++- source/libs/stream/src/streamTask.c | 21 +++++ source/libs/stream/src/streamTaskSm.c | 8 +- 6 files changed, 118 insertions(+), 59 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0d1e62cdbe..90cb06ff42 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -641,12 +641,13 @@ bool streamTaskShouldPause(const SStreamTask* pStatus); bool streamTaskIsIdle(const SStreamTask* pTask); bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus); -int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId); -SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); -const char* streamTaskGetStatusStr(ETaskStatus status); -void streamTaskResetStatus(SStreamTask* pTask); -void streamTaskSetStatusReady(SStreamTask* pTask); -ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); +int32_t createStreamTaskIdStr(int64_t streamId, int32_t taskId, const char** pId); +SStreamTaskState streamTaskGetStatus(const SStreamTask* pTask); +const char* streamTaskGetStatusStr(ETaskStatus status); +void streamTaskResetStatus(SStreamTask* pTask); +void streamTaskSetStatusReady(SStreamTask* pTask); +ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); +const char* streamTaskGetExecType(int32_t type); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index f817447099..f7c61b48e3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -355,43 +355,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock return code; } -/** - * All down stream tasks have successfully completed the check point task. - * Current stream task is allowed to start to do checkpoint things in ASYNC model. - */ -int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, - int32_t downstreamTaskId) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG); - SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; - - const char* id = pTask->id.idStr; - bool received = false; - int32_t total = streamTaskGetNumOfDownstream(pTask); - ASSERT(total > 0); - - // 1. not in checkpoint status now - SStreamTaskState pStat = streamTaskGetStatus(pTask); - if (pStat.state != TASK_STATUS__CK) { - stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId); - return TSDB_CODE_STREAM_TASK_IVLD_STATUS; - } - - // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg - if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { - stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 - ") from task:0x%x, expired and discard ", - id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); - return -1; - } - - streamMutexLock(&pInfo->lock); - - // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task +// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task +static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream, + int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId, + const char* id, int32_t* pNotReady, int32_t* pTransId) { + bool received = false; int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); for (int32_t i = 0; i < size; ++i) { STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); if (p == NULL) { - streamMutexUnlock(&pInfo->lock); return TSDB_CODE_INVALID_PARA; } @@ -403,27 +375,69 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId if (received) { stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id, - downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total); + downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), + numOfDownstream); } else { STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(), .downstreamTaskId = downstreamTaskId, .checkpointId = pInfo->activeId, .transId = pInfo->transId, - .streamId = pTask->id.streamId, + .streamId = streamId, .downstreamNodeId = downstreamNodeId}; - (void)taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + void* p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info); + if (p == NULL) { + stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno)); + return terrno; + } } - int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); - int32_t transId = pInfo->transId; + *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList); + *pTransId = pInfo->transId; + return 0; +} + +/** + * All down stream tasks have successfully completed the check point task. + * Current stream task is allowed to start to do checkpoint things in ASYNC model. + */ +int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, + int32_t downstreamTaskId) { + SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo; + + const char* id = pTask->id.idStr; + int32_t total = streamTaskGetNumOfDownstream(pTask); + int32_t code = 0; + int32_t notReady = 0; + int32_t transId = 0; + + ASSERT(total > 0 && (pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG)); + + // 1. not in checkpoint status now + SStreamTaskState pStat = streamTaskGetStatus(pTask); + if (pStat.state != TASK_STATUS__CK) { + stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + + // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg + if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) { + stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64 + ") from task:0x%x, expired and discard", + id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId); + return TSDB_CODE_INVALID_MSG; + } + + streamMutexLock(&pInfo->lock); + code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, ¬Ready, + &transId); streamMutexUnlock(&pInfo->lock); - if (notReady == 0) { + if ((notReady == 0) && (code == 0)) { stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); } - return 0; + return code; } int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) { @@ -1034,8 +1048,7 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); if (p == NULL) { - streamMutexUnlock(&pInfo->lock); - return num; + continue; } if (p->recved) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 08cf490b94..8ecd62d1eb 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1464,14 +1464,16 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) { } int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { - int32_t code = 0; - int32_t vgId = pMeta->vgId; + int32_t code = 0; + int32_t vgId = pMeta->vgId; + SStreamTask* pTask = NULL; + bool continueExec = true; + stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId); - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask); if (pTask == NULL) { - stError("vgId:%d failed to acquire task:0x%x when starting task", pMeta->vgId, taskId); + stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId); (void)streamMetaAddFailedTask(pMeta, streamId, taskId); return TSDB_CODE_STREAM_TASK_IVLD_STATUS; } @@ -1479,10 +1481,26 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas // fill-history task can only be launched by related stream tasks. STaskExecStatisInfo* pInfo = &pTask->execInfo; if (pTask->info.fillHistory == 1) { + stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId); streamMetaReleaseTask(pMeta, pTask); return TSDB_CODE_SUCCESS; } + streamMutexLock(&pTask->lock); + SStreamTaskState status = streamTaskGetStatus(pTask); + if (status.state != TASK_STATUS__UNINIT) { + stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name); + continueExec = false; + } else { + continueExec = true; + } + streamMutexUnlock(&pTask->lock); + + if (!continueExec) { + streamMetaReleaseTask(pMeta, pTask); + return TSDB_CODE_STREAM_TASK_IVLD_STATUS; + } + ASSERT(pTask->status.downstreamReady == 0); // avoid initialization and destroy running concurrently. @@ -1501,7 +1519,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas if (code == TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) { - stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT); + stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, + tstrerror(code)); streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); } } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index f9fcf36668..a83a0e4cc8 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -48,14 +48,15 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); if (pRunReq == NULL) { stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType, - terrstr()); - return TSDB_CODE_OUT_OF_MEMORY; + terrstr(terrno)); + return terrno; } if (streamId != 0) { - stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType); + stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType, + streamTaskGetExecType(execType)); } else { - stDebug("vgId:%d create msg to exec, type:%d", vgId, execType); + stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType)); } pRunReq->head.vgId = vgId; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a249bad724..f07fd81953 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1149,4 +1149,25 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); +} + +const char* streamTaskGetExecType(int32_t type) { + switch (type) { + case STREAM_EXEC_T_EXTRACT_WAL_DATA: + return "scan-wal-file"; + case STREAM_EXEC_T_START_ALL_TASKS: + return "start-all-tasks"; + case STREAM_EXEC_T_START_ONE_TASK: + return "start-one-task"; + case STREAM_EXEC_T_RESTART_ALL_TASKS: + return "restart-all-tasks"; + case STREAM_EXEC_T_STOP_ALL_TASKS: + return "stop-all-tasks"; + case STREAM_EXEC_T_RESUME_TASK: + return "resume-task-from-idle"; + case STREAM_EXEC_T_ADD_FAILED_TASK: + return "record-start-failed-task"; + default: + return "invalid-exec-type"; + } } \ No newline at end of file diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 275c9255d2..0779eede9f 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -322,12 +322,11 @@ static int32_t doHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event, STaskSt if (pTrans->attachEvent.event != 0) { code = attachWaitedEvent(pTask, &pTrans->attachEvent); + streamMutexUnlock(&pTask->lock); if (code) { return code; } - streamMutexUnlock(&pTask->lock); - while (1) { // wait for the task to be here streamMutexLock(&pTask->lock); @@ -557,6 +556,11 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) { } const char* streamTaskGetStatusStr(ETaskStatus status) { + int32_t index = status; + if (index < 0 || index > tListLen(StreamTaskStatusList)) { + return ""; + } + return StreamTaskStatusList[status].name; } From c1ca6ce46484420cc1de10104774143e80ceac0b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Aug 2024 17:24:29 +0800 Subject: [PATCH 2/5] fix(stream): set the concurrently handle init failed. --- source/libs/stream/src/streamMeta.c | 22 ++++++++++++++++++++-- source/libs/stream/src/streamTaskSm.c | 9 ++++++++- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8ecd62d1eb..670f692c59 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1379,7 +1379,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { if (ret != TSDB_CODE_SUCCESS) { stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT); code = ret; - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } streamMetaReleaseTask(pMeta, pTask); @@ -1486,6 +1489,8 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas return TSDB_CODE_SUCCESS; } + // the start all tasks procedure may happen to start the newly deployed stream task, and results in the + // concurrently start this task by two threads. streamMutexLock(&pTask->lock); SStreamTaskState status = streamTaskGetStatus(pTask); if (status.state != TASK_STATUS__UNINIT) { @@ -1516,12 +1521,17 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas streamMutexUnlock(&pTask->lock); } + // concurrently start task may cause the later started task be failed, and also failed to added into meta result. if (code == TSDB_CODE_SUCCESS) { code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT, tstrerror(code)); - streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + + // do no added into result hashmap if it is failed due to concurrently starting of this stream task. + if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) { + streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs); + } } } @@ -1584,6 +1594,14 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { + if (code == TSDB_CODE_DUP_KEY) { + stError("vgId:%d record start task result failed, s-task:0x%x already exist start results in meta dst hashmap", + pMeta->vgId, id.taskId); + } else { + stError("vgId:%d failed to record start task:0x%x results, start all tasks failed", pMeta->vgId, id.taskId); + } + streamMetaWUnLock(pMeta); + return code; } int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 0779eede9f..c63df059af 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -405,7 +405,7 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { EStreamTaskEvent evt = pSM->pActiveTrans->event; streamMutexUnlock(&pTask->lock); - stDebug("s-task:%s status:%s handling event:%s by some other thread, wait for 100ms and check if completed", + stDebug("s-task:%s status:%s handling event:%s by another thread, wait for 100ms and check if completed", pTask->id.idStr, pSM->current.name, GET_EVT_NAME(evt)); taosMsleep(100); } else { @@ -418,6 +418,13 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { } if (pSM->pActiveTrans != NULL) { + // not allowed concurrently initialization + if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) { + stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr); + code = TSDB_CODE_STREAM_INVALID_STATETRANS; + break; + } + // currently in some state transfer procedure, not auto invoke transfer, abort it stDebug("s-task:%s event:%s handle procedure quit, status %s -> %s failed, handle event %s now", pTask->id.idStr, GET_EVT_NAME(pSM->pActiveTrans->event), pSM->current.name, From e0a7e64a651e9fb3a4fc4fddced365c29a47bdec Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Aug 2024 17:28:05 +0800 Subject: [PATCH 3/5] fix(stream): fix deadlock. --- source/libs/stream/src/streamTaskSm.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c63df059af..d3c39da6bd 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -420,9 +420,9 @@ int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event) { if (pSM->pActiveTrans != NULL) { // not allowed concurrently initialization if (event == TASK_EVENT_INIT && pSM->pActiveTrans->event == TASK_EVENT_INIT) { + streamMutexUnlock(&pTask->lock); stError("s-task:%s already in handling init procedure, handle this init event failed", pTask->id.idStr); - code = TSDB_CODE_STREAM_INVALID_STATETRANS; - break; + return TSDB_CODE_STREAM_INVALID_STATETRANS; } // currently in some state transfer procedure, not auto invoke transfer, abort it From 9d219ad62a875c32be7e80dae8dea667369d13ba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Aug 2024 17:35:47 +0800 Subject: [PATCH 4/5] refactor: update log. --- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index c84e016459..3dc4beca57 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -82,7 +82,7 @@ static void doStartScanWal(void* param, void* tmrId) { taosMemoryFree(pParam); if (code) { - tqError("vgId:%d failed sched task to scan wal", vgId); + tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code)); } } From 281f636954787bc07febb8a927b8bf04e95a1ec4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 2 Aug 2024 18:21:46 +0800 Subject: [PATCH 5/5] fix(stream): fix syntax error. --- source/libs/stream/src/streamMeta.c | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 670f692c59..87293c59ec 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1565,11 +1565,12 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 int64_t endTs, bool ready) { STaskStartInfo* pStartInfo = &pMeta->startInfo; STaskId id = {.streamId = streamId, .taskId = taskId}; + int32_t vgId = pMeta->vgId; streamMetaWLock(pMeta); SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (p == NULL) { // task does not exists in current vnode, not record the complete info - stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId); streamMetaWUnLock(pMeta); return 0; } @@ -1584,7 +1585,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 stDebug( "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed " "time:%" PRId64 "ms", - pMeta->vgId, taskId, ready, el); + vgId, taskId, ready, el); streamMetaWUnLock(pMeta); return 0; } @@ -1595,10 +1596,11 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs)); if (code) { if (code == TSDB_CODE_DUP_KEY) { - stError("vgId:%d record start task result failed, s-task:0x%x already exist start results in meta dst hashmap", - pMeta->vgId, id.taskId); + stError("vgId:%d record start task result failed, s-task:0x%" PRIx64 + " already exist start results in meta start task result hashmap", + vgId, id.taskId); } else { - stError("vgId:%d failed to record start task:0x%x results, start all tasks failed", pMeta->vgId, id.taskId); + stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId); } streamMetaWUnLock(pMeta); return code; @@ -1613,20 +1615,20 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64 ", readyTs:%" PRId64 " total elapsed time:%.2fs", - pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, + vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs, pStartInfo->elapsedTime / 1000.0); // print the initialization elapsed time and info displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true); displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false); - streamMetaResetStartInfo(pStartInfo, pMeta->vgId); + streamMetaResetStartInfo(pStartInfo, vgId); streamMetaWUnLock(pMeta); code = pStartInfo->completeFn(pMeta); } else { streamMetaWUnLock(pMeta); - stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId, - ready, numOfRecv, numOfTotal); + stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready, + numOfRecv, numOfTotal); } return code;