diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 82e57daf56..c690610341 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -826,6 +826,9 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { streamSourceRecoverScanStep1(pTask); if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { + double el = (taosGetTimestampMs() - st) / 1000.0; + tqDebug("s-task:%s is dropped, abort recover in step1", pTask->id.idStr); + streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } @@ -842,7 +845,6 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - if (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__DROPPING) { return 0; } @@ -852,13 +854,14 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg) { void* serializedReq = rpcMallocCont(len); if (serializedReq == NULL) { + tqError("s-task:%s failed to prepare the step2 stage, out of memory", pTask->id.idStr); return -1; } memcpy(serializedReq, &req, len); // dispatch msg - tqDebug("s-task:%s start recover block stage", pTask->id.idStr); + tqDebug("s-task:%s start recover block stage(step 2)", pTask->id.idStr); SRpcMsg rpcMsg = { .code = 0, .contLen = len, .msgType = TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE, .pCont = serializedReq}; @@ -870,7 +873,8 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t int32_t code = 0; SStreamRecoverStep2Req* pReq = (SStreamRecoverStep2Req*)msg; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pReq->taskId); if (pTask == NULL) { return -1; } @@ -912,7 +916,6 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t streamMetaSaveTask(pTq->pStreamMeta, pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 549374ed94..95e864dc99 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -270,8 +270,12 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov SEncoder encoder; tEncoderInit(&encoder, abuf, tlen); if ((code = tEncodeSStreamRecoverFinishReq(&encoder, pReq)) < 0) { - goto FAIL; + if (buf) { + rpcFreeCont(buf); + } + return code; } + tEncoderClear(&encoder); msg.contLen = tlen + sizeof(SMsgHead); @@ -280,13 +284,10 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov msg.info.noResp = 1; tmsgSendReq(pEpSet, &msg); - - qDebug("dispatch from task %d to task %d node %d: recover finish msg", pTask->id.taskId, pReq->taskId, vgId); + qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr, + pReq->taskId, vgId); return 0; -FAIL: - if (buf) rpcFreeCont(buf); - return code; } int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { @@ -407,13 +408,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; - qDebug("s-task:%s (child taskId:%d) dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, + qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, pTask->selfChildId, blockNum, downstreamTaskId, vgId); if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { goto FAIL_FIXED_DISPATCH; } + code = 0; + FAIL_FIXED_DISPATCH: taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); @@ -427,6 +430,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t vgSz = taosArrayGetSize(vgInfo); SStreamDispatchReq* pReqs = taosMemoryCalloc(vgSz, sizeof(SStreamDispatchReq)); if (pReqs == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -442,6 +446,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (pReqs[i].data == NULL || pReqs[i].dataLen == NULL) { goto FAIL_SHUFFLE_DISPATCH; } + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); pReqs[i].taskId = pVgInfo->taskId; } @@ -468,33 +473,41 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } } + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, + blockNum, vgSz); + for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { - // send SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, + pReqs[i].blockNum, pVgInfo->vgId); + if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } } + code = 0; + FAIL_SHUFFLE_DISPATCH: - if (pReqs) { - for (int32_t i = 0; i < vgSz; i++) { - taosArrayDestroyP(pReqs[i].data, taosMemoryFree); - taosArrayDestroy(pReqs[i].dataLen); - } - taosMemoryFree(pReqs); + for (int32_t i = 0; i < vgSz; i++) { + taosArrayDestroyP(pReqs[i].data, taosMemoryFree); + taosArrayDestroy(pReqs[i].dataLen); } - return code; + taosMemoryFree(pReqs); } - return 0; + return code; } int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - qDebug("s-task:%s try to dispatch intermediate result block to downstream, numofBlocks in outputQ:%d", pTask->id.idStr, - taosQueueItemSize(pTask->outputQueue->queue)); + + int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); + if (numOfElems > 0) { + qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, + numOfElems); + } int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); @@ -504,7 +517,7 @@ int32_t streamDispatch(SStreamTask* pTask) { SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); if (pBlock == NULL) { - qDebug("s-task:%s stream stop dispatching since no output in output queue", pTask->id.idStr); + qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } @@ -516,10 +529,8 @@ int32_t streamDispatch(SStreamTask* pTask) { code = -1; streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - goto FREE; } -FREE: taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); return code; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0d1440fbde..40c33577fb 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -212,7 +212,7 @@ int32_t streamBuildSourceRecover2Req(SStreamTask* pTask, SStreamRecoverStep2Req* int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { void* exec = pTask->exec.pExecutor; - qDebug("s-task:%s recover step2(blocking stage) started", pTask->id.idStr); + qDebug("s-task:%s recover step2 (blocking stage) started", pTask->id.idStr); if (qStreamSourceRecoverStep2(exec, ver) < 0) { } @@ -220,12 +220,13 @@ int32_t streamSourceRecoverScanStep2(SStreamTask* pTask, int64_t ver) { } int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { - SStreamRecoverFinishReq req = { - .streamId = pTask->id.streamId, - .childId = pTask->selfChildId, - }; + SStreamRecoverFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->selfChildId }; + // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + qDebug("s-task:%s send recover finish msg to downstream (fix-dispatch) to taskId:%d", pTask->id.idStr, + pTask->fixedEpDispatcher.taskId); + req.taskId = pTask->fixedEpDispatcher.taskId; streamDispatchOneRecoverFinishReq(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {