From 1b26da1bae5a90cca6e39f7547b13f0f3cbd30e1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 6 Jun 2022 22:18:50 +0800 Subject: [PATCH 1/2] refactor(stream): distributed execution --- include/common/tmsgdef.h | 1 + include/libs/stream/tstream.h | 17 ---- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/inc/streamInc.h | 1 + source/libs/stream/src/streamMsg.c | 145 +++++++++++++++++++++++++++- source/libs/stream/src/streamSink.c | 7 +- 6 files changed, 147 insertions(+), 26 deletions(-) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 73f8515f22..8a811774b2 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -187,6 +187,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index bb0b6dc0a0..82674c6115 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -285,12 +285,6 @@ struct SStreamTask { int8_t inputStatus; int8_t outputStatus; -#if 0 - STaosQueue* inputQ; - STaosQall* inputQAll; - STaosQueue* outputQ; - STaosQall* outputQAll; -#endif SStreamQueue* inputQueue; SStreamQueue* outputQueue; @@ -371,13 +365,6 @@ typedef struct { int32_t taskId; } SStreamTaskRunReq; -typedef struct { - // SMsgHead head; - int64_t streamId; - int64_t version; - SArray* res; // SArray -} SStreamSinkReq; - typedef struct { int64_t streamId; int32_t taskId; @@ -413,10 +400,6 @@ typedef struct { int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); -int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input); -int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input); -int32_t streamDequeueOutput(SStreamTask* pTask, void** output); - int32_t streamTaskRun(SStreamTask* pTask); int32_t streamTaskProcessRunReq(SStreamTask* pTask, SMsgCb* pMsgCb); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 38482ccad5..3de5109a1a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -216,8 +216,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } // TODO wrap in destroy func - taosArrayDestroy(rsp.blockData); taosArrayDestroy(rsp.blockDataLen); + taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree); if (rsp.withSchema) { taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 604539f16a..b5f7362689 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -25,6 +25,7 @@ extern "C" { int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb); +int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 81f18fea8d..9f22bbbe8a 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -32,7 +32,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeBinary(pEncoder, data, len) < 0) return -1; } tEndEncode(pEncoder); - return 0; + return pEncoder->pos; } int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { @@ -60,11 +60,150 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { return 0; } -int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { +static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { + int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + void* buf = taosMemoryCalloc(1, dataStrLen); + if (buf == NULL) return -1; + + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf; + pRetrieve->useconds = 0; + pRetrieve->precision = TSDB_DEFAULT_PRECISION; + pRetrieve->compressed = 0; + pRetrieve->completed = 1; + pRetrieve->numOfRows = htonl(pBlock->info.rows); + + int32_t actualLen = 0; + blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false); + actualLen += sizeof(SRetrieveTableRsp); + ASSERT(actualLen <= dataStrLen); + taosArrayPush(pReq->dataLen, &actualLen); + taosArrayPush(pReq->data, &buf); + + return 0; +} + +int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { + void* buf = NULL; + int32_t code = -1; + int32_t blockNum = taosArrayGetSize(data->blocks); + ASSERT(blockNum != 0); + SStreamDispatchReq req = { .streamId = pTask->streamId, - .data = data, + .sourceTaskId = pTask->taskId, + .sourceVg = data->sourceVg, + .sourceChildId = pTask->childId, + .blockNum = blockNum, }; + + req.data = taosArrayInit(blockNum, sizeof(void*)); + req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); + if (req.data == NULL || req.dataLen == NULL) { + goto FAIL; + } + for (int32_t i = 0; i < blockNum; i++) { + SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i); + if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { + goto FAIL; + } + } + int32_t vgId = 0; + int32_t downstreamTaskId = 0; + // find ep + if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + vgId = pTask->fixedEpDispatcher.nodeId; + *ppEpSet = &pTask->fixedEpDispatcher.epSet; + downstreamTaskId = pTask->fixedEpDispatcher.taskId; + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + // TODO get ctbName + char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0}; + SSDataBlock* pBlock = taosArrayGet(data->blocks, 0); + sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); + // get vg and ep + // TODO: get hash function by hashMethod + + // get groupId, compute hash value + uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); + + // get node + // TODO: optimize search process + SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; + int32_t sz = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < sz; i++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + vgId = pVgInfo->vgId; + downstreamTaskId = pVgInfo->taskId; + *ppEpSet = &pVgInfo->epSet; + break; + } + } + ASSERT(vgId != 0); + } + + req.taskId = downstreamTaskId; + + // serialize + int32_t tlen; + tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code); + if (code < 0) goto FAIL; + buf = rpcMallocCont(sizeof(SMsgHead) + tlen); + if (buf == NULL) { + code = -1; + goto FAIL; + } + + ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId); + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, tlen); + if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) { + goto FAIL; + } + tEncoderClear(&encoder); + + pMsg->contLen = tlen + sizeof(SMsgHead); + pMsg->pCont = buf; + + code = 0; +FAIL: + if (buf) taosMemoryFree(buf); + if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree); + if (req.dataLen) taosArrayDestroy(req.dataLen); + return code; +} + +int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) { + if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { + SRpcMsg dispatchMsg = {0}; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) { + ASSERT(0); + return -1; + } + + int32_t qType; + if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) { + qType = FETCH_QUEUE; + } else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) { + qType = WRITE_QUEUE; + } else { + ASSERT(0); + } + tmsgPutToQueue(pMsgCb, qType, &dispatchMsg); + } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + + tmsgSendReq(pEpSet, &dispatchMsg); + } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + // TODO + ASSERT(0); + } return 0; } diff --git a/source/libs/stream/src/streamSink.c b/source/libs/stream/src/streamSink.c index 6acdd1064c..6fd0a00517 100644 --- a/source/libs/stream/src/streamSink.c +++ b/source/libs/stream/src/streamSink.c @@ -41,12 +41,9 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks); } - if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { - ASSERT(queue == pTask->outputQueue); - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - ASSERT(queue == pTask->outputQueue); - } else if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (pTask->dispatchType != TASK_DISPATCH__NONE) { ASSERT(queue == pTask->outputQueue); + streamDispatch(pTask, pMsgCb, pBlock); } streamQueueProcessSuccess(queue); From 858868d76a3f312e7d648b128957fd604d2f16f1 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 7 Jun 2022 09:49:03 +0800 Subject: [PATCH 2/2] refactor(stream): distributed execution --- examples/c/stream_demo.c | 4 ++-- include/libs/stream/tstream.h | 2 ++ source/dnode/vnode/src/tq/tq.c | 18 ++++++++++++++---- source/libs/stream/src/stream.c | 12 +++++++++--- source/libs/stream/src/streamExec.c | 1 + source/libs/stream/src/streamMsg.c | 28 ++++++++++++++++++++++------ source/libs/stream/src/streamSink.c | 13 +++++++++---- 7 files changed, 59 insertions(+), 19 deletions(-) diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 97ff2886fc..943fcbdb53 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -25,7 +25,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; @@ -82,7 +82,7 @@ int32_t create_stream() { /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query( - pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)"); + pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 82674c6115..db8d3ac033 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -398,6 +398,8 @@ typedef struct { int8_t inputStatus; } SStreamTaskRecoverRsp; +int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); + int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb); int32_t streamTaskRun(SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 3de5109a1a..2b5e18c1db 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -421,10 +421,20 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) { - SStreamDispatchReq* pReq = pMsg->pCont; - int32_t taskId = pReq->taskId; - SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); - streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamDispatchReq req; + SDecoder decoder; + tDecoderInit(&decoder, msgBody, msgLen); + tDecodeStreamDispatchReq(&decoder, &req); + int32_t taskId = req.taskId; + SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); + SRpcMsg rsp = { + .info = pMsg->info, + .code = 0, + }; + streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp); return 0; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index d89f2ed57d..99c61e1479 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -57,12 +57,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* } // rsp by input status - SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp)); + void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); + ((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg); + SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); pCont->inputStatus = status; pCont->streamId = pReq->streamId; pCont->taskId = pReq->sourceTaskId; - pRsp->pCont = pCont; - pRsp->contLen = sizeof(SStreamDispatchRsp); + pRsp->pCont = buf; + pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } @@ -87,8 +89,12 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp } int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) { + ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED); + int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); + ASSERT(old == TASK_OUTPUT_STATUS__WAIT); if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { // TODO: init recover timer + return 0; } // continue dispatch streamSink1(pTask, pMsgCb); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 4b88cf503e..72df516e0d 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -43,6 +43,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) if (output == NULL) break; // TODO: do we need free memory? SSDataBlock* outputCopy = createOneDataBlock(output, true); + outputCopy->info.childId = pTask->childId; taosArrayPush(pRes, outputCopy); } return 0; diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 9f22bbbe8a..769d672042 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "tstream.h" +#include "streamInc.h" int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; @@ -147,13 +147,13 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM int32_t tlen; tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code); if (code < 0) goto FAIL; + code = -1; buf = rpcMallocCont(sizeof(SMsgHead) + tlen); if (buf == NULL) { - code = -1; goto FAIL; } - ((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId); + ((SMsgHead*)buf)->vgId = htonl(vgId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); SEncoder encoder; @@ -165,16 +165,24 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM pMsg->contLen = tlen + sizeof(SMsgHead); pMsg->pCont = buf; + pMsg->msgType = pTask->dispatchMsgType; code = 0; FAIL: - if (buf) taosMemoryFree(buf); + if (code < 0 && buf) rpcFreeCont(buf); if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree); if (req.dataLen) taosArrayDestroy(req.dataLen); return code; } int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) { +#if 0 + int8_t old = + atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); + if (old != TASK_OUTPUT_STATUS__NORMAL) { + return 0; + } +#endif if (pTask->dispatchType == TASK_DISPATCH__INPLACE) { SRpcMsg dispatchMsg = {0}; if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) { @@ -201,12 +209,19 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* dat tmsgSendReq(pEpSet, &dispatchMsg); } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { - // TODO - ASSERT(0); + SRpcMsg dispatchMsg = {0}; + SEpSet* pEpSet = NULL; + if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) { + ASSERT(0); + return -1; + } + + tmsgSendReq(pEpSet, &dispatchMsg); } return 0; } +#if 0 static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { SStreamTaskExecReq req = { .streamId = pTask->streamId, @@ -287,3 +302,4 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb } return 0; } +#endif diff --git a/source/libs/stream/src/streamSink.c b/source/libs/stream/src/streamSink.c index 6fd0a00517..35bebe0e63 100644 --- a/source/libs/stream/src/streamSink.c +++ b/source/libs/stream/src/streamSink.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "executor.h" -#include "tstream.h" +#include "streamInc.h" int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { SStreamQueue* queue; @@ -23,12 +22,13 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { } else { queue = pTask->outputQueue; } + /*if (streamDequeueBegin(queue) == true) {*/ /*return -1;*/ /*}*/ - if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA || + pTask->dispatchType != TASK_DISPATCH__NONE) { while (1) { SStreamDataBlock* pBlock = streamQueueNextItem(queue); if (pBlock == NULL) break; @@ -36,13 +36,18 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) { // local sink if (pTask->sinkType == TASK_SINK__TABLE) { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); } else if (pTask->sinkType == TASK_SINK__SMA) { + ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks); } + // TODO: sink and dispatch should be only one if (pTask->dispatchType != TASK_DISPATCH__NONE) { ASSERT(queue == pTask->outputQueue); + ASSERT(pTask->sinkType == TASK_SINK__NONE); + streamDispatch(pTask, pMsgCb, pBlock); }