From bbb8aaa0dcea4d13da419524b48e02e2cebc0c05 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Apr 2023 02:37:10 +0800 Subject: [PATCH] refactor: add some logs. --- source/dnode/vnode/src/tq/tq.c | 6 ++++++ source/libs/stream/src/stream.c | 1 + source/libs/stream/src/streamDispatch.c | 2 ++ source/libs/stream/src/streamRecover.c | 4 ++-- 4 files changed, 11 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aba72835cf..a42a2119d0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -880,6 +880,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t } // do recovery step 2 + int64_t st = taosGetTimestampMs(); + tqDebug("s-task:%s start step2 recover, ts:%"PRId64, pTask->id.idStr, st); + code = streamSourceRecoverScanStep2(pTask, sversion); if (code < 0) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); @@ -905,6 +908,9 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t return -1; } + double el = (taosGetTimestampMs() - st)/ 1000.0; + tqDebug("s-task:%s step2 recover finished, el:%.2f s", pTask->id.idStr, el); + // dispatch recover finish req to all related downstream task code = streamDispatchRecoverFinishReq(pTask); if (code < 0) { diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 120fe68b5d..a0bc95ac01 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -256,6 +256,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i ASSERT(0); return 0; } + // continue dispatch streamDispatch(pTask); return 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 95e864dc99..ce556aaa8d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -261,6 +261,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -512,6 +513,7 @@ int32_t streamDispatch(SStreamTask* pTask) { int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { + qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr); return 0; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 40c33577fb..471b363cbd 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -224,8 +224,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d", pTask->id.idStr, - pTask->fixedEpDispatcher.taskId); + qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d, status:%d", pTask->id.idStr, + pTask->fixedEpDispatcher.taskId, pTask->status.taskStatus); req.taskId = pTask->fixedEpDispatcher.taskId; streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);