diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a98bb8f51a..36cc0f2665 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,7 +32,9 @@ struct SSubplan; * @param pStreamBlockReadHandle * @return */ -qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); + +void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input); /** * Create the exec task object according to task json diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 8c4effa221..f49542b5ec 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -82,27 +82,12 @@ typedef struct STqSubscribeReq { int64_t topic[]; } STqSubscribeReq; -typedef struct STqSubscribeRsp { - STqMsgHead head; - int64_t vgId; - char ep[TSDB_EP_LEN]; // TSDB_EP_LEN -} STqSubscribeRsp; - typedef struct STqHeartbeatReq { } STqHeartbeatReq; typedef struct STqHeartbeatRsp { } STqHeartbeatRsp; -typedef struct STqTopicVhandle { - int64_t topicId; - // executor for filter - void* filterExec; - // callback for mnode - // trigger when vnode list associated topic change - void* (*mCallback)(void*, void*); -} STqTopicVhandle; - #define TQ_BUFFER_SIZE 8 typedef struct STqExec { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89d4af48fd..52c541dcfd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -633,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { // read until find TDMT_VND_SUBMIT } SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + void* task = pHandle->buffer.output[pos].task; - /*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/ + qStreamExecTaskSetInput(task, pCont); + SSDataBlock* pDataBlock; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + } // TODO: launch query and get output data - void* outputData; - pHandle->buffer.output[pos].dst = outputData; + pHandle->buffer.output[pos].dst = pDataBlock; if (pHandle->buffer.firstOffset == -1 || pReq->offset < pHandle->buffer.firstOffset) { pHandle->buffer.firstOffset = pReq->offset; @@ -674,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan); - SArray *pArray; - //TODO: deserialize to SQueryDag - SQueryDag *pDag; - // convert to task - if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { - // TODO: handle error - } - STaskInfo *pInfo = taosArrayGet(pArray, 0); - SArray* pTasks; - schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); + pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); pTopic->buffer.output[i].status = 0; - pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL); } pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); // write mq meta diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a933402296..49bf42f383 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -13,10 +13,12 @@ * along with this program. If not, see . */ -#include "planner.h" #include "executor.h" +#include "planner.h" -qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) { +void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {} + +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) { if (pMsg == NULL || pStreamBlockReadHandle == NULL) { return NULL; } @@ -27,8 +29,8 @@ qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadH pMsg->taskId = be64toh(pMsg->taskId); pMsg->contentLen = ntohl(pMsg->contentLen); - struct SSubplan *plan = NULL; - int32_t code = qStringToSubplan(pMsg->msg, &plan); + struct SSubplan* plan = NULL; + int32_t code = qStringToSubplan(pMsg->msg, &plan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL;