refactor(stream): distributed execution
This commit is contained in:
parent
1b26da1bae
commit
858868d76a
|
@ -25,7 +25,7 @@ int32_t init_env() {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1");
|
||||
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -82,7 +82,7 @@ int32_t create_stream() {
|
|||
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
|
||||
/*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/
|
||||
pRes = taos_query(
|
||||
pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from tu1 interval(10m)");
|
||||
pConn, "create stream stream1 trigger at_once into outstb as select _wstartts, sum(k) from st1 interval(10m)");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
|
|
@ -398,6 +398,8 @@ typedef struct {
|
|||
int8_t inputStatus;
|
||||
} SStreamTaskRecoverRsp;
|
||||
|
||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||
|
||||
int32_t streamTriggerByWrite(SStreamTask* pTask, int32_t vgId, SMsgCb* pMsgCb);
|
||||
|
||||
int32_t streamTaskRun(SStreamTask* pTask);
|
||||
|
|
|
@ -421,10 +421,20 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
SStreamDispatchReq* pReq = pMsg->pCont;
|
||||
int32_t taskId = pReq->taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, pReq, pMsg);
|
||||
char* msgStr = pMsg->pCont;
|
||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
SStreamDispatchReq req;
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, msgBody, msgLen);
|
||||
tDecodeStreamDispatchReq(&decoder, &req);
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask* pTask = taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
|
||||
SRpcMsg rsp = {
|
||||
.info = pMsg->info,
|
||||
.code = 0,
|
||||
};
|
||||
streamProcessDispatchReq(pTask, &pTq->pVnode->msgCb, &req, &rsp);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -57,12 +57,14 @@ int32_t streamTaskEnqueue(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg*
|
|||
}
|
||||
|
||||
// rsp by input status
|
||||
SStreamDispatchRsp* pCont = rpcMallocCont(sizeof(SStreamDispatchRsp));
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||
((SMsgHead*)buf)->vgId = htonl(pReq->sourceVg);
|
||||
SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
pCont->inputStatus = status;
|
||||
pCont->streamId = pReq->streamId;
|
||||
pCont->taskId = pReq->sourceTaskId;
|
||||
pRsp->pCont = pCont;
|
||||
pRsp->contLen = sizeof(SStreamDispatchRsp);
|
||||
pRsp->pCont = buf;
|
||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
}
|
||||
|
@ -87,8 +89,12 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDisp
|
|||
}
|
||||
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDispatchRsp* pRsp) {
|
||||
ASSERT(pRsp->inputStatus == TASK_OUTPUT_STATUS__NORMAL || pRsp->inputStatus == TASK_OUTPUT_STATUS__BLOCKED);
|
||||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
// TODO: init recover timer
|
||||
return 0;
|
||||
}
|
||||
// continue dispatch
|
||||
streamSink1(pTask, pMsgCb);
|
||||
|
|
|
@ -43,6 +43,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes)
|
|||
if (output == NULL) break;
|
||||
// TODO: do we need free memory?
|
||||
SSDataBlock* outputCopy = createOneDataBlock(output, true);
|
||||
outputCopy->info.childId = pTask->childId;
|
||||
taosArrayPush(pRes, outputCopy);
|
||||
}
|
||||
return 0;
|
||||
|
|
|
@ -13,7 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "tstream.h"
|
||||
#include "streamInc.h"
|
||||
|
||||
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq) {
|
||||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
|
@ -147,13 +147,13 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
|
|||
int32_t tlen;
|
||||
tEncodeSize(tEncodeStreamDispatchReq, &req, tlen, code);
|
||||
if (code < 0) goto FAIL;
|
||||
code = -1;
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
code = -1;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pTask->fixedEpDispatcher.nodeId);
|
||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
|
@ -165,16 +165,24 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, SStreamDataBlock* data, SRpcM
|
|||
|
||||
pMsg->contLen = tlen + sizeof(SMsgHead);
|
||||
pMsg->pCont = buf;
|
||||
pMsg->msgType = pTask->dispatchMsgType;
|
||||
|
||||
code = 0;
|
||||
FAIL:
|
||||
if (buf) taosMemoryFree(buf);
|
||||
if (code < 0 && buf) rpcFreeCont(buf);
|
||||
if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree);
|
||||
if (req.dataLen) taosArrayDestroy(req.dataLen);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* data) {
|
||||
#if 0
|
||||
int8_t old =
|
||||
atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT);
|
||||
if (old != TASK_OUTPUT_STATUS__NORMAL) {
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
if (pTask->dispatchType == TASK_DISPATCH__INPLACE) {
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, NULL) < 0) {
|
||||
|
@ -201,12 +209,19 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SStreamDataBlock* dat
|
|||
|
||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||
} else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
SRpcMsg dispatchMsg = {0};
|
||||
SEpSet* pEpSet = NULL;
|
||||
if (streamBuildDispatchMsg(pTask, data, &dispatchMsg, &pEpSet) < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t streamBuildExecMsg(SStreamTask* pTask, SArray* data, SRpcMsg* pMsg, SEpSet** ppEpSet) {
|
||||
SStreamTaskExecReq req = {
|
||||
.streamId = pTask->streamId,
|
||||
|
@ -287,3 +302,4 @@ static int32_t streamShuffleDispatch(SStreamTask* pTask, SMsgCb* pMsgCb, SHashOb
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -13,8 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "executor.h"
|
||||
#include "tstream.h"
|
||||
#include "streamInc.h"
|
||||
|
||||
int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||
SStreamQueue* queue;
|
||||
|
@ -23,12 +22,13 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
} else {
|
||||
queue = pTask->outputQueue;
|
||||
}
|
||||
|
||||
/*if (streamDequeueBegin(queue) == true) {*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
|
||||
if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA) {
|
||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
if (pTask->sinkType == TASK_SINK__TABLE || pTask->sinkType == TASK_SINK__SMA ||
|
||||
pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
while (1) {
|
||||
SStreamDataBlock* pBlock = streamQueueNextItem(queue);
|
||||
if (pBlock == NULL) break;
|
||||
|
@ -36,13 +36,18 @@ int32_t streamSink1(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
|||
|
||||
// local sink
|
||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||
pTask->smaSink.smaSink(pTask->ahandle, pTask->smaSink.smaId, pBlock->blocks);
|
||||
}
|
||||
|
||||
// TODO: sink and dispatch should be only one
|
||||
if (pTask->dispatchType != TASK_DISPATCH__NONE) {
|
||||
ASSERT(queue == pTask->outputQueue);
|
||||
ASSERT(pTask->sinkType == TASK_SINK__NONE);
|
||||
|
||||
streamDispatch(pTask, pMsgCb, pBlock);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue