From ab17bfdf0997edaf41b45dfc6cde86acdd8bfc82 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Oct 2023 19:16:55 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 68 +++++++++------- source/dnode/mnode/impl/src/mndScheduler.c | 22 ++--- source/dnode/snode/src/snode.c | 6 +- source/dnode/vnode/src/tq/tq.c | 26 +++--- source/dnode/vnode/src/tq/tqSink.c | 24 +++--- source/libs/stream/src/stream.c | 22 +++-- source/libs/stream/src/streamCheckpoint.c | 10 +-- source/libs/stream/src/streamDispatch.c | 94 +++++++++++----------- source/libs/stream/src/streamExec.c | 12 +-- source/libs/stream/src/streamMeta.c | 2 +- source/libs/stream/src/streamQueue.c | 40 ++++++--- source/libs/stream/src/streamRecover.c | 26 +++--- 12 files changed, 188 insertions(+), 164 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a67199a7d6..5c5a2e6adb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -290,6 +290,8 @@ typedef struct SSTaskBasicInfo { } SSTaskBasicInfo; typedef struct SStreamDispatchReq SStreamDispatchReq; +typedef struct STokenBucket STokenBucket; +typedef struct SMetaHbInfo SMetaHbInfo; typedef struct SDispatchMsgInfo { SStreamDispatchReq* pData; // current dispatch data @@ -301,11 +303,10 @@ typedef struct SDispatchMsgInfo { void* pTimer; // used to dispatch data after a given time duration } SDispatchMsgInfo; -typedef struct STaskOutputInfo { - int8_t type; +typedef struct STaskOutputQueue { int8_t status; SStreamQueue* queue; -} STaskOutputInfo; +} STaskOutputQueue; typedef struct STaskInputInfo { int8_t status; @@ -348,29 +349,7 @@ typedef struct SHistoryTaskInfo { int32_t waitInterval; } SHistoryTaskInfo; -typedef struct STokenBucket STokenBucket; -typedef struct SMetaHbInfo SMetaHbInfo; - -struct SStreamTask { - int64_t ver; - SStreamTaskId id; - SSTaskBasicInfo info; - STaskOutputInfo outputInfo; - STaskInputInfo inputInfo; - STaskSchedInfo schedInfo; - SDispatchMsgInfo msgInfo; - SStreamStatus status; - SCheckpointInfo chkInfo; - STaskExec exec; - SDataRange dataRange; - SHistoryTaskInfo hTaskInfo; - STaskId streamTaskId; - STaskExecStatisInfo execInfo; - SArray* pReadyMsgList; // SArray - TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ - SArray* pUpstreamInfoList; - - // output +typedef struct STaskOutputInfo { union { STaskDispatcherFixed fixedDispatcher; STaskDispatcherShuffle shuffleDispatcher; @@ -379,11 +358,38 @@ struct SStreamTask { STaskSinkFetch fetchSink; }; + void* pTimer; // timer for launch sink tasks + int8_t type; STokenBucket* pTokenBucket; - SMsgCb* pMsgCb; // msg handle - SStreamState* pState; // state backend - SArray* pRspMsgList; +} STaskOutputInfo; +typedef struct SUpstreamInfo { + SArray* pList; + int32_t numOfClosed; +} SUpstreamInfo; + +struct SStreamTask { + int64_t ver; + SStreamTaskId id; + SSTaskBasicInfo info; + STaskOutputQueue outputq; + STaskInputInfo inputInfo; + STaskSchedInfo schedInfo; // todo remove it + STaskOutputInfo outputInfo; + SDispatchMsgInfo msgInfo; + SStreamStatus status; + SCheckpointInfo chkInfo; + STaskExec exec; + SDataRange dataRange; + SHistoryTaskInfo hTaskInfo; + STaskId streamTaskId; + STaskExecStatisInfo execInfo; + SArray* pReadyMsgList; // SArray + TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ + SMsgCb* pMsgCb; // msg handle + SStreamState* pState; // state backend + SArray* pRspMsgList; + SUpstreamInfo upstreamInfo; // 
the followings attributes don't be serialized int32_t notReadyTasks; int32_t numOfWaitingUpstream; @@ -669,7 +675,6 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupScheduleTrigger(SStreamTask* pTask); -int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); @@ -677,7 +682,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieve SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId); void streamTaskInputFail(SStreamTask* pTask); -int32_t streamTryExec(SStreamTask* pTask); +int32_t streamExecTask(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); @@ -693,6 +698,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask); int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage); int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); +bool streamTaskAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index d598dc11d2..2931f6be6b 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -87,15 +87,17 @@ END: } int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { + STaskOutputInfo* pInfo = &pTask->outputInfo; + if (pStream->smaId != 0) { - pTask->outputInfo.type = TASK_OUTPUT__SMA; - pTask->smaSink.smaId = pStream->smaId; + pInfo->type = TASK_OUTPUT__SMA; + pInfo->smaSink.smaId = pStream->smaId; } else { - pTask->outputInfo.type = TASK_OUTPUT__TABLE; - pTask->tbSink.stbUid = pStream->targetStbUid; - memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); - pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); - if (pTask->tbSink.pSchemaWrapper == NULL) { + pInfo->type = TASK_OUTPUT__TABLE; + pInfo->tbSink.stbUid = pStream->targetStbUid; + memcpy(pInfo->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); + pInfo->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); + if (pInfo->tbSink.pSchemaWrapper == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } } @@ -113,7 +115,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr isShuffle = true; pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; - if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { + if (mndExtractDbInfo(pMnode, pDb, &pTask->outputInfo.shuffleDispatcher.dbInfo, NULL) < 0) { return -1; } } @@ -124,8 +126,8 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr int32_t numOfSinkNodes = taosArrayGetSize(pSinkNodeList); if (isShuffle) { - memcpy(pTask->shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); - SArray* pVgs = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + memcpy(pTask->outputInfo.shuffleDispatcher.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); 
+ SArray* pVgs = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(pVgs); for (int32_t i = 0; i < numOfVgroups; i++) { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index c5fd202986..7fb0b6b40a 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -54,7 +54,7 @@ FAIL: } int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) { - ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->pUpstreamInfoList) != 0); + ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0); int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer); if (code != TSDB_CODE_SUCCESS) { return code; @@ -70,7 +70,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer qDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } - int32_t numOfChildEp = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t numOfChildEp = taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { .vnode = NULL, .numOfVgroups = numOfChildEp, .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory }; initStreamStateAPI(&handle.api); @@ -206,7 +206,7 @@ int32_t sndProcessTaskRunReq(SSnode *pSnode, SRpcMsg *pMsg) { SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, pReq->streamId, pReq->taskId); if (pTask) { - streamProcessRunReq(pTask); + streamExecTask(pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } else { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 826ca9a199..b89a671a0e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -797,7 +797,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, .vnode = NULL, @@ -818,27 +818,27 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { // sink if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - pTask->smaSink.vnode = pTq->pVnode; - pTask->smaSink.smaSink = smaHandleRes; + pTask->outputInfo.smaSink.vnode = pTq->pVnode; + pTask->outputInfo.smaSink.smaSink = smaHandleRes; } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - pTask->tbSink.vnode = pTq->pVnode; - pTask->tbSink.tbSinkFunc = tqSinkDataIntoDstTable; + pTask->outputInfo.tbSink.vnode = pTq->pVnode; + pTask->outputInfo.tbSink.tbSinkFunc = tqSinkDataIntoDstTable; int32_t ver1 = 1; SMetaInfo info = {0}; - code = metaGetInfo(pTq->pVnode->pMeta, pTask->tbSink.stbUid, &info, NULL); + code = metaGetInfo(pTq->pVnode->pMeta, pTask->outputInfo.tbSink.stbUid, &info, NULL); if (code == TSDB_CODE_SUCCESS) { ver1 = info.skmVer; } - SSchemaWrapper* pschemaWrapper = pTask->tbSink.pSchemaWrapper; - pTask->tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); - if (pTask->tbSink.pTSchema == NULL) { + SSchemaWrapper* pschemaWrapper = pTask->outputInfo.tbSink.pSchemaWrapper; + pTask->outputInfo.tbSink.pTSchema = tBuildTSchema(pschemaWrapper->pSchema, pschemaWrapper->nCols, ver1); + if (pTask->outputInfo.tbSink.pTSchema == NULL) { return -1; } - pTask->tbSink.pTblInfo = tSimpleHashInit(10240, 
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); - tSimpleHashSetFreeFp(pTask->tbSink.pTblInfo, freePtr); + pTask->outputInfo.tbSink.pTblInfo = tSimpleHashInit(10240, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + tSimpleHashSetFreeFp(pTask->outputInfo.tbSink.pTblInfo, freePtr); } if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { @@ -1202,7 +1202,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { streamGetTaskStatusStr(TASK_STATUS__PAUSE), streamGetTaskStatusStr(pTask->status.keepTaskStatus)); } - streamTryExec(pTask); // exec directly + streamExecTask(pTask); // exec directly } else { STimeWindow* pWindow = &pTask->dataRange.window; tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64 @@ -1347,7 +1347,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { if (st == TASK_STATUS__NORMAL || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK) { tqDebug("vgId:%d s-task:%s start to process block from inputQ, next checked ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); - streamProcessRunReq(pTask); + streamExecTask(pTask); } else { int8_t status = streamTaskSetSchedStatusInActive(pTask); tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId, diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 23b5aff7fa..7d1c754005 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -142,7 +142,7 @@ static int32_t doBuildAndSendCreateTableMsg(SVnode* pVnode, char* stbFullName, S int64_t suid) { tqDebug("s-task:%s build create table msg", pTask->id.idStr); - STSchema* pTSchema = pTask->tbSink.pTSchema; + STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t rows = pDataBlock->info.rows; SArray* tagArray = NULL; int32_t code = 0; @@ -588,7 +588,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock int32_t doWaitForDstTableCreated(SVnode* pVnode, SStreamTask* pTask, STableSinkInfo* pTableSinkInfo, const char* dstTableName, int64_t* uid) { int32_t vgId = TD_VID(pVnode); - int64_t suid = pTask->tbSink.stbUid; + int64_t suid = pTask->outputInfo.tbSink.stbUid; const char* id = pTask->id.idStr; while (pTableSinkInfo->uid == 0) { @@ -631,12 +631,12 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat char* dstTableName = pDataBlock->info.parTbName; int32_t numOfRows = pDataBlock->info.rows; const char* id = pTask->id.idStr; - int64_t suid = pTask->tbSink.stbUid; - STSchema* pTSchema = pTask->tbSink.pTSchema; + int64_t suid = pTask->outputInfo.tbSink.stbUid; + STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t vgId = TD_VID(pVnode); STableSinkInfo* pTableSinkInfo = NULL; - bool alreadyCached = tqGetTableInfo(pTask->tbSink.pTblInfo, groupId, &pTableSinkInfo); + bool alreadyCached = tqGetTableInfo(pTask->outputInfo.tbSink.pTblInfo, groupId, &pTableSinkInfo); if (alreadyCached) { if (dstTableName[0] == 0) { // data block does not set the destination table name @@ -702,7 +702,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat } pTableSinkInfo->uid = 0; - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id); + doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id); } else { bool isValid = isValidDstChildTable(&mr, vgId, dstTableName, suid); if (!isValid) { @@ -716,7 +716,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* 
pDat pTableSinkInfo->uid = mr.me.uid; metaReaderClear(&mr); - doPutIntoCache(pTask->tbSink.pTblInfo, pTableSinkInfo, groupId, id); + doPutIntoCache(pTask->outputInfo.tbSink.pTblInfo, pTableSinkInfo, groupId, id); } } } @@ -730,11 +730,11 @@ int32_t setDstTableDataPayload(SStreamTask* pTask, int32_t blockIndex, SSDataBlo const char* id = pTask->id.idStr; tqDebug("s-task:%s sink data pipeline, build submit msg from %dth resBlock, including %d rows, dst suid:%" PRId64, - id, blockIndex + 1, numOfRows, pTask->tbSink.stbUid); + id, blockIndex + 1, numOfRows, pTask->outputInfo.tbSink.stbUid); char* dstTableName = pDataBlock->info.parTbName; // convert all rows - int32_t code = doConvertRows(pTableData, pTask->tbSink.pTSchema, pDataBlock, id); + int32_t code = doConvertRows(pTableData, pTask->outputInfo.tbSink.pTSchema, pDataBlock, id); if (code != TSDB_CODE_SUCCESS) { tqError("s-task:%s failed to convert rows from result block, code:%s", id, tstrerror(terrno)); return code; @@ -759,9 +759,9 @@ bool hasOnlySubmitData(const SArray* pBlocks, int32_t numOfBlocks) { void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { const SArray* pBlocks = (const SArray*)data; SVnode* pVnode = (SVnode*)vnode; - int64_t suid = pTask->tbSink.stbUid; - char* stbFullName = pTask->tbSink.stbFullName; - STSchema* pTSchema = pTask->tbSink.pTSchema; + int64_t suid = pTask->outputInfo.tbSink.stbUid; + char* stbFullName = pTask->outputInfo.tbSink.stbFullName; + STSchema* pTSchema = pTask->outputInfo.tbSink.pTSchema; int32_t vgId = TD_VID(pVnode); int32_t numOfBlocks = taosArrayGetSize(pBlocks); int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6f9a577a46..97316dba07 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -126,7 +126,7 @@ int32_t streamSchedExec(SStreamTask* pTask) { SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &msg); } else { - stDebug("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); + stTrace("s-task:%s not launch task since sched status:%d", pTask->id.idStr, pTask->status.schedStatus); } return 0; @@ -239,8 +239,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S status = TASK_INPUT_STATUS__BLOCKED; } else { // This task has received the checkpoint req from the upstream task, from which all the messages should be - // blocked + // blocked. Note that there is no race condition here. 
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + atomic_add_fetch_32(&pTask->upstreamInfo.numOfClosed, 1); streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId); stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId); } @@ -274,13 +275,6 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S return 0; } -int32_t streamProcessRunReq(SStreamTask* pTask) { - if (streamTryExec(pTask) < 0) { - return -1; - } - return 0; -} - int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { streamTaskEnqueueRetrieve(pTask, pReq, pRsp); ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK); @@ -291,15 +285,17 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputInfo.status, TASK_INPUT_STATUS__FAILED); } void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) { - int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); if (num == 0) { return; } for (int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); pInfo->dataAllowed = true; } + + pTask->upstreamInfo.numOfClosed = 0; } void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { @@ -310,9 +306,9 @@ void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) { } SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) { - int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); for (int32_t i = 0; i < num; ++i) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); if (pInfo->taskId == taskId) { return pInfo; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a87901eb47..6924d99585 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -92,7 +92,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea } static int32_t streamAlignCheckpoint(SStreamTask* pTask) { - int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num); if (old == 0) { stDebug("s-task:%s set initial align upstream num:%d", pTask->id.idStr, num); @@ -153,7 +153,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream pBlock->srcTaskId = pTask->id.taskId; pBlock->srcVgId = pTask->pMeta->vgId; - int32_t code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); + int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); } else { @@ -192,14 +192,14 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t taskLevel = pTask->info.taskLevel; if (taskLevel == TASK_LEVEL__SOURCE) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", id, pTask->info.selfChildId); + stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into 
outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info streamProcessCheckpointReadyMsg(pTask); streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); + ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) > 0); if (pTask->chkInfo.startTs == 0) { pTask->chkInfo.startTs = taosGetTimestampMs(); pTask->execInfo.checkpoint += 1; @@ -210,7 +210,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc // there are still some upstream tasks not send checkpoint request, do nothing and wait for then int32_t notReady = streamAlignCheckpoint(pTask); - int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList); if (notReady > 0) { stDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d", id, pTask->info.selfChildId, notReady, num); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7b23366c53..a7a06dd884 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -195,11 +195,11 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) .retrieveLen = dataStrLen, }; - int32_t sz = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList); ASSERT(sz > 0); for (int32_t i = 0; i < sz; i++) { req.reqId = tGenIdPI64(); - SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->pUpstreamInfoList, i); + SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); req.dstNodeId = pEpInfo->nodeId; req.dstTaskId = pEpInfo->taskId; int32_t len; @@ -288,7 +288,7 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups) { int32_t getNumOfDispatchBranch(SStreamTask* pTask) { return (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) ? 
1 - : taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + : taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); } static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { @@ -301,7 +301,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); - int32_t downstreamTaskId = pTask->fixedDispatcher.taskId; + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; code = tInitStreamDispatchReq(pReq, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type); if (code != TSDB_CODE_SUCCESS) { return code; @@ -318,10 +318,10 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD pTask->msgInfo.pData = pReq; } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); + int32_t rspCnt = atomic_load_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); SStreamDispatchReq* pReqs = taosMemoryCalloc(numOfVgroups, sizeof(SStreamDispatchReq)); @@ -352,7 +352,7 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD } if (pReqs[j].blockNum == 0) { - atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); } pReqs[j].blockNum++; @@ -381,16 +381,16 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch const char* id = pTask->id.idStr; if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - int32_t vgId = pTask->fixedDispatcher.nodeId; - SEpSet* pEpSet = &pTask->fixedDispatcher.epSet; - int32_t downstreamTaskId = pTask->fixedDispatcher.taskId; + int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; + SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet); } else { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); stDebug("s-task:%s (child taskId:%d) start to shuffle-dispatch blocks to %d vgroup(s), msgId:%d", @@ -421,12 +421,12 @@ static void doRetryDispatchData(void* param, void* tmrId) { int32_t msgId = pTask->execInfo.dispatch; if (streamTaskShouldStop(&pTask->status)) { - int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); return; } - ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); + ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); int32_t code = 0; { @@ -436,7 +436,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { SStreamDispatchReq *pReq = pTask->msgInfo.pData; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = 
pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgroups = taosArrayGetSize(vgInfo); int32_t numOfFailed = taosArrayGetSize(pList); @@ -462,9 +462,9 @@ static void doRetryDispatchData(void* param, void* tmrId) { stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId); } else { - int32_t vgId = pTask->fixedDispatcher.nodeId; - SEpSet* pEpSet = &pTask->fixedDispatcher.epSet; - int32_t downstreamTaskId = pTask->fixedDispatcher.taskId; + int32_t vgId = pTask->outputInfo.fixedDispatcher.nodeId; + SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet; + int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id, pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId); @@ -476,7 +476,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { if (code != TSDB_CODE_SUCCESS) { if (!streamTaskShouldStop(&pTask->status)) { // stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr); -// atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); +// atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); if (streamTaskShouldPause(&pTask->status)) { streamRetryDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS * 10); } else { @@ -487,7 +487,7 @@ static void doRetryDispatchData(void* param, void* tmrId) { stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref); } } else { - int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s send success, jump out of timer, ref:%d", pTask->id.idStr, ref); } } @@ -508,7 +508,7 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int32_t vgSz, int64_t groupId) { uint32_t hashValue = 0; - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; if (pTask->pNameMap == NULL) { pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); } @@ -528,14 +528,14 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } if (pDataBlock->info.parTbName[0]) { - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); } else { - buildCtbNameByGroupIdImpl(pTask->shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); - snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); + buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, pDataBlock->info.parTbName); + snprintf(ctbName, TSDB_TABLE_NAME_LEN, "%s.%s", pTask->outputInfo.shuffleDispatcher.dbInfo.db, pDataBlock->info.parTbName); } /*uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));*/ - SUseDbRsp* pDbInfo = &pTask->shuffleDispatcher.dbInfo; + SUseDbRsp* pDbInfo = &pTask->outputInfo.shuffleDispatcher.dbInfo; hashValue = taosGetTbHashVal(ctbName, strlen(ctbName), pDbInfo->hashMethod, pDbInfo->hashPrefix, pDbInfo->hashSuffix); 
taosMemoryFree(ctbName); @@ -560,7 +560,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } if (pReqs[j].blockNum == 0) { - atomic_add_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + atomic_add_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); } pReqs[j].blockNum++; @@ -576,27 +576,27 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); const char* id = pTask->id.idStr; - int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue); + int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputq.queue); if (numOfElems > 0) { - double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputq.queue->pQueue)); stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size); } // to make sure only one dispatch is running int8_t old = - atomic_val_compare_exchange_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); + atomic_val_compare_exchange_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { stDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", id, old); return 0; } ASSERT(pTask->msgInfo.pData == NULL); - stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputInfo.status); + stDebug("s-task:%s start to dispatch msg, set output status:%d", id, pTask->outputq.status); - SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputInfo.queue); + SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputq.queue); if (pBlock == NULL) { - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); - stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputInfo.status); + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); + stDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", id, pTask->outputq.status); return 0; } @@ -620,10 +620,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } stDebug("s-task:%s failed to dispatch msg:%d to downstream, code:%s, output status:%d, retry cnt:%d", id, - pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputInfo.status, retryCount); + pTask->execInfo.dispatch, tstrerror(terrno), pTask->outputq.status, retryCount); // todo deal with only partially success dispatch case - atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); + atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); pTask->msgInfo.pData = NULL; @@ -631,7 +631,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); @@ -654,11 +654,11 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { // serialize if 
(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - req.downstreamTaskId = pTask->fixedDispatcher.taskId; + req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; pTask->notReadyTasks = 1; - doDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet); + doDispatchScanHistoryFinishMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); pTask->notReadyTasks = numOfVgs; @@ -680,7 +680,7 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { // this function is usually invoked by sink/agg task int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { int32_t num = taosArrayGetSize(pTask->pReadyMsgList); - ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num); + ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num); for (int32_t i = 0; i < num; ++i) { SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i); @@ -1049,7 +1049,7 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId } // now ready for next data output - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline streamDispatchStreamBlock(pTask); @@ -1061,11 +1061,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t vgId = pTask->pMeta->vgId; int32_t msgId = pTask->execInfo.dispatch; + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, vgId); return TSDB_CODE_STREAM_TASK_NOT_EXIST; } + // discard invalid dispatch rsp msg if ((pRsp->msgId != msgId) || (pRsp->stage != pTask->pMeta->stage)) { stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64 " discard it", @@ -1107,7 +1109,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t leftRsp = 0; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + leftRsp = atomic_sub_fetch_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 1); ASSERT(leftRsp >= 0); if (leftRsp > 0) { @@ -1127,17 +1129,17 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // all msg rsp already, continue if (leftRsp == 0) { - ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); + ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT); // we need to re-try send dispatch msg to downstream tasks int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList); if (numOfFailed > 0) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed); - stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt); + atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, numOfFailed); + stDebug("s-task:%s waiting rsp set to be %d", id, 
pTask->outputInfo.shuffleDispatcher.waitingRspCnt); } - int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s failed to dispatch msg to downstream code:%s, add timer to retry in %dms, ref:%d", pTask->id.idStr, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref); @@ -1155,7 +1157,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // now ready for next data output - atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); + atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); } else { handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2302e00bb3..31ccbe50f6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -36,10 +36,10 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl int32_t code = 0; int32_t type = pTask->outputInfo.type; if (type == TASK_OUTPUT__TABLE) { - pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, pBlock->blocks); + pTask->outputInfo.tbSink.tbSinkFunc(pTask, pTask->outputInfo.tbSink.vnode, pBlock->blocks); destroyStreamDataBlock(pBlock); } else if (type == TASK_OUTPUT__SMA) { - pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); + pTask->outputInfo.smaSink.smaSink(pTask->outputInfo.smaSink.vnode, pTask->outputInfo.smaSink.smaId, pBlock->blocks); destroyStreamDataBlock(pBlock); } else { ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); @@ -487,7 +487,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock // agg task should dispatch trans-state msg to sink task, to flush all data to sink task. if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { pBlock->srcVgId = pTask->pMeta->vgId; - code = taosWriteQitem(pTask->outputInfo.queue->pQueue, pBlock); + code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock); if (code == 0) { streamDispatchStreamBlock(pTask); } else { @@ -607,7 +607,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { pTask->status.taskStatus == TASK_STATUS__DROPPING); } -int32_t streamTryExec(SStreamTask* pTask) { +int32_t streamExecTask(SStreamTask* pTask) { // this function may be executed by multi-threads, so status check is required. 
const char* id = pTask->id.idStr; @@ -615,7 +615,7 @@ int32_t streamTryExec(SStreamTask* pTask) { if (schedStatus == TASK_SCHED_STATUS__WAITING) { while (1) { int32_t code = streamExecForAll(pTask); - if (code < 0) { // todo this status shoudl be removed + if (code < 0) { // todo this status should be removed atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__FAILED); return -1; } @@ -663,7 +663,7 @@ int32_t streamTaskReloadState(SStreamTask* pTask) { } int32_t streamAlignTransferState(SStreamTask* pTask) { - int32_t numOfUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); int32_t old = atomic_val_compare_exchange_32(&pTask->transferStateAlignCnt, 0, numOfUpstream); if (old == 0) { stDebug("s-task:%s set the transfer state aligncnt %d", pTask->id.idStr, numOfUpstream); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index b646b470ef..43707098bc 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -891,7 +891,7 @@ void metaHbToMnode(void* param, void* tmrId) { entry.inputRate = entry.inputQUsed*100.0/STREAM_TASK_QUEUE_CAPACITY_IN_SIZE; if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) { - entry.sinkQuota = (*pTask)->pTokenBucket->quotaRate; + entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate; entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index ce90f29451..676e85eadc 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -18,6 +18,7 @@ #define MAX_STREAM_EXEC_BATCH_NUM 32 #define MAX_SMOOTH_BURST_RATIO 5 // 5 sec #define WAIT_FOR_DURATION 40 +#define SINK_TASK_IDLE_DURATION 200 // 200 ms // todo refactor: // read data from input queue @@ -154,6 +155,10 @@ const char* streamQueueItemGetTypeStr(int32_t type) { } } +static void doLaunchSinkTask(void* param, void* tmrId) { + +} + int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize) { int32_t retryTimes = 0; @@ -166,8 +171,21 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = 0; // no available token in bucket for sink task, let's wait for a little bit - if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->pTokenBucket, pTask->id.idStr))) { + if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr))) { stDebug("s-task:%s no available token in bucket for sink data, wait", id); + +// if (streamTaskAllUpstreamClosed(pTask)) { +// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); +// stDebug("s-task:%s try start task in %dms in tmr, since all upstream inputQ is closed, ref:%d", pTask->id.idStr, +// SINK_TASK_IDLE_DURATION, ref); +// +// if (pTask->outputInfo.pTimer == NULL) { +// pTask->outputInfo.pTimer = taosTmrStart(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer); +// } else { +// taosTmrReset(doLaunchSinkTask, SINK_TASK_IDLE_DURATION, pTask, streamEnv.timer, &pTask->outputInfo.pTimer); +// } +// } + return TSDB_CODE_SUCCESS; } @@ -188,10 +206,10 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu if (*numOfBlocks > 0) { *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { - streamTaskConsumeQuota(pTask->pTokenBucket, 
*blockSize); + streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } } else { - streamTaskPutbackToken(pTask->pTokenBucket); + streamTaskPutbackToken(pTask->outputInfo.pTokenBucket); } return TSDB_CODE_SUCCESS; @@ -207,7 +225,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu stDebug("s-task:%s %s msg extracted, start to process immediately", id, p); // restore the token to bucket in case of checkpoint/trans-state msg - streamTaskPutbackToken(pTask->pTokenBucket); + streamTaskPutbackToken(pTask->outputInfo.pTokenBucket); *blockSize = 0; *numOfBlocks = 1; *pInput = qItem; @@ -216,7 +234,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { - streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } streamQueueProcessFail(pTask->inputInfo.queue); @@ -237,7 +255,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { - streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } streamQueueProcessFail(pTask->inputInfo.queue); @@ -255,7 +273,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { - streamTaskConsumeQuota(pTask->pTokenBucket, *blockSize); + streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } return TSDB_CODE_SUCCESS; @@ -350,15 +368,15 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) // the result should be put into the outputQ in any cases, otherwise, the result may be lost int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBlock) { - STaosQueue* pQueue = pTask->outputInfo.queue->pQueue; + STaosQueue* pQueue = pTask->outputq.queue->pQueue; - while (streamQueueIsFull(pTask->outputInfo.queue)) { + while (streamQueueIsFull(pTask->outputq.queue)) { if (streamTaskShouldStop(&pTask->status)) { stInfo("s-task:%s discard result block due to task stop", pTask->id.idStr); return TSDB_CODE_STREAM_EXEC_CANCELLED; } - int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); + int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); // let's wait for there are enough space to hold this result pBlock stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, @@ -368,7 +386,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t code = taosWriteQitem(pQueue, pBlock); - int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); + int32_t total = streamQueueGetNumOfItems(pTask->outputq.queue); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); if (code != 0) { stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 85e57339e0..55ed555af6 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -40,7 +40,7 @@ static void 
streamTaskSetReady(SStreamTask* pTask, int32_t numOfReqs) { int32_t vgId = pMeta->vgId; if (pTask->status.taskStatus == TASK_STATUS__SCAN_HISTORY && pTask->info.taskLevel != TASK_LEVEL__SOURCE) { - pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamInfoList); + pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s", pTask->id.idStr, pTask->info.taskLevel, pTask->numOfWaitingUpstream, streamGetTaskStatusStr(pTask->status.taskStatus)); @@ -144,8 +144,8 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); - req.downstreamNodeId = pTask->fixedDispatcher.nodeId; - req.downstreamTaskId = pTask->fixedDispatcher.taskId; + req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; + req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; pTask->checkReqId = req.reqId; stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64 @@ -153,9 +153,9 @@ static int32_t doCheckDownstreamStatus(SStreamTask* pTask) { pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId); - streamSendCheckMsg(pTask, &req, pTask->fixedDispatcher.nodeId, &pTask->fixedDispatcher.epSet); + streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); pTask->notReadyTasks = numOfVgs; @@ -225,9 +225,9 @@ static void recheckDownstreamTasks(void* param, void* tmrId) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr, pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage); - streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->fixedDispatcher.epSet); + streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->outputInfo.fixedDispatcher.epSet); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); for (int32_t i = 0; i < numOfVgs; i++) { @@ -241,7 +241,7 @@ static void recheckDownstreamTasks(void* param, void* tmrId) { } destroyRecheckInfo(pInfo); - int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref); } @@ -341,7 +341,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs doProcessDownstreamReadyRsp(pTask, numOfReqs); } else { - int32_t total = taosArrayGetSize(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); + int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, 
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } @@ -367,7 +367,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs } else { STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp); - int8_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); @@ -528,7 +528,7 @@ int32_t streamProcessScanHistoryFinishReq(SStreamTask* pTask, SStreamScanHistory ASSERT(left >= 0); if (left == 0) { - int32_t numOfTasks = taosArrayGetSize(pTask->pUpstreamInfoList); + int32_t numOfTasks = taosArrayGetSize(pTask->upstreamInfo.pList); stDebug( "s-task:%s all %d upstream tasks finish scan-history data, set param for agg task for stream data and send " "rsp to all upstream tasks", @@ -640,7 +640,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) { - int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); streamMetaReleaseTask(pMeta, pTask); stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d", @@ -672,7 +672,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { } // not in timer anymore - int8_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId, pHTaskInfo->retryTimes, ref); streamMetaReleaseTask(pMeta, pTask);
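Most hunks above are mechanical: state that used to hang directly off SStreamTask is regrouped, so call sites now reach the dispatcher/sink unions, the output type and the token bucket through pTask->outputInfo, the output queue through the new STaskOutputQueue member pTask->outputq, and the upstream child-ep list through pTask->upstreamInfo.pList. Alongside the field moves, streamTryExec is renamed to streamExecTask and the streamProcessRunReq wrapper is removed, so the snode/tq run-request handlers call streamExecTask directly, and the timerActive reference counts returned by atomic_add_fetch_32/atomic_sub_fetch_32 are now kept in int32_t rather than int8_t. The sketch below is a hypothetical helper (dumpTaskOutputSummary is not part of the patch) that collects the post-refactor access paths in one place, using only members shown in the tstream.h hunk:

static void dumpTaskOutputSummary(SStreamTask* pTask) {
  // output queue state now lives in the STaskOutputQueue member `outputq`
  int32_t numOfItems = streamQueueGetNumOfItems(pTask->outputq.queue);

  // output type and the dispatcher union now live in `outputInfo`
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    stDebug("s-task:%s fixed-dispatch to task:0x%x, outputQ items:%d", pTask->id.idStr,
            pTask->outputInfo.fixedDispatcher.taskId, numOfItems);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    int32_t numOfVgroups = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
    stDebug("s-task:%s shuffle-dispatch to %d vgroup(s), outputQ items:%d", pTask->id.idStr,
            numOfVgroups, numOfItems);
  }

  // upstream child-ep list is wrapped by SUpstreamInfo together with the closed counter
  int32_t numOfUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
  stDebug("s-task:%s upstream tasks:%d, closed upstream inputQ:%d", pTask->id.idStr,
          numOfUpstream, pTask->upstreamInfo.numOfClosed);
}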
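For sink tasks, streamTaskGetDataFromInputQ is now gated by pTask->outputInfo.pTokenBucket instead of a field on SStreamTask itself. A minimal restatement of the control flow in the streamQueue.c hunk, with batching, checkpoint/trans-state handling and logging stripped away (sinkReadOneBatch is an illustrative name, not part of the patch):

static int32_t sinkReadOneBatch(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* blockSize) {
  // 1. a sink task must obtain a token first; if none is available, give up for this round
  if (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, pTask->id.idStr)) {
    return TSDB_CODE_SUCCESS;
  }

  // 2. pull one item from the input queue (the real code merges a batch of blocks)
  *pInput = streamQueueNextItem(pTask->inputInfo.queue);
  if (*pInput == NULL) {
    // nothing extracted: return the unused token to the bucket
    streamTaskPutbackToken(pTask->outputInfo.pTokenBucket);
    return TSDB_CODE_SUCCESS;
  }

  // 3. charge the size-based quota for the data that was actually extracted
  *blockSize = streamQueueItemGetSize(*pInput);
  streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize);
  return TSDB_CODE_SUCCESS;
}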
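The new SUpstreamInfo.numOfClosed counter is maintained in two places in this patch: streamProcessDispatchMsg increments it when a checkpoint-trigger message closes an upstream inputQ, and streamTaskOpenAllUpstreamInput resets it to zero when all upstream inputs are reopened. The commented-out block in streamTaskGetDataFromInputQ, together with the doLaunchSinkTask stub and SINK_TASK_IDLE_DURATION, points at the intended consumer: re-arming a timer so an idle sink task is relaunched once every upstream inputQ has been closed. Only the declaration of streamTaskAllUpstreamClosed is added in tstream.h; its body is not part of these hunks, but an implementation consistent with how the counter is maintained would simply be the comparison below (assumed, not the committed code):

// assumed body; the actual implementation is not included in this patch
bool streamTaskAllUpstreamClosed(SStreamTask* pTask) {
  return pTask->upstreamInfo.numOfClosed == taosArrayGetSize(pTask->upstreamInfo.pList);
}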