From 0a672f1b9628a5d24e9f16bb5c1a3b11db6f4767 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 20 Feb 2025 16:55:10 +0800 Subject: [PATCH] refactor(stream): update logs. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 7 ++++--- source/libs/stream/src/streamExec.c | 9 ++++++--- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 4c3ba0c077..9c11d99c35 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -388,7 +388,7 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { } double el = (now - pTrans->startTime) / 1000.0; - mInfo("stream:%s id:%" PRIx64 " ongoing checkpoint trans, id:%d, elapsed time:%.2fs killed", pTrans->name, + mInfo("stream:0x%" PRIx64 " start to kill ongoing long checkpoint transId:%d, elapsed time:%.2fs. killed", pTrans->streamId, pTrans->transId, el); SStreamObj *p = NULL; @@ -396,9 +396,10 @@ void killChkptAndResetStreamTask(SMnode *pMnode, SArray* pLongChkpts) { if (code == 0 && p != NULL) { mndKillTransImpl(pMnode, pTrans->transId, p->sourceDb); - mDebug("create reset task trans for stream:%s 0x%" PRIx64, pTrans->name, pTrans->streamId); - mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); + mDebug("stream:%s 0x%" PRIx64 " transId:%d checkpointId:%" PRId64 " create reset task trans", p->name, + pTrans->streamId, pTrans->transId, p->checkpointId); + mndCreateStreamResetStatusTrans(pMnode, p, p->checkpointId); sdbRelease(pMnode->pSdb, p); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ee34648a47..8ee06a82db 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -777,7 +777,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { int32_t code = 0; // merge multiple input data if possible in the input queue. - stDebug("s-task:%s start to extract data block from inputQ", id); + int64_t st = taosGetTimestampMs(); + stDebug("s-task:%s start to extract data block from inputQ, ts:%" PRId64, id, st); while (1) { int32_t blockSize = 0; @@ -807,8 +808,6 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } - int64_t st = taosGetTimestampMs(); - EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); if (ret == EXEC_AFTER_IDLE) { streamTaskSetIdleInfo(pTask, MIN_INVOKE_INTERVAL); @@ -825,6 +824,10 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { // dispatch checkpoint msg to all downstream tasks int32_t type = pInput->type; if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + + // Injection error: for automatic kill long trans test + taosMsleep(50*1000); + code = streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput); if (code != 0) { stError("s-task:%s failed to process checkpoint-trigger block, code:%s", pTask->id.idStr, tstrerror(code));