From 2f24f776fbf5e23a4ae7bcfd965029ceb4eb1cd6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Jul 2023 09:04:28 +0800 Subject: [PATCH] fix(stream): fix error in generating checkpoint for state_window. --- source/dnode/mnode/impl/src/mndScheduler.c | 6 +++ source/libs/stream/src/stream.c | 3 ++ source/libs/stream/src/streamCheckpoint.c | 52 +++++++++++++--------- source/libs/stream/src/streamData.c | 9 ++++ source/libs/stream/src/streamMeta.c | 4 +- source/libs/stream/src/streamTask.c | 5 +-- 6 files changed, 54 insertions(+), 25 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 800005c27f..a5b2416ea9 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -25,6 +25,7 @@ #define SINK_NODE_LEVEL (0) extern bool tsDeployOnSnode; +static int32_t setEpToDownstreamTask(SStreamTask* pTask, SStreamTask* pDownstream); static int32_t mndAddSinkTaskToStream(SStreamObj* pStream, SArray* pTaskList, SMnode* pMnode, int32_t vgId, SVgObj* pVgroup, int32_t fillHistory); static void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask); @@ -267,6 +268,11 @@ static int32_t addSourceStreamTask(SMnode* pMnode, SVgObj* pVgroup, SArray* pTas return terrno; } + for(int32_t i = 0; i < taosArrayGetSize(pSinkTaskList); ++i) { + SStreamTask* pSinkTask = taosArrayGetP(pSinkTaskList, i); + setEpToDownstreamTask(pTask, pSinkTask); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ba5af32940..5f034e3177 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -245,6 +245,8 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S int32_t status = 0; SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId); + ASSERT(pInfo != NULL); + if (!pInfo->dataAllowed) { qWarn("s-task:%s data from task:0x%x is denied", pTask->id.idStr, pReq->upstreamTaskId); status = TASK_INPUT_STATUS__BLOCKED; @@ -402,5 +404,6 @@ SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t } } + qError("s-task:%s failed to find upstream task:0x%x", pTask->id.idStr, taskId); return NULL; } \ No newline at end of file diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index ff7d7a0963..7358d16bad 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -135,7 +135,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointingId = pReq->checkpointId; - pTask->checkpointNotReadyTasks = 1; + pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. @@ -149,9 +149,10 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); + } else { + streamFreeQitem((SStreamQueueItem*)pBlock); } - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } @@ -176,16 +177,18 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc continueDispatchCheckpointBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info streamProcessCheckpointReadyMsg(pTask); + streamFreeQitem((SStreamQueueItem*)pBlock); } - } else if (taskLevel == TASK_LEVEL__SINK) { + } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { // todo: sink node needs alignment?? - ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); - pTask->status.taskStatus = TASK_STATUS__CK_READY; + /* ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); + pTask->status.taskStatus = TASK_STATUS__CK_READY; - // update the child Id for downstream tasks - streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); - qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id); - } else { + // update the child Id for downstream tasks + streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); + qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id); + streamFreeQitem((SStreamQueueItem*)pBlock); + } else {*/ ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); // update the child Id for downstream tasks @@ -197,24 +200,33 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc if (notReady > 0) { qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d", id, pTask->info.selfChildId, notReady, num); + streamFreeQitem((SStreamQueueItem*)pBlock); return code; } - qDebug( - "s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg downstream", - id, num); - // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task - // can start local checkpoint procedure - pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); - // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY - // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task - // already. And then, dispatch check point msg to all downstream tasks - code = continueDispatchCheckpointBlock(pBlock, pTask); + if (taskLevel == TASK_LEVEL__SINK) { + pTask->status.taskStatus = TASK_STATUS__CK_READY; + qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream", + id, num); + streamFreeQitem((SStreamQueueItem*)pBlock); + } else { + qDebug( + "s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, dispatch checkpoint msg " + "downstream", id, num); + + // set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task + // can start local checkpoint procedure + pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); + + // if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY + // put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task + // already. And then, dispatch check point msg to all downstream tasks + code = continueDispatchCheckpointBlock(pBlock, pTask); + } } - streamFreeQitem((SStreamQueueItem*)pBlock); return code; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 8248a97dca..5d77f2c15d 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -194,6 +194,11 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* } } +static void freeItems(void* param) { + SSDataBlock* pBlock = param; + taosArrayDestroy(pBlock->pDataBlock); +} + void streamFreeQitem(SStreamQueueItem* data) { int8_t type = data->type; if (type == STREAM_INPUT__GET_RES) { @@ -227,5 +232,9 @@ void streamFreeQitem(SStreamQueueItem* data) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; blockDataDestroy(pRefBlock->pBlock); taosFreeQitem(pRefBlock); + } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + SStreamDataBlock* pBlock = (SStreamDataBlock*) data; + taosArrayDestroyEx(pBlock->blocks, freeItems); + taosFreeQitem(pBlock); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 59595cc341..fa53819f45 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -287,7 +287,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { if (!streamTaskShouldStop(&(*ppTask)->status)) { int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1); taosRUnLockLatch(&pMeta->lock); - qDebug("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); + qTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref); return *ppTask; } } @@ -299,7 +299,7 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); if (ref > 0) { - qDebug("s-task:%s release task, ref:%d", pTask->id.idStr, ref); + qTrace("s-task:%s release task, ref:%d", pTask->id.idStr, ref); } else if (ref == 0) { ASSERT(streamTaskShouldStop(&pTask->status)); tFreeStreamTask(pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ae22323271..dec2768975 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -259,10 +259,9 @@ void tFreeStreamTask(SStreamTask* pTask) { int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) { return 0; - } else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - return 1; } else { - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + int32_t type = pTask->outputType; + if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__TABLE) { return 1; } else { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;