diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 34a0bc8657..bbeeb33523 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -296,15 +296,15 @@ typedef struct SDispatchMsgInfo { } SDispatchMsgInfo; typedef struct { - int8_t outputType; - int8_t outputStatus; - SStreamQueue* outputQueue; -} SSTaskOutputInfo; + int8_t type; + int8_t status; + SStreamQueue* queue; +} STaskOutputInfo; struct SStreamTask { SStreamId id; SSTaskBasicInfo info; - int8_t outputType; + STaskOutputInfo outputInfo; SDispatchMsgInfo msgInfo; SStreamStatus status; SCheckpointInfo chkInfo; @@ -326,9 +326,7 @@ struct SStreamTask { }; int8_t inputStatus; - int8_t outputStatus; SStreamQueue* inputQueue; - SStreamQueue* outputQueue; // trigger int8_t triggerStatus; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 33905bad86..7cb52d2bc9 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -87,10 +87,10 @@ END: int32_t mndSetSinkTaskInfo(SStreamObj* pStream, SStreamTask* pTask) { if (pStream->smaId != 0) { - pTask->outputType = TASK_OUTPUT__SMA; + pTask->outputInfo.type = TASK_OUTPUT__SMA; pTask->smaSink.smaId = pStream->smaId; } else { - pTask->outputType = TASK_OUTPUT__TABLE; + pTask->outputInfo.type = TASK_OUTPUT__TABLE; pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); @@ -110,7 +110,7 @@ int32_t mndAddDispatcherForInternalTask(SMnode* pMnode, SStreamObj* pStream, SAr SDbObj* pDb = mndAcquireDb(pMnode, pStream->targetDb); if (pDb != NULL && pDb->cfg.numOfVgroups > 1) { isShuffle = true; - pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; + pTask->outputInfo.type = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { return -1; @@ -291,7 +291,7 @@ void setFixedDownstreamEpInfo(SStreamTask* pDstTask, const SStreamTask* pTask) { pDispatcher->nodeId = pTask->info.nodeId; pDispatcher->epSet = pTask->info.epSet; - pDstTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; + pDstTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH; pDstTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index e4bc184be3..700b0191a5 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -66,14 +66,14 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(512 << 10); - pTask->outputQueue = streamQueueOpen(512 << 10); + pTask->outputInfo.queue = streamQueueOpen(512 << 10); - if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { + if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) { return -1; } pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; pTask->chkInfo.version = ver; pTask->pMeta = pSnode->pMeta; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bbdd98e356..97aa69bab6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -811,14 +811,14 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->inputQueue = streamQueueOpen(512 << 10); - pTask->outputQueue = streamQueueOpen(512 << 10); + pTask->outputInfo.queue = streamQueueOpen(512 << 10); - if (pTask->inputQueue == NULL || pTask->outputQueue == NULL) { + if (pTask->inputQueue == NULL || pTask->outputInfo.queue == NULL) { return -1; } pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; @@ -885,10 +885,10 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } // sink - if (pTask->outputType == TASK_OUTPUT__SMA) { + if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { pTask->smaSink.vnode = pTq->pVnode; pTask->smaSink.smaSink = smaHandleRes; - } else if (pTask->outputType == TASK_OUTPUT__TABLE) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqSinkToTablePipeline; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 07d7cb3040..4de9f6a7ed 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -216,15 +216,16 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // todo add log int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t code = 0; - if (pTask->outputType == TASK_OUTPUT__TABLE) { + int32_t type = pTask->outputInfo.type; + if (type == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); destroyStreamDataBlock(pBlock); - } else if (pTask->outputType == TASK_OUTPUT__SMA) { + } else if (type == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); destroyStreamDataBlock(pBlock); } else { - ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - code = taosWriteQitem(pTask->outputQueue->queue, pBlock); + ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); + code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock); if (code != 0) { // todo failed to add it into the output queue, free it. return code; } @@ -274,7 +275,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code); // there are other dispatch message not response yet - if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); if (leftRsp > 0) { @@ -283,9 +284,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } pTask->msgInfo.retryCount = 0; - ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); - qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputStatus); + qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status); // the input queue of the (down stream) task that receive the output data is full, // so the TASK_INPUT_STATUS_BLOCKED is rsp @@ -309,7 +310,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // now ready for next data output - atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); // otherwise, continue dispatch the first block to down stream task in pipeline streamDispatchStreamBlock(pTask); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9241df2e70..566e7da9e4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -437,7 +437,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t numOfBlocks = taosArrayGetSize(pData->blocks); ASSERT(numOfBlocks != 0); - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq req = {0}; int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId; @@ -467,7 +467,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); return code; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); @@ -545,7 +545,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat static void doRetryDispatchData(void* param, void* tmrId) { SStreamTask* pTask = param; - ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT); + ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT); int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData); if (code != TSDB_CODE_SUCCESS) { @@ -561,29 +561,29 @@ void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) { } int32_t streamDispatchStreamBlock(SStreamTask* pTask) { - ASSERT((pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH)); + STaskOutputInfo* pInfo = &pTask->outputInfo; + ASSERT((pInfo->type == TASK_OUTPUT__FIXED_DISPATCH || pInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH)); - int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); + int32_t numOfElems = taosQueueItemSize(pInfo->queue->queue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, numOfElems); } // to make sure only one dispatch is running - int8_t old = - atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); + int8_t old = atomic_val_compare_exchange_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { qDebug("s-task:%s wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); return 0; } ASSERT(pTask->msgInfo.pData == NULL); - qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus); + qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pInfo->status); - SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); + SStreamDataBlock* pBlock = streamQueueNextItem(pInfo->queue); if (pBlock == NULL) { - atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); - qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pTask->outputStatus); + atomic_store_8(&pInfo->status, TASK_OUTPUT_STATUS__NORMAL); + qDebug("s-task:%s not dispatch since no elems in outputQ, output status:%d", pTask->id.idStr, pInfo->status); return 0; } @@ -599,19 +599,19 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { } qDebug("s-task:%s failed to dispatch msg to downstream, code:%s, output status:%d, retry cnt:%d", pTask->id.idStr, - tstrerror(terrno), pTask->outputStatus, retryCount); + tstrerror(terrno), pInfo->status, retryCount); // todo deal with only partially success dispatch case atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0); - if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore + if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore destroyStreamDataBlock(pTask->msgInfo.pData); pTask->msgInfo.pData = NULL; return code; } - if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry - qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", pTask->id.idStr, - retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); + if (++retryCount > MAX_CONTINUE_RETRY_COUNT) { // add to timer to retry + qDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms", + pTask->id.idStr, retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); break; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c8aa6f5615..06eed5fee2 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -561,7 +561,7 @@ bool streamTaskIsIdle(const SStreamTask* pTask) { } // blocked by downstream task - if (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) { + if (pTask->outputInfo.status == TASK_OUTPUT_STATUS__BLOCKED) { return false; } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f51efb23d1..43449a1a62 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -96,7 +96,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { }; // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId; req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; @@ -108,7 +108,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) { pWindow->skey, pWindow->ekey, req.reqId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -153,9 +153,9 @@ int32_t streamRecheckDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp* p qDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) (recheck)", pTask->id.idStr, pTask->info.nodeId, req.downstreamTaskId, req.downstreamNodeId); - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -179,7 +179,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs const char* id = pTask->id.idStr; if (pRsp->status == 1) { - if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { bool found = false; int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds); @@ -218,7 +218,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); } } else { - ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH); + ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH); if (pRsp->reqId != pTask->checkReqId) { return -1; } @@ -296,10 +296,10 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) { SStreamScanHistoryFinishReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.taskId = pTask->fixedEpDispatcher.taskId; streamDoDispatchScanHistoryFinishMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); @@ -362,10 +362,10 @@ int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) { SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId }; // serialize - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.taskId = pTask->fixedEpDispatcher.taskId; doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; int32_t numOfVgs = taosArrayGetSize(vgInfo); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ef83583ea4..ca4586a1b4 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -44,7 +44,7 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHisto pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY; pTask->inputStatus = TASK_INPUT_STATUS__NORMAL; - pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; + pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL; addToTaskset(pTaskList, pTask); return pTask; @@ -74,7 +74,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1; if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; @@ -109,19 +109,19 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } - if (pTask->outputType == TASK_OUTPUT__TABLE) { + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SMA) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FETCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } @@ -137,7 +137,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; @@ -179,21 +179,21 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; } - if (pTask->outputType == TASK_OUTPUT__TABLE) { + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1; pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SMA) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FETCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } @@ -211,8 +211,8 @@ void tFreeStreamTask(SStreamTask* pTask) { streamQueueClose(pTask->inputQueue); } - if (pTask->outputQueue) { - streamQueueClose(pTask->outputQueue); + if (pTask->outputInfo.queue) { + streamQueueClose(pTask->outputInfo.queue); } if (pTask->exec.qmsg) { @@ -229,11 +229,11 @@ void tFreeStreamTask(SStreamTask* pTask) { } taosArrayDestroyP(pTask->pUpstreamEpInfoList, taosMemoryFree); - if (pTask->outputType == TASK_OUTPUT__TABLE) { + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper); taosMemoryFree(pTask->tbSink.pTSchema); tSimpleHashCleanup(pTask->tbSink.pTblInfo); - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL;