From 8c791578225a4d64b918d32b92a4f1e18514a8e2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 14 Jul 2023 18:35:41 +0800 Subject: [PATCH] fix(stream): close the inputQ if checkpoint msg received. --- source/libs/stream/src/stream.c | 83 +++++++++++++---------- source/libs/stream/src/streamCheckpoint.c | 2 + 2 files changed, 50 insertions(+), 35 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b53d6e9d1c..0578f77b8a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -143,7 +143,26 @@ int32_t streamSchedExec(SStreamTask* pTask) { return 0; } -static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { +static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { + *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (*pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); + + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + return TSDB_CODE_SUCCESS; +} + +static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { int8_t status = 0; SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); @@ -158,23 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; } - // rsp by input status - void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - pDispatchRsp->inputStatus = status; - pDispatchRsp->streamId = htobe64(pReq->streamId); - pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); - pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); - pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); - pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); - - pRsp->pCont = buf; - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - tmsgSendRsp(pRsp); - - return (status == TASK_INPUT_STATUS__NORMAL) ? 0 : -1; + return status; } int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { @@ -239,22 +242,36 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked - if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); - } + int32_t status = 0; - streamTaskAppendInputBlocks(pTask, pReq, pRsp); - tDeleteStreamDispatchReq(pReq); - - if (exec) { - if (streamTryExec(pTask) < 0) { - return -1; - } + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + if (!pInfo->dataAllowed) { + qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId); + status = TASK_INPUT_STATUS__BLOCKED; } else { - streamSchedExec(pTask); + // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); + } + + status = streamTaskAppendInputBlocks(pTask, pReq); } + { + // do send response with the input status + int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); + if (code != TSDB_CODE_SUCCESS) { + // todo handle failure + return code; + } + + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + tmsgSendRsp(pRsp); + } + + tDeleteStreamDispatchReq(pReq); + streamSchedExec(pTask); return 0; } @@ -262,10 +279,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { if (streamTryExec(pTask) < 0) { return -1; } - - /*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatchStreamBlock(pTask);*/ - /*}*/ return 0; } @@ -380,7 +393,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { } } -SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { +SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); for(int32_t i = 0; i < num; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 772500bc80..0b39e6f834 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -259,7 +259,9 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { int8_t prev = p->status.taskStatus; p->status.taskStatus = TASK_STATUS__NORMAL; + // save the task streamMetaSaveTask(pMeta, p); + streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer,