diff --git a/include/common/tcommon.h b/include/common/tcommon.h index bdfb1d32b4..8482ba8a78 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -152,6 +152,8 @@ enum { STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, + STREAM_INPUT__CHECKPOINT_TRIGGER, + STREAM_INPUT__TRANS_STATE, STREAM_INPUT__REF_DATA_BLOCK, STREAM_INPUT__DESTROY, }; @@ -168,7 +170,9 @@ typedef enum EStreamType { STREAM_PULL_DATA, STREAM_PULL_OVER, STREAM_FILL_OVER, + STREAM_CHECKPOINT, STREAM_CREATE_CHILD_TABLE, + STREAM_TRANS_STATE, } EStreamType; #pragma pack(push, 1) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 232551007d..60172bce3d 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -254,7 +254,6 @@ enum { TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b9b24917f3..d3b670d0ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -122,6 +122,7 @@ typedef struct { int8_t type; int32_t srcVgId; + int32_t srcTaskId; int32_t childId; int64_t sourceVer; int64_t reqId; @@ -251,6 +252,7 @@ typedef struct SStreamChildEpInfo { int32_t nodeId; int32_t childId; int32_t taskId; + int8_t dataAllowed; SEpSet epSet; } SStreamChildEpInfo; @@ -272,6 +274,7 @@ typedef struct SStreamStatus { int8_t schedStatus; int8_t keepTaskStatus; bool transferState; + bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it int8_t timerActive; // timer is active int8_t pauseAllowed; // allowed task status to be set to be paused } SStreamStatus; @@ -399,8 +402,9 @@ typedef struct { typedef struct { int64_t streamId; + int32_t type; int32_t taskId; - int32_t dataSrcVgId; + int32_t srcVgId; int32_t upstreamTaskId; int32_t upstreamChildId; int32_t upstreamNodeId; @@ -570,8 +574,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq); -int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, - int64_t dstTaskId); void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupScheduleTrigger(SStreamTask* pTask); @@ -579,6 +581,8 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId); +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); @@ -626,7 +630,7 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p int32_t streamSourceScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); -int32_t streamDispatchTransferStateMsg(SStreamTask* pTask); +int32_t appendTranstateIntoInputQ(SStreamTask* pTask); // agg level int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index bed9a67303..cf57deaa22 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -742,7 +742,6 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 4000e72835..635fdcf459 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -77,6 +77,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->chkInfo.version = ver; pTask->pMeta = pSnode->pMeta; + streamTaskOpenAllUpstreamInput(pTask); + pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { return -1; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index cd7704940b..d41c58b501 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -250,7 +250,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); -int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqCheckLogInWal(STQ* pTq, int64_t version); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 59683dbefe..70534fa975 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -926,6 +926,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + streamTaskOpenAllUpstreamInput(pTask); + // backup the initial status, and set it to be TASK_STATUS__INIT pTask->chkInfo.version = ver; pTask->chkInfo.currentVer = ver; @@ -1270,7 +1272,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { if (done) { pTask->tsInfo.step2Start = taosGetTimestampMs(); - streamTaskEndScanWAL(pTask); + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0); + appendTranstateIntoInputQ(pTask); } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 @@ -1335,44 +1338,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { return 0; } -// notify the downstream tasks to transfer executor state after handle all history blocks. -int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) { - char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - - SStreamTransferReq req = {0}; - - SDecoder decoder; - tDecoderInit(&decoder, (uint8_t*)pReq, len); - int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req); - tDecoderClear(&decoder); - - tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId); - - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId); - if (pTask == NULL) { - tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId); - return -1; - } - - int32_t remain = streamAlignTransferState(pTask); - if (remain > 0) { - tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; - } - - // transfer the ownership of executor state - tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr); - ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); - - pTask->status.transferState = true; - - streamSchedExec(pTask); - streamMetaReleaseTask(pTq->pStreamMeta, pTask); - return 0; -} - int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1704,6 +1669,8 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { STQ* pTq = pVnode->pTq; + int32_t vgId = pVnode->config.vgId; + SMsgHead* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); @@ -1720,7 +1687,9 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { tDecoderClear(&decoder); int32_t taskId = req.taskId; - SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId); + tqDebug("vgId:%d receive dispatch msg to s-task:0x%"PRIx64"-0x%x", vgId, req.streamId, taskId); + + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId); if (pTask != NULL) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; streamProcessDispatchMsg(pTask, &req, &rsp, false); @@ -1737,7 +1706,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { FAIL: if (pMsg->info.handle == NULL) { - tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId); + tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", vgId, taskId); return -1; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 3d9a91899c..3054179416 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -210,13 +210,22 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) { } static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) { - if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { - qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 - ", not scan wal anymore, set the transfer state flag", - pTask->id.idStr, ver, pTask->dataRange.range.maxVer); - pTask->status.transferState = true; + const char* id = pTask->id.idStr; - /*int32_t code = */streamSchedExec(pTask); + if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) { + if (!pTask->status.appendTranstateBlock) { + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 + ", not scan wal anymore, add transfer-state block into inputQ", + id, ver, pTask->dataRange.range.maxVer); + + double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0; + qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); + appendTranstateIntoInputQ(pTask); + /*int32_t code = */streamSchedExec(pTask); + } else { + qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal", + id, ver, pTask->dataRange.range.maxVer); + } } } @@ -262,7 +271,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { + if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { ASSERT(status == TASK_STATUS__NORMAL); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 743470aac8..f75c779f4b 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -661,8 +661,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg); case TDMT_VND_STREAM_SCAN_HISTORY: return tqProcessTaskScanHistory(pVnode->pTq, pMsg); - case TDMT_STREAM_TRANSFER_STATE: - return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH: return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg); case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP: diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 32d6dc65d9..7a557a744a 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -52,7 +52,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDoDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, @@ -63,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq); int32_t streamNotifyUpstreamContinue(SStreamTask* pTask); int32_t streamTaskFillHistoryFinished(SStreamTask* pTask); +int32_t streamTransferStateToStreamTask(SStreamTask* pTask); extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index f85ade591c..e9b38dfff2 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -142,40 +142,6 @@ int32_t streamSchedExec(SStreamTask* pTask) { return 0; } -int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - int8_t status = 0; - - SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); - if (pBlock == NULL) { - streamTaskInputFail(pTask); - status = TASK_INPUT_STATUS__FAILED; - qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, - pTask->id.idStr); - } else { - int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); - // input queue is full, upstream is blocked now - status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED; - } - - // rsp by input status - void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - pDispatchRsp->inputStatus = status; - pDispatchRsp->streamId = htobe64(pReq->streamId); - pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); - pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); - pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); - pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); - - pRsp->pCont = buf; - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - tmsgSendRsp(pRsp); - - return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; -} - int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); int8_t status = TASK_INPUT_STATUS__NORMAL; @@ -235,90 +201,115 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return 0; } + + +static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { + int8_t status = 0; + + SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); + if (pBlock == NULL) { + streamTaskInputFail(pTask); + status = TASK_INPUT_STATUS__FAILED; + qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, + pTask->id.idStr); + } else { + if (pBlock->type == STREAM_INPUT__TRANS_STATE) { + pTask->status.appendTranstateBlock = true; + } + + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock); + // input queue is full, upstream is blocked now + status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; + } + + return status; +} + +static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { + *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (*pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); + + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + return TSDB_CODE_SUCCESS; +} + +void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId); + if (pInfo != NULL) { + pInfo->dataAllowed = false; + } +} + int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - // todo add the input queue buffer limitation - streamTaskEnqueueBlocks(pTask, pReq, pRsp); - tDeleteStreamDispatchReq(pReq); + int32_t status = 0; - if (exec) { - if (streamTryExec(pTask) < 0) { - return -1; - } + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + ASSERT(pInfo != NULL); + + if (!pInfo->dataAllowed) { + qWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr, pReq->upstreamTaskId); + status = TASK_INPUT_STATUS__BLOCKED; } else { - streamSchedExec(pTask); + // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); + } + + status = streamTaskAppendInputBlocks(pTask, pReq); } - return 0; -} - -// todo record the idle time for dispatch data -int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { - if (code != TSDB_CODE_SUCCESS) { - // dispatch message failed: network error, or node not available. - // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set - // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure - // happened too fast. todo handle the shuffle dispatch failure - if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, no-retry", pTask->id.idStr, - pRsp->downstreamTaskId, tstrerror(code)); + { + // do send response with the input status + int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); + if (code != TSDB_CODE_SUCCESS) { + // todo handle failure return code; - } else { - qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr, - pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount); - return streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); - } - } - - qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); - - // there are other dispatch message not response yet - if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); - if (leftRsp > 0) { - return 0; - } - } - - pTask->msgInfo.retryCount = 0; - ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - - qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status); - - // the input queue of the (down stream) task that receive the output data is full, - // so the TASK_INPUT_STATUS_BLOCKED is rsp - // todo blocking the output status - if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { - pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time - - int32_t waitDuration = 300; // 300 ms - qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data", - pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, waitDuration); - streamRetryDispatchStreamBlock(pTask, waitDuration); - } else { // pipeline send data in output queue - // this message has been sent successfully, let's try next one. - destroyStreamDataBlock(pTask->msgInfo.pData); - pTask->msgInfo.pData = NULL; - - if (pTask->msgInfo.blockingTs != 0) { - int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; - qDebug("s-task:%s resume to normal from inputQ blocking, idle time:%"PRId64"ms", pTask->id.idStr, el); - pTask->msgInfo.blockingTs = 0; } - // now ready for next data output - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - - // otherwise, continue dispatch the first block to down stream task in pipeline - streamDispatchStreamBlock(pTask); + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + tmsgSendRsp(pRsp); } + tDeleteStreamDispatchReq(pReq); + streamSchedExec(pTask); + return 0; } +//int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { +// qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, +// pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); +// +// // todo add the input queue buffer limitation +// streamTaskEnqueueBlocks(pTask, pReq, pRsp); +// tDeleteStreamDispatchReq(pReq); +// +// if (exec) { +// if (streamTryExec(pTask) < 0) { +// return -1; +// } +// } else { +// streamSchedExec(pTask); +// } +// +// return 0; +//} + int32_t streamProcessRunReq(SStreamTask* pTask) { if (streamTryExec(pTask) < 0) { return -1; @@ -385,12 +376,15 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { destroyStreamDataBlock((SStreamDataBlock*) pItem); return code; } - } else if (type == STREAM_INPUT__CHECKPOINT) { + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__TRANS_STATE) { taosWriteQitem(pTask->inputQueue->queue, pItem); + qDebug("s-task:%s trans-state blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. taosWriteQitem(pTask->inputQueue->queue, pItem); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + } else { + ASSERT(0); } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { @@ -433,4 +427,16 @@ SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t } return NULL; -} \ No newline at end of file +} + +void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { + int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList); + if (num == 0) { + return; + } + + for(int32_t i = 0; i < num; ++i) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i); + pInfo->dataAllowed = true; + } +} diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index bb4b842787..fc1b788b77 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -204,7 +204,7 @@ void streamFreeQitem(SStreamQueueItem* data) { if (type == STREAM_INPUT__GET_RES) { blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); - } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) { + } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 8334ea1c88..94e005b790 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -25,6 +25,9 @@ typedef struct SBlockName { char parTbName[TSDB_TABLE_NAME_LEN]; } SBlockName; +static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, + int32_t numOfBlocks, int64_t dstTaskId, int32_t type); + static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->msgType = msgType; pMsg->pCont = pCont; @@ -35,8 +38,9 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->type) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; - if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; @@ -88,8 +92,9 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; @@ -113,14 +118,15 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { } int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, - int64_t dstTaskId) { + int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; - pReq->dataSrcVgId = vgId; + pReq->srcVgId = vgId; pReq->upstreamTaskId = pTask->id.taskId; pReq->upstreamChildId = pTask->info.selfChildId; pReq->upstreamNodeId = pTask->info.nodeId; pReq->blockNum = numOfBlocks; pReq->taskId = dstTaskId; + pReq->type = type; pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES); pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); @@ -436,9 +442,8 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S return 0; } -int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { +static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; - int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0); @@ -446,15 +451,15 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat SStreamDispatchReq req = {0}; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; - code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId); + code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); if (code != TSDB_CODE_SUCCESS) { return code; } for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); + code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); @@ -487,7 +492,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat for (int32_t i = 0; i < vgSz; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId); + code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type); if (code != TSDB_CODE_SUCCESS) { goto FAIL_SHUFFLE_DISPATCH; } @@ -497,8 +502,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); // TODO: do not use broadcast - if (pDataBlock->info.type == STREAM_DELETE_RESULT) { - + if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT || pDataBlock->info.type == STREAM_TRANS_STATE) { for (int32_t j = 0; j < vgSz; j++) { if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; @@ -518,14 +522,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } } - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->info.selfChildId, - numOfBlocks, vgSz); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, + pTask->info.selfChildId, numOfBlocks, vgSz); for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->info.selfChildId, - pReqs[i].blockNum, pVgInfo->vgId); + qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, + pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId); code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet); if (code < 0) { @@ -536,7 +540,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat code = 0; - FAIL_SHUFFLE_DISPATCH: + FAIL_SHUFFLE_DISPATCH: for (int32_t i = 0; i < vgSz; i++) { taosArrayDestroyP(pReqs[i].data, taosMemoryFree); taosArrayDestroy(pReqs[i].dataLen); @@ -552,7 +556,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); + int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); @@ -593,12 +597,13 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } pTask->msgInfo.pData = pBlock; - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER || + pBlock->type == STREAM_INPUT__TRANS_STATE); int32_t retryCount = 0; while (1) { - int32_t code = streamDispatchAllBlocks(pTask, pBlock); + int32_t code = doDispatchAllBlocks(pTask, pBlock); if (code == TSDB_CODE_SUCCESS) { break; } @@ -715,3 +720,84 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { num); return 0; } + +int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) { + const char* id = pTask->id.idStr; + + if (code != TSDB_CODE_SUCCESS) { + // dispatch message failed: network error, or node not available. + // in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set + // flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure + // happened too fast. + // todo handle the shuffle dispatch failure + qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId, + tstrerror(code), ++pTask->msgInfo.retryCount); + int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData); + if (ret != TSDB_CODE_SUCCESS) { + } + + return TSDB_CODE_SUCCESS; + } + + qDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId, + pRsp->inputStatus, code); + + // there are other dispatch message not response yet + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + qDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp); + if (leftRsp > 0) { + return 0; + } + } + + // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state + SStreamDataBlock* p = pTask->msgInfo.pData; + if (p->type == STREAM_INPUT__TRANS_STATE) { + qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); + + if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + return code; + } + } + + pTask->msgInfo.retryCount = 0; + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); + + qDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status); + + // the input queue of the (down stream) task that receive the output data is full, + // so the TASK_INPUT_STATUS_BLOCKED is rsp + if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { + pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream + pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time + qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data", + id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS); + streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); + } else { // pipeline send data in output queue + // this message has been sent successfully, let's try next one. + destroyStreamDataBlock(pTask->msgInfo.pData); + pTask->msgInfo.pData = NULL; + + if (pTask->msgInfo.blockingTs != 0) { + int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs; + qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id, + pRsp->downstreamTaskId, el); + pTask->msgInfo.blockingTs = 0; + + // put data into inputQ of current task is also allowed + pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; + } + + // now ready for next data output + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); + + // otherwise, continue dispatch the first block to down stream task in pipeline + streamDispatchStreamBlock(pTask); + } + + return 0; +} diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b479931cd2..3b954793de 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -287,7 +287,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } } -static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); @@ -301,7 +301,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { pStreamTask->id.idStr); } - ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true); + ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true); STimeWindow* pTimeWindow = &pStreamTask->dataRange.window; @@ -334,6 +334,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { qDebug("s-task:%s no need to update time window for non-source task", pStreamTask->id.idStr); } + // todo check the output queue for fill-history task, and wait for it complete + + // 1. expand the query time window for stream task of WAL scanner pTimeWindow->skey = INT64_MIN; qStreamInfoResetTimewindowFilter(pStreamTask->exec.pExecutor); @@ -380,17 +383,18 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; - if (!pTask->status.transferState) { - return code; - } + ASSERT(pTask->status.appendTranstateBlock == 1); int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__SOURCE) { streamTaskFillHistoryFinished(pTask); - streamTaskEndScanWAL(pTask); - } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. + code = streamDoTransferStateToStreamTask(pTask); + if (code != TSDB_CODE_SUCCESS) { // todo handle this + return code; + } + } else if (level == TASK_LEVEL__AGG) { // do transfer task operator states. code = streamDoTransferStateToStreamTask(pTask); if (code != TSDB_CODE_SUCCESS) { // todo handle this return code; @@ -400,14 +404,41 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { return code; } -static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, - const char* id) { - int32_t retryTimes = 0; - int32_t MAX_RETRY_TIMES = 5; +static int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { + int32_t retryTimes = 0; + int32_t MAX_RETRY_TIMES = 5; + const char* id = pTask->id.idStr; + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one + while (1) { + if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { + qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { + taosMsleep(10); + qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id); + continue; + } + + qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); + return TSDB_CODE_SUCCESS; + } + + qDebug("s-task:%s sink task handle result block one-by-one", id); + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; + } + } + + // non sink task while (1) { - if (streamTaskShouldPause(&pTask->status)) { - qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); + if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { + qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); return TSDB_CODE_SUCCESS; } @@ -415,51 +446,106 @@ static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu if (qItem == NULL) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { taosMsleep(10); - qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); + qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id); continue; } - qDebug("===stream===break batchSize:%d", *numOfBlocks); + qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); return TSDB_CODE_SUCCESS; } - // do not merge blocks for sink node - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - *numOfBlocks = 1; - *pInput = qItem; - return TSDB_CODE_SUCCESS; - } - - if (*pInput == NULL) { - ASSERT((*numOfBlocks) == 0); - *pInput = qItem; - } else { - // todo we need to sort the data block, instead of just appending into the array list. - void* newRet = streamMergeQueueItem(*pInput, qItem); - if (newRet == NULL) { - if (terrno == 0) { - qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); - } else { - qDebug("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d, code:%s", id, *numOfBlocks, - tstrerror(terrno)); - } + // do not merge blocks for sink node and check point data block + if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || + qItem->type == STREAM_INPUT__TRANS_STATE) { + if (*pInput == NULL) { + qDebug("s-task:%s checkpoint/transtate msg extracted, start to process immediately", id); + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; + } else { + // previous existed blocks needs to be handle, before handle the checkpoint msg block + qDebug("s-task:%s checkpoint/transtate msg extracted, handle previous block first, numOfBlocks:%d", id, + *numOfBlocks); streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; } + } else { + if (*pInput == NULL) { + ASSERT((*numOfBlocks) == 0); + *pInput = qItem; + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = streamMergeQueueItem(*pInput, qItem); + if (newRet == NULL) { + qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + streamQueueProcessFail(pTask->inputQueue); + return TSDB_CODE_SUCCESS; + } - *pInput = newRet; - } + *pInput = newRet; + } - *numOfBlocks += 1; - streamQueueProcessSuccess(pTask->inputQueue); + *numOfBlocks += 1; + streamQueueProcessSuccess(pTask->inputQueue); - if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); - return TSDB_CODE_SUCCESS; + if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + return TSDB_CODE_SUCCESS; + } } } } +int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { + const char* id = pTask->id.idStr; + int32_t code = TSDB_CODE_SUCCESS; + + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) { + int32_t remain = streamAlignTransferState(pTask); + if (remain > 0) { + streamFreeQitem((SStreamQueueItem*)pBlock); + qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain); + return 0; + } + } + + // dispatch the tran-state block to downstream task immediately + int32_t type = pTask->outputInfo.type; + + // transfer the ownership of executor state + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (level == TASK_LEVEL__SOURCE) { + qDebug("s-task:%s add transfer-state block into outputQ", id); + } else { + qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id); + ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1); + } + + if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { + pBlock->srcVgId = pTask->pMeta->vgId; + code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); + if (code == 0) { + streamDispatchStreamBlock(pTask); + } else { + streamFreeQitem((SStreamQueueItem*)pBlock); + } + } + } else { // non-dispatch task, do task state transfer directly + qDebug("s-task:%s non-dispatch task, start to transfer state directly", id); + + streamFreeQitem((SStreamQueueItem*)pBlock); + ASSERT(pTask->info.fillHistory == 1); + code = streamTransferStateToStreamTask(pTask); + + if (code != TSDB_CODE_SUCCESS) { + atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); + } + } + + return code; +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -478,12 +564,17 @@ int32_t streamExecForAll(SStreamTask* pTask) { // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); - /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); + /*int32_t code = */extractBlocksFromInputQ(pTask, &pInput, &batchSize); if (pInput == NULL) { ASSERT(batchSize == 0); break; } + if (pInput->type == STREAM_INPUT__TRANS_STATE) { + streamProcessTranstateBlock(pTask, (SStreamDataBlock*)pInput); + return 0; + } + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize); @@ -557,18 +648,7 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) { qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el); // 1. notify all downstream tasks to transfer executor state after handle all history blocks. - int32_t code = streamDispatchTransferStateMsg(pTask); - if (code != TSDB_CODE_SUCCESS) { - // todo handle error - } - - // 2. do transfer stream task operator states. - pTask->status.transferState = true; - code = streamDoTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { // todo handle error - return code; - } - + appendTranstateIntoInputQ(pTask); return TSDB_CODE_SUCCESS; } @@ -587,35 +667,36 @@ int32_t streamTryExec(SStreamTask* pTask) { } // todo the task should be commit here - if (taosQueueEmpty(pTask->inputQueue->queue)) { +// if (taosQueueEmpty(pTask->inputQueue->queue)) { // fill-history WAL scan has completed - if (pTask->status.transferState) { - code = streamTransferStateToStreamTask(pTask); - if (code != TSDB_CODE_SUCCESS) { - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - return code; - } +// if (pTask->status.transferState) { +// code = streamTransferStateToStreamTask(pTask); +// if (code != TSDB_CODE_SUCCESS) { +// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); +// return code; +// } // the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by // call this function (streamExecForAll) directly. - code = streamExecForAll(pTask); - if (code < 0) { - // do nothing - } - } + // code = streamExecForAll(pTask); + // if (code < 0) { + // do nothing + // } +// } - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); - qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, - streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - } else { +// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); +// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, +// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); +// } else { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); - if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) { + if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) || + streamTaskShouldPause(&pTask->status))) { streamSchedExec(pTask); } - } +// } } else { qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index e59b3f682d..72dae735e1 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -372,68 +372,40 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { return 0; } -static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) { - void* buf = NULL; - int32_t code = -1; - SRpcMsg msg = {0}; - - int32_t tlen; - tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code); - if (code < 0) { - return -1; +int32_t appendTranstateIntoInputQ(SStreamTask* pTask) { + SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); + if (pTranstate == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } - buf = rpcMallocCont(sizeof(SMsgHead) + tlen); - if (buf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (pBlock == NULL) { + taosFreeQitem(pTranstate); + return TSDB_CODE_OUT_OF_MEMORY; } - ((SMsgHead*)buf)->vgId = htonl(vgId); - void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + pTranstate->type = STREAM_INPUT__TRANS_STATE; - SEncoder encoder; - tEncoderInit(&encoder, abuf, tlen); - if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) { - if (buf) { - rpcFreeCont(buf); - } - return code; + pBlock->info.type = STREAM_TRANS_STATE; + pBlock->info.rows = 1; + pBlock->info.childId = pTask->info.selfChildId; + + pTranstate->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock; + taosArrayPush(pTranstate->blocks, pBlock); + + taosMemoryFree(pBlock); + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pTranstate) < 0) { + taosFreeQitem(pTranstate); + return TSDB_CODE_OUT_OF_MEMORY; } - tEncoderClear(&encoder); + pTask->status.appendTranstateBlock = true; - msg.contLen = tlen + sizeof(SMsgHead); - msg.pCont = buf; - msg.msgType = TDMT_STREAM_TRANSFER_STATE; - msg.info.noResp = 1; + qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus); + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; + streamSchedExec(pTask); - tmsgSendReq(pEpSet, &msg); - qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr, - pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId); - - return 0; -} - -int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { - SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; - - // serialize - if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; - doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - - int32_t numOfVgs = taosArrayGetSize(vgInfo); - for (int32_t i = 0; i < numOfVgs; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - req.downstreamTaskId = pVgInfo->taskId; - doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); - } - } - - return 0; + return TSDB_CODE_SUCCESS; } // agg