From bb9edde79ed8b7ab5768a73677aab1f2625fcac0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 8 Jan 2024 11:33:54 +0800 Subject: [PATCH] fix(stream): add some logs. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 15 +++++++++++---- source/libs/stream/src/streamQueue.c | 1 + 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 6b1c7ccedb..656ee86831 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -109,6 +109,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + int64_t st = taosGetTimestampMs(); SStreamTaskNodeUpdateMsg req = {0}; @@ -194,7 +195,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("s-task:%s vgId:%d not save since restore not finish", pTask->id.idStr, vgId); } - tqDebug("s-task:%s start to stop task after save task", pTask->id.idStr); + tqDebug("s-task:%s vgId:%d start to stop task after save task", pTask->id.idStr, vgId); streamTaskStop(pTask); // keep the already handled info @@ -202,10 +203,16 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM if (ppHTask != NULL) { streamTaskStop(*ppHTask); - tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr); + + int64_t now = taosGetTimestampMs(); + tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask and related fill-history task closed, elapsed:%" PRId64 + " ms", + pTask->id.idStr, vgId, now-st); taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0); } else { - tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr); + int64_t now = taosGetTimestampMs(); + tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", pTask->id.idStr, + vgId, now - st); } rsp.code = 0; @@ -226,7 +233,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM pMeta->startInfo.tasksWillRestart = 0; streamMetaWUnLock(pMeta); } else { - tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks); + tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); #if 0 // for test purpose, to trigger the leader election taosMSleep(5000); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 479612d9d1..5f55285ab1 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -175,6 +175,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu if (qItem == NULL) { if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { taosMsleep(WAIT_FOR_DURATION); + // todo remove it continue; }