From 901b7d8eccbf670192c8e15cb5de770a71faf459 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Jul 2023 15:18:57 +0800 Subject: [PATCH] fix(stream): if a source task is set to be non-normal, no data should be put into inputQ anymore. --- include/libs/stream/tstream.h | 4 ++-- source/dnode/snode/src/snode.c | 1 + source/dnode/vnode/src/tq/tq.c | 12 ++++++++---- source/dnode/vnode/src/tq/tqRestore.c | 16 +++++++++++++--- source/libs/stream/src/streamCheckpoint.c | 18 ++++++------------ source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamTask.c | 1 + 7 files changed, 32 insertions(+), 22 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 278c19bdf7..cea80fe4ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -310,6 +310,7 @@ struct SStreamTask { SStreamId streamTaskId; SArray* pUpstreamInfoList; // SArray, // children info SArray* pReadyMsgList; // SArray + TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ // output union { @@ -566,6 +567,7 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); +int32_t streamTaskGetInputQItems(const SStreamTask* pTask); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); @@ -615,9 +617,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); - int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); -int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index a959060ee2..534704e462 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -76,6 +76,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; pTask->pMeta = pSnode->pMeta; + taosThreadMutexInit(&pTask->lock, NULL); pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89beaadf72..456d1ec2c7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -928,16 +928,20 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; - tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 - " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, - pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); + taosThreadMutexInit(&pTask->lock, NULL); if (pTask->chkInfo.checkpointId != 0) { + // checkpoint ver is the kept version, handled data should be the next version. + pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); } + tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 + " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, + pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); + return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 9eae7c66e7..70277a8356 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -268,17 +268,25 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue); + int32_t numOfItems = streamTaskGetInputQItems(pTask); // append the data for the stream SStreamQueueItem* pItem = NULL; code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); - if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue + if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; } + taosThreadMutexLock(&pTask->lock); + + if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); + taosThreadMutexUnlock(&pTask->lock); + continue; + } + if (pItem != NULL) { noDataInWal = false; code = tAppendDataToInputQueue(pTask, pItem); @@ -292,7 +300,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } } - if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) { + taosThreadMutexUnlock(&pTask->lock); + + if ((code == TSDB_CODE_SUCCESS) || (numOfItems > 0)) { code = streamSchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7358d16bad..1fc2e96161 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -133,13 +133,18 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. + taosThreadMutexLock(&pTask->lock); + pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. - return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + taosThreadMutexUnlock(&pTask->lock); + + return code; } static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { @@ -180,15 +185,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - // todo: sink node needs alignment?? - /* ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); - pTask->status.taskStatus = TASK_STATUS__CK_READY; - - // update the child Id for downstream tasks - streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); - qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id); - streamFreeQitem((SStreamQueueItem*)pBlock); - } else {*/ ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); // update the child Id for downstream tasks @@ -204,8 +200,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc return code; } - - if (taskLevel == TASK_LEVEL__SINK) { pTask->status.taskStatus = TASK_STATUS__CK_READY; qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream", diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cad0d58925..b896b47ee4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -295,7 +295,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif -static int32_t getNumOfItemsInputQ(const SStreamTask* pTask) { +int32_t streamTaskGetInputQItems(const SStreamTask* pTask) { int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue); int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dec2768975..77f3202c79 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -244,6 +244,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); + taosThreadMutexDestroy(&pTask->lock); if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr);