Merge pull request #13532 from taosdata/feature/stream
refactor(stream): distributed execution
This commit is contained in:
commit
a0bc17c29f
|
@ -25,7 +25,7 @@ int32_t init_env() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -82,7 +82,7 @@ int32_t create_stream() {
|
||||||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||||
pRes = taos_query(
|
pRes = taos_query(
|
||||||
pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)");
|
pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)");
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -187,6 +187,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TRIGGER, "vnode-stream-trigger", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RUN, "vnode-stream-task-run", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH, "vnode-stream-task-dispatch", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_DISPATCH_WRITE, "vnode-stream-task-dispatch-write", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_TASK_RECOVER, "vnode-stream-task-recover", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_SMA, "vnode-create-sma", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_VND_CANCEL_SMA, "vnode-cancel-sma", NULL, NULL)
|
||||||
|
|
|
@ -285,12 +285,6 @@ struct SStreamTask {
|
||||||
|
|
||||||
int8_t inputStatus;
|
int8_t inputStatus;
|
||||||
int8_t outputStatus;
|
int8_t outputStatus;
|
||||||
#if 0
|
|
||||||
STaosQueue* inputQ;
|
|
||||||
STaosQall* inputQAll;
|
|
||||||
STaosQueue* outputQ;
|
|
||||||
STaosQall* outputQAll;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SStreamQueue* inputQueue;
|
SStreamQueue* inputQueue;
|
||||||
SStreamQueue* outputQueue;
|
SStreamQueue* outputQueue;
|
||||||
|
@ -371,13 +365,6 @@ typedef struct {
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
} SStreamTaskRunReq;
|
} SStreamTaskRunReq;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
// SMsgHead head;
|
|
||||||
int64_t streamId;
|
|
||||||
int64_t version;
|
|
||||||
SArray* res; // SArray<SSDataBlock>
|
|
||||||
} SStreamSinkReq;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
@ -411,11 +398,9 @@ typedef struct {
|
||||||
int8_t inputStatus;
|
int8_t inputStatus;
|
||||||
} SStreamTaskRecoverRsp;
|
} SStreamTaskRecoverRsp;
|
||||||
|
|
||||||
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
|
|
||||||
int32_t streamEnqueueDataSubmit(SStreamTask* pTask, SStreamDataSubmit* input);
|
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
|
||||||
int32_t streamEnqueueDataBlk(SStreamTask* pTask, SStreamDataBlock* input);
|
|
||||||
int32_t streamDequeueOutput(SStreamTask* pTask, void** output);
|
|
||||||
|
|
||||||
int32_t streamTaskRun(SStreamTask* pTask);
|
int32_t streamTaskRun(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
|
@ -216,8 +216,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO wrap in destroy func
|
// TODO wrap in destroy func
|
||||||
taosArrayDestroy(rsp.blockData);
|
|
||||||
taosArrayDestroy(rsp.blockDataLen);
|
taosArrayDestroy(rsp.blockDataLen);
|
||||||
|
taosArrayDestroyP(rsp.blockData, (FDelete)taosMemoryFree);
|
||||||
|
|
||||||
if (rsp.withSchema) {
|
if (rsp.withSchema) {
|
||||||
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
taosArrayDestroyP(rsp.blockSchema, (FDelete)tDeleteSSchemaWrapper);
|
||||||
|
@ -421,10 +421,20 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamDispatchReq* pReq = pMsg->pCont;
|
char* msgStr = pMsg->pCont;
|
||||||
int32_t taskId = pReq->taskId;
|
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||||
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
SStreamDispatchReq req;
|
||||||
|
SDecoder decoder;
|
||||||
|
tDecoderInit(&decoder, msgBody, msgLen);
|
||||||
|
tDecodeStreamDispatchReq(&decoder, &req);
|
||||||
|
int32_t taskId = req.taskId;
|
||||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||||
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
SRpcMsg rsp = {
|
||||||
|
.info = pMsg->info,
|
||||||
|
.code = 0,
|
||||||
|
};
|
||||||
|
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ extern "C" {
|
||||||
|
|
||||||
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
|
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb);
|
||||||
|
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,12 +57,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
||||||
}
|
}
|
||||||
|
|
||||||
// rsp by input status
|
// rsp by input status
|
||||||
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg);
|
||||||
|
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
pCont->inputStatus = status;
|
pCont->inputStatus = status;
|
||||||
pCont->streamId = pReq->streamId;
|
pCont->streamId = pReq->streamId;
|
||||||
pCont->taskId = pReq->sourceTaskId;
|
pCont->taskId = pReq->sourceTaskId;
|
||||||
pRsp->pCont = pCont;
|
pRsp->pCont = buf;
|
||||||
pRsp->contLen = sizeof(SStreamDispatchRsp);
|
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
||||||
tmsgSendRsp(pRsp);
|
tmsgSendRsp(pRsp);
|
||||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
@ -87,8 +89,12 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
||||||
|
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||||
|
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||||
|
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
// TODO: init recover timer
|
// TODO: init recover timer
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
// continue dispatch
|
// continue dispatch
|
||||||
streamSink1(pTask, pMsgCb);
|
streamSink1(pTask, pMsgCb);
|
||||||
|
|
|
@ -43,6 +43,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
||||||
if (output == NULL) break;
|
if (output == NULL) break;
|
||||||
// TODO: do we need free memory?
|
// TODO: do we need free memory?
|
||||||
SSDataBlock* outputCopy = createOneDataBlock(output, true);
|
SSDataBlock* outputCopy = createOneDataBlock(output, true);
|
||||||
|
outputCopy->info.childId = pTask->childId;
|
||||||
taosArrayPush(pRes, outputCopy);
|
taosArrayPush(pRes, outputCopy);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tstream.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
@ -32,7 +32,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
||||||
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
|
if (tEncodeBinary(pEncoder, data, len) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(pEncoder);
|
tEndEncode(pEncoder);
|
||||||
return 0;
|
return pEncoder->pos;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
|
@ -60,14 +60,168 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) {
|
||||||
SStreamDispatchReq req = {
|
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||||
.streamId = pTask->streamId,
|
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||||
.data = data,
|
if (buf == NULL) return -1;
|
||||||
};
|
|
||||||
|
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)buf;
|
||||||
|
pRetrieve->useconds = 0;
|
||||||
|
pRetrieve->precision = TSDB_DEFAULT_PRECISION;
|
||||||
|
pRetrieve->compressed = 0;
|
||||||
|
pRetrieve->completed = 1;
|
||||||
|
pRetrieve->numOfRows = htonl(pBlock->info.rows);
|
||||||
|
|
||||||
|
int32_t actualLen = 0;
|
||||||
|
blockCompressEncode(pBlock, pRetrieve->data, &actualLen, pBlock->info.numOfCols, false);
|
||||||
|
actualLen += sizeof(SRetrieveTableRsp);
|
||||||
|
ASSERT(actualLen <= dataStrLen);
|
||||||
|
taosArrayPush(pReq->dataLen, &actualLen);
|
||||||
|
taosArrayPush(pReq->data, &buf);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||||
|
void* buf = NULL;
|
||||||
|
int32_t code = -1;
|
||||||
|
int32_t blockNum = taosArrayGetSize(data->blocks);
|
||||||
|
ASSERT(blockNum != 0);
|
||||||
|
|
||||||
|
SStreamDispatchReq req = {
|
||||||
|
.streamId = pTask->streamId,
|
||||||
|
.sourceTaskId = pTask->taskId,
|
||||||
|
.sourceVg = data->sourceVg,
|
||||||
|
.sourceChildId = pTask->childId,
|
||||||
|
.blockNum = blockNum,
|
||||||
|
};
|
||||||
|
|
||||||
|
req.data = taosArrayInit(blockNum, sizeof(void*));
|
||||||
|
req.dataLen = taosArrayInit(blockNum, sizeof(int32_t));
|
||||||
|
if (req.data == NULL || req.dataLen == NULL) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < blockNum; i++) {
|
||||||
|
SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i);
|
||||||
|
if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
int32_t vgId = 0;
|
||||||
|
int32_t downstreamTaskId = 0;
|
||||||
|
// find ep
|
||||||
|
if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
|
vgId = pTask->fixedEpDispatcher.nodeId;
|
||||||
|
*ppEpSet = &pTask->fixedEpDispatcher.epSet;
|
||||||
|
downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
// TODO get ctbName
|
||||||
|
char ctbName[TSDB_TABLE_FNAME_LEN + 22] = {0};
|
||||||
|
SSDataBlock* pBlock = taosArrayGet(data->blocks, 0);
|
||||||
|
sprintf(ctbName, "%s:%ld", pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId);
|
||||||
|
// get vg and ep
|
||||||
|
// TODO: get hash function by hashMethod
|
||||||
|
|
||||||
|
// get groupId, compute hash value
|
||||||
|
uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName));
|
||||||
|
|
||||||
|
// get node
|
||||||
|
// TODO: optimize search process
|
||||||
|
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t sz = taosArrayGetSize(vgInfo);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) {
|
||||||
|
vgId = pVgInfo->vgId;
|
||||||
|
downstreamTaskId = pVgInfo->taskId;
|
||||||
|
*ppEpSet = &pVgInfo->epSet;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ASSERT(vgId != 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
req.taskId = downstreamTaskId;
|
||||||
|
|
||||||
|
// serialize
|
||||||
|
int32_t tlen;
|
||||||
|
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
|
||||||
|
if (code < 0) goto FAIL;
|
||||||
|
code = -1;
|
||||||
|
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||||
|
if (buf == NULL) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
|
||||||
|
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
|
|
||||||
|
SEncoder encoder;
|
||||||
|
tEncoderInit(&encoder, abuf, tlen);
|
||||||
|
if ((code = tEncodeStreamDispatchReq(&encoder, &req)) < 0) {
|
||||||
|
goto FAIL;
|
||||||
|
}
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
|
||||||
|
pMsg->contLen = tlen + sizeof(SMsgHead);
|
||||||
|
pMsg->pCont = buf;
|
||||||
|
pMsg->msgType = pTask->dispatchMsgType;
|
||||||
|
|
||||||
|
code = 0;
|
||||||
|
FAIL:
|
||||||
|
if (code < 0 && buf) rpcFreeCont(buf);
|
||||||
|
if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
|
||||||
|
if (req.dataLen) taosArrayDestroy(req.dataLen);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
|
||||||
|
#if 0
|
||||||
|
int8_t old =
|
||||||
|
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||||
|
if (old != TASK_OUTPUT_STATUS__NORMAL) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||||
|
SRpcMsg dispatchMsg = {0};
|
||||||
|
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t qType;
|
||||||
|
if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH || pTask->dispatchMsgType == TDMT_SND_TASK_DISPATCH) {
|
||||||
|
qType = FETCH_QUEUE;
|
||||||
|
} else if (pTask->dispatchMsgType == TDMT_VND_TASK_DISPATCH_WRITE) {
|
||||||
|
qType = WRITE_QUEUE;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
tmsgPutToQueue(pMsgCb, qType, &dispatchMsg);
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
||||||
|
SRpcMsg dispatchMsg = {0};
|
||||||
|
SEpSet* pEpSet = NULL;
|
||||||
|
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||||
|
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||||
|
SRpcMsg dispatchMsg = {0};
|
||||||
|
SEpSet* pEpSet = NULL;
|
||||||
|
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||||
|
ASSERT(0);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||||
SStreamTaskExecReq req = {
|
SStreamTaskExecReq req = {
|
||||||
.streamId = pTask->streamId,
|
.streamId = pTask->streamId,
|
||||||
|
@ -148,3 +302,4 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
|
@ -13,8 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "executor.h"
|
#include "streamInc.h"
|
||||||
#include "tstream.h"
|
|
||||||
|
|
||||||
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
SStreamQueue* queue;
|
SStreamQueue* queue;
|
||||||
|
@ -23,12 +22,13 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
} else {
|
} else {
|
||||||
queue = pTask->outputQueue;
|
queue = pTask->outputQueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*if (streamDequeueBegin(queue) == true) {*/
|
/*if (streamDequeueBegin(queue) == true) {*/
|
||||||
/*return -1;*/
|
/*return -1;*/
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA) {
|
if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA ||
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||||
while (1) {
|
while (1) {
|
||||||
SStreamDataBlock* pBlock = streamQueueNextItem(queue);
|
SStreamDataBlock* pBlock = streamQueueNextItem(queue);
|
||||||
if (pBlock == NULL) break;
|
if (pBlock == NULL) break;
|
||||||
|
@ -36,17 +36,19 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
|
|
||||||
// local sink
|
// local sink
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
|
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
// TODO: sink and dispatch should be only one
|
||||||
ASSERT(queue == pTask->outputQueue);
|
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
|
||||||
ASSERT(queue == pTask->outputQueue);
|
|
||||||
} else if (pTask->dispatchType == TASK_DISPATCH__FIXED) {
|
|
||||||
ASSERT(queue == pTask->outputQueue);
|
ASSERT(queue == pTask->outputQueue);
|
||||||
|
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||||
|
|
||||||
|
streamDispatch(pTask, pMsgCb, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamQueueProcessSuccess(queue);
|
streamQueueProcessSuccess(queue);
|
||||||
|
|
Loading…
Reference in New Issue