From 0d38f389abe7c229cb2ffb7d8c25582874051e6a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 13 Aug 2023 16:25:35 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/common/tmsgdef.h | 1 - include/libs/stream/tstream.h | 4 +- source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 - source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/tq/tq.c | 51 ++++------------ source/dnode/vnode/src/tq/tqRestore.c | 17 ++++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 2 - source/libs/stream/inc/streamInt.h | 1 + source/libs/stream/src/stream.c | 20 ++++++- source/libs/stream/src/streamDispatch.c | 41 +++++++++---- source/libs/stream/src/streamExec.c | 62 ++++++++----------- source/libs/stream/src/streamRecover.c | 66 +-------------------- 12 files changed, 100 insertions(+), 167 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 232551007d..60172bce3d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -254,7 +254,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 508392ff77..d3b670d0ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -274,7 +274,7 @@ typedef struct SStreamStatus { int8_t schedStatus; int8_t keepTaskStatus; bool transferState; - bool appendTranstateBlock; // has append the transfer state data block already + bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it int8_t timerActive; // timer is active int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; @@ -582,6 +582,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); @@ -629,7 +630,6 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); -int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); int32_t appendTranstateIntoInputQ(SStreamTask* pTask); // agg level diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index bed9a67303..cf57deaa22 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -742,7 +742,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cd7704940b..d41c58b501 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -250,7 +250,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ad1af080fd..9dfde0fed7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -928,6 +928,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + streamTaskOpenAllUpstreamInput(pTask); + // backup the initial status, and set it to be TASK_STATUS__INIT pTask->chkInfo.version = ver; pTask->chkInfo.currentVer = ver; @@ -1272,7 +1274,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); - streamTaskEndScanWAL(pTask); + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); + appendTranstateIntoInputQ(pTask); } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 @@ -1337,44 +1340,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } -// notify the downstream tasks to transfer executor state after handle all history blocks. -int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - - SStreamTransferReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pReq, len); - int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); - return -1; - } - - int32_t remain = streamAlignTransferState(pTask); - if (remain > 0) { - tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; - } - - // transfer the ownership of executor state - tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr); - ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); - - pTask->status.transferState = true; - - streamSchedExec(pTask); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; -} - int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1706,6 +1671,8 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { STQ* pTq = pVnode->pTq; + int32_t vgId = pVnode->config.vgId; + SMsgHead* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1722,7 +1689,9 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { tDecoderClear(&decoder); int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); + tqDebug("vgId:%d receive dispatch msg to s-task:0x%"PRIx64"-0x%x", vgId, req.streamId, taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp, false); @@ -1739,7 +1708,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { FAIL: if (pMsg->info.handle == NULL) { - tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId); + tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", vgId, taskId); return -1; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index a217bc2966..3054179416 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -210,14 +210,21 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { } static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { + const char* id = pTask->id.idStr; + if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 - ", not scan wal anymore, set the transfer state flag", - pTask->id.idStr, ver, pTask->dataRange.range.maxVer); if (!pTask->status.appendTranstateBlock) { - pTask->status.appendTranstateBlock = true; + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 + ", not scan wal anymore, add transfer-state block into inputQ", + id, ver, pTask->dataRange.range.maxVer); + + double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); appendTranstateIntoInputQ(pTask); /*int32_t code = */streamSchedExec(pTask); + } else { + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", + id, ver, pTask->dataRange.range.maxVer); } } } @@ -264,7 +271,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { + if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { ASSERT(status == TASK_STATUS__NORMAL); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index bf1abf5795..70c3382dc5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -660,8 +660,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_STREAM_TRANSFER_STATE: - return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index b0fed5dde1..7a557a744a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); +int32_t streamTransferStateToStreamTask(SStreamTask* pTask); extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b9c9e40562..e9b38dfff2 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -213,6 +213,10 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); } else { + if (pBlock->type == STREAM_INPUT__TRANS_STATE) { + pTask->status.appendTranstateBlock = true; + } + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); // input queue is full, upstream is blocked now status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; @@ -379,6 +383,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + } else { + ASSERT(0); } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { @@ -421,4 +427,16 @@ SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t } return NULL; -} \ No newline at end of file +} + +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { + int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + if (num == 0) { + return; + } + + for(int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); + pInfo->dataAllowed = true; + } +} diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bcd45875fb..d479dd44df 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -38,6 +38,7 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; @@ -91,6 +92,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; @@ -115,8 +117,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { return 0; } -int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, - int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { +int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, + int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; pReq->srcVgId = vgId; pReq->upstreamTaskId = pTask->id.taskId; @@ -456,8 +458,8 @@ static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* p for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); @@ -720,14 +722,16 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { } int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { + const char* id = pTask->id.idStr; + if (code != TSDB_CODE_SUCCESS) { // dispatch message failed: network error, or node not available. // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure // happened too fast. // todo handle the shuffle dispatch failure - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, - pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId, + tstrerror(code), ++pTask->msgInfo.retryCount); int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (ret != TSDB_CODE_SUCCESS) { } @@ -735,22 +739,35 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return TSDB_CODE_SUCCESS; } - qDebug("s-task:%s recv dispatch rsp, downstream task input status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, - code); + qDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId, + pRsp->inputStatus, code); // there are other dispatch message not response yet if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); + qDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp); if (leftRsp > 0) { return 0; } } + // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state + SStreamDataBlock* p = pTask->msgInfo.pData; + if (p->type == STREAM_INPUT__TRANS_STATE) { + qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); + + if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + return code; + } + } + pTask->msgInfo.retryCount = 0; ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status); + qDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status); // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp @@ -758,7 +775,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", - pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); } else { // pipeline send data in output queue // this message has been sent successfully, let's try next one. @@ -767,8 +784,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pTask->msgInfo.blockingTs != 0) { int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; - qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", - pTask->id.idStr, pRsp->downstreamTaskId, el); + qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id, + pRsp->downstreamTaskId, el); pTask->msgInfo.blockingTs = 0; // put data into inputQ of current task is also allowed diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index bbfaa8cb9d..269334f54d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -287,7 +287,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } } -static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); @@ -301,7 +301,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true); + ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -383,11 +383,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; - if (!pTask->status.transferState) { - return code; - } + ASSERT(pTask->status.appendTranstateBlock == 1); int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SOURCE) { @@ -513,14 +511,12 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock // transfer the ownership of executor state if (level == TASK_LEVEL__SOURCE) { - qDebug("s-task:%s open transfer state flag for source task", id); + qDebug("s-task:%s add transfer-state block into outputQ", id); } else { - qDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", id); + qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id); ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); } - pTask->status.transferState = true; - // dispatch the tran-state block to downstream task immediately int32_t type = pTask->outputInfo.type; if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) && @@ -639,16 +635,7 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); // 1. notify all downstream tasks to transfer executor state after handle all history blocks. -// pTask->status.transferState = true; appendTranstateIntoInputQ(pTask); - - // 2. do transfer stream task operator states. - // todo remove this -// int32_t code = streamDoTransferStateToStreamTask(pTask); -// if (code != TSDB_CODE_SUCCESS) { // todo handle error -// return code; -// } - return TSDB_CODE_SUCCESS; } @@ -667,35 +654,36 @@ int32_t streamTryExec(SStreamTask* pTask) { } // todo the task should be commit here - if (taosQueueEmpty(pTask->inputQueue->queue)) { +// if (taosQueueEmpty(pTask->inputQueue->queue)) { // fill-history WAL scan has completed - if (pTask->status.transferState) { - code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - return code; - } +// if (pTask->status.transferState) { +// code = streamTransferStateToStreamTask(pTask); +// if (code != TSDB_CODE_SUCCESS) { +// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); +// return code; +// } // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by // call this function (streamExecForAll) directly. -// code = streamExecForAll(pTask); -// if (code < 0) { - // do nothing -// } - } + // code = streamExecForAll(pTask); + // if (code < 0) { + // do nothing + // } +// } - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, - streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - } else { +// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); +// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, +// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); +// } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { + if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || + streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } - } +// } } else { qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 708524bf10..72dae735e1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -372,49 +372,6 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { return 0; } -static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); - if (code < 0) { - return -1; - } - - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { - if (buf) { - rpcFreeCont(buf); - } - return code; - } - - tEncoderClear(&encoder); - - msg.contLen = tlen + sizeof(SMsgHead); - msg.pCont = buf; - msg.msgType = TDMT_STREAM_TRANSFER_STATE; - msg.info.noResp = 1; - - tmsgSendReq(pEpSet, &msg); - qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId); - - return 0; -} - int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pTranstate == NULL) { @@ -442,6 +399,8 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_OUT_OF_MEMORY; } + pTask->status.appendTranstateBlock = true; + qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus); pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; streamSchedExec(pTask); @@ -449,27 +408,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { - SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; - - // serialize - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; - doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - - int32_t numOfVgs = taosArrayGetSize(vgInfo); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.downstreamTaskId = pVgInfo->taskId; - doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } - - return 0; -} - // agg int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) { pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);