diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7314e23145..36c35ab415 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -821,13 +821,18 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } // do recovery step 1 - streamSourceRecoverScanStep1(pTask); + tqDebug("s-task:%s start recover step 1 scan", pTask->id.idStr); + int64_t st = taosGetTimestampMs(); + streamSourceRecoverScanStep1(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } + double el = (taosGetTimestampMs() - st) / 1000.0; + tqDebug("s-task:%s recover step 1 ended, elapsed time:%.2fs", pTask->id.idStr, el); + // build msg to launch next step SStreamRecoverStep2Req req; code = streamBuildSourceRecover2Req(pTask, &req); @@ -853,20 +858,17 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { memcpy(serializedReq, &req, len); // dispatch msg + tqDebug("s-task:%s start recover block stage", pTask->id.idStr); + SRpcMsg rpcMsg = { - .code = 0, - .contLen = len, - .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, - .pCont = serializedReq, - }; - + .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; tmsgPutToQueue(&pTq->pVnode->msgCb, WRITE_QUEUE, &rpcMsg); - return 0; } int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { - int32_t code; + int32_t code = 0; + SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 8cda5a21c9..22903b95d9 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -107,7 +107,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { if (streamTaskShouldStop(&pTask->status) || status == TASK_STATUS__RECOVER_PREPARE || status == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("s-task:%s skip push data, not ready for processing, status:%d", pTask->id.idStr, status); + tqDebug("s-task:%s not ready for new submit block from wal, status:%d", pTask->id.idStr, status); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 325d315262..e711700ef2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -165,20 +165,24 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { batchCnt++; - qDebug("task %d scan exec block num %d, block limit %d", pTask->id.taskId, batchCnt, batchSz); + qDebug("s-task:%s scan exec block num %d, block limit %d", pTask->id.idStr, batchCnt, batchSz); - if (batchCnt >= batchSz) break; + if (batchCnt >= batchSz) { + break; + } } + if (taosArrayGetSize(pRes) == 0) { if (finished) { taosArrayDestroy(pRes); - qDebug("task %d finish recover exec task ", pTask->id.taskId); + qDebug("s-task:%s finish recover exec task ", pTask->id.idStr); break; } else { - qDebug("task %d continue recover exec task ", pTask->id.taskId); + qDebug("s-task:%s continue recover exec task ", pTask->id.idStr); continue; } } + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 55c745e417..0d1440fbde 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -20,6 +20,8 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); + qDebug("s-task:%s set task status:%d and start recover", pTask->id.idStr, pTask->status.taskStatus); + streamSetParamForRecover(pTask); streamSourceRecoverPrepareStep1(pTask, version); @@ -197,7 +199,6 @@ int32_t streamBuildSourceRecover1Req(SStreamTask* pTask, SStreamRecoverStep1Req* } int32_t streamSourceRecoverScanStep1(SStreamTask* pTask) { - // return streamScanExec(pTask, 100); } @@ -210,8 +211,11 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { void* exec = pTask->exec.pExecutor; + + qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr); if (qStreamSourceRecoverStep2(exec, ver) < 0) { } + return streamScanExec(pTask, 100); }