diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index b53d6e9d1c..0578f77b8a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -143,7 +143,26 @@ int32_t streamSchedExec(SStreamTask* pTask) { return 0; } -static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { +static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) { + *pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + if (*pBuf == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + ((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId); + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead)); + + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + return TSDB_CODE_SUCCESS; +} + +static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { int8_t status = 0; SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId); @@ -158,23 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED; } - // rsp by input status - void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead)); - - pDispatchRsp->inputStatus = status; - pDispatchRsp->streamId = htobe64(pReq->streamId); - pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); - pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); - pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId); - pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); - - pRsp->pCont = buf; - pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); - tmsgSendRsp(pRsp); - - return (status == TASK_INPUT_STATUS__NORMAL) ? 0 : -1; + return status; } int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { @@ -239,22 +242,36 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); - // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked - if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { - streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); - } + int32_t status = 0; - streamTaskAppendInputBlocks(pTask, pReq, pRsp); - tDeleteStreamDispatchReq(pReq); - - if (exec) { - if (streamTryExec(pTask) < 0) { - return -1; - } + SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + if (!pInfo->dataAllowed) { + qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId); + status = TASK_INPUT_STATUS__BLOCKED; } else { - streamSchedExec(pTask); + // Current task has received the checkpoint req from the upstream task, from which the message should all be blocked + if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); + qDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId); + } + + status = streamTaskAppendInputBlocks(pTask, pReq); } + { + // do send response with the input status + int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont); + if (code != TSDB_CODE_SUCCESS) { + // todo handle failure + return code; + } + + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); + tmsgSendRsp(pRsp); + } + + tDeleteStreamDispatchReq(pReq); + streamSchedExec(pTask); return 0; } @@ -262,10 +279,6 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { if (streamTryExec(pTask) < 0) { return -1; } - - /*if (pTask->dispatchType == TASK_OUTPUT__FIXED_DISPATCH || pTask->dispatchType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatchStreamBlock(pTask);*/ - /*}*/ return 0; } @@ -380,7 +393,7 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { } } -SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { +SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); for(int32_t i = 0; i < num; ++i) { SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 772500bc80..4ae7ea02b3 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -136,15 +136,31 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = 1; - // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. - // 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already. + // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into + // inputQ, to make sure all blocks with less version have been handled by this task already. return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); } +static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { + pBlock->srcTaskId = pTask->id.taskId; + pBlock->srcVgId = pTask->pMeta->vgId; + + int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); + if (code == 0) { + streamDispatchStreamBlock(pTask); + } + + streamFreeQitem((SStreamQueueItem*)pBlock); + return code; +} + int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0); int64_t checkpointId = pDataBlock->info.version; + const char* id = pTask->id.idStr; + int32_t code = TSDB_CODE_SUCCESS; + // set the task status pTask->checkpointingId = checkpointId; pTask->status.taskStatus = TASK_STATUS__CK; @@ -153,20 +169,9 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - pBlock->srcTaskId = pTask->id.taskId; - pBlock->srcVgId = pTask->pMeta->vgId; - - qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", pTask->id.idStr, - pTask->info.selfChildId); - - int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); - if (code != 0) { // todo failed to add it into the output queue, free it. - return code; - } - - streamFreeQitem((SStreamQueueItem*)pBlock); - streamDispatchStreamBlock(pTask); - } else { // only one task exists + qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId); + continueDispatchCheckpointBlock(pBlock, pTask); + } else { // only one task exists, no need to dispatch downstream info streamProcessCheckpointReadyMsg(pTask); } } else if (taskLevel == TASK_LEVEL__SINK) { @@ -176,7 +181,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // update the child Id for downstream tasks streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); - qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", pTask->id.idStr); + qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id); } else { ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); @@ -188,14 +193,13 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); if (notReady > 0) { qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d", - pTask->id.idStr, pTask->info.selfChildId, notReady, num); - return 0; + id, pTask->info.selfChildId, notReady, num); + return code; } qDebug( - "s-task:%s receive one checkpoint block, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to " - "downstream", - pTask->id.idStr, num); + "s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg downstream", + id, num); // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task // can start local checkpoint procedure @@ -204,23 +208,10 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // already. And then, dispatch check point msg to all downstream tasks - - { - pBlock->srcTaskId = pTask->id.taskId; - pBlock->srcVgId = pTask->pMeta->vgId; - - ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); - if (code != 0) { // todo failed to add it into the output queue, free it. - return code; - } - - streamFreeQitem((SStreamQueueItem*)pBlock); - streamDispatchStreamBlock(pTask); - } + code = continueDispatchCheckpointBlock(pBlock, pTask); } - return 0; + return code; } /** @@ -259,10 +250,12 @@ int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId) { int8_t prev = p->status.taskStatus; p->status.taskStatus = TASK_STATUS__NORMAL; + // save the task streamMetaSaveTask(pMeta, p); - qDebug("vgId:%d s-task:%s commit task status after checkpoint completed, checkpointId:%" PRId64 ", ver:%" PRId64 - " currentVer:%" PRId64 ", status to be normal, prev:%s", - pMeta->vgId, p->id.idStr, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, + streamTaskOpenAllUpstreamInput(p); // open inputQ for all upstream tasks + qDebug("vgId:%d s-task:%s level:%d commit task status after checkpoint completed, checkpointId:%" PRId64 + ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status to be normal, prev:%s", + pMeta->vgId, p->id.idStr, p->info.taskLevel, checkpointId, p->chkInfo.checkpointVer, p->chkInfo.currentVer, streamGetTaskStatusStr(prev)); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 08dc59dfd2..a07b775cb8 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -386,7 +386,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, id, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - ASSERT((*pVer) < pSubmit->submit.ver); + ASSERT((*pVer) <= pSubmit->submit.ver); (*pVer) = pSubmit->submit.ver; } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { @@ -405,7 +405,7 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_ qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d, ver:%" PRId64, id, pTask, numOfBlocks, pMerged->ver); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - ASSERT((*pVer) < pMerged->ver); + ASSERT((*pVer) <= pMerged->ver); (*pVer) = pMerged->ver; } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { @@ -485,7 +485,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.currentVer && ver <= pTask->chkInfo.checkpointVer); if (ver != pTask->chkInfo.checkpointVer) { - qDebug("s-task:%s update checkpoint ver from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, + qDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64, pTask->id.idStr, ver, pTask->chkInfo.checkpointVer); }