diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 61970ff440..36cc0f2665 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -26,6 +26,15 @@ typedef void* qTaskInfo_t;
typedef void* DataSinkHandle;
struct SSubplan;
+ /**
+ * Create the exec task for streaming mode
+ * @param pMsg
+ * @param pStreamBlockReadHandle
+ * @return
+ */
+qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle);
+
+void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input);
/**
* Create the exec task object according to task json
@@ -203,4 +212,4 @@ void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
}
#endif
-#endif /*_TD_EXECUTOR_H_*/
\ No newline at end of file
+#endif /*_TD_EXECUTOR_H_*/
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index aaedf280b5..1507e2a30d 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -328,7 +328,7 @@ typedef struct SMqTopicConsumer {
typedef struct SMqConsumerEp {
int32_t vgId; // -1 for unassigned
- SEpSet epset;
+ SEpSet epSet;
int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs;
int64_t lastVgHbTs;
@@ -339,7 +339,7 @@ typedef struct SMqConsumerEp {
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId);
- tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset);
+ tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
return tlen;
@@ -347,7 +347,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId);
- buf = taosDecodeSEpSet(buf, &pConsumerEp->epset);
+ buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c
index a6634a9f01..817ca4f4be 100644
--- a/source/dnode/mnode/impl/src/mndSubscribe.c
+++ b/source/dnode/mnode/impl/src/mndSubscribe.c
@@ -117,7 +117,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// persist msg
STransAction action = {0};
- action.epSet = pCEp->epset;
+ action.epSet = pCEp->epSet;
action.pCont = reqStr;
action.contLen = tlen;
action.msgType = TDMT_VND_MQ_SET_CONN;
@@ -142,36 +142,25 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
}
static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) {
- SMqConsumerEp CEp;
- CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
//convert phyplan to dag
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
SArray *pArray;
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
-
+ return -1;
}
int32_t sz = taosArrayGetSize(pArray);
//convert dag to msg
for (int32_t i = 0; i < sz; i++) {
+ SMqConsumerEp CEp;
+ CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
- int32_t vgId = pTaskInfo->addr.nodeId;
- SEpSet epSet;
- tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr);
+ tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
+ CEp.vgId = pTaskInfo->addr.nodeId;
+ taosArrayPush(unassignedVg, &CEp);
}
- /*pTopic->physicalPlan;*/
- SVgObj *pVgroup = NULL;
- SSdb *pSdb = pMnode->pSdb;
- void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup);
- while (pIter != NULL) {
- if (pVgroup->dbUid == pTopic->dbUid) {
- CEp.epset = mndGetVgroupEpset(pMnode, pVgroup);
- CEp.vgId = pVgroup->vgId;
- taosArrayPush(unassignedVg, &CEp);
- }
- pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
- }
- return 0;
+
qDestroyQueryDag(pDag);
+ return 0;
}
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt
index 75734d2d29..e625c56db1 100644
--- a/source/dnode/vnode/CMakeLists.txt
+++ b/source/dnode/vnode/CMakeLists.txt
@@ -26,6 +26,7 @@ target_link_libraries(
PUBLIC tfs
PUBLIC wal
PUBLIC scheduler
+ PUBLIC executor
PUBLIC qworker
)
diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h
index ec71777882..f49542b5ec 100644
--- a/source/dnode/vnode/inc/tq.h
+++ b/source/dnode/vnode/inc/tq.h
@@ -21,6 +21,7 @@
#include "meta.h"
#include "os.h"
#include "scheduler.h"
+#include "executor.h"
#include "taoserror.h"
#include "tlist.h"
#include "tmsg.h"
@@ -81,27 +82,12 @@ typedef struct STqSubscribeReq {
int64_t topic[];
} STqSubscribeReq;
-typedef struct STqSubscribeRsp {
- STqMsgHead head;
- int64_t vgId;
- char ep[TSDB_EP_LEN]; // TSDB_EP_LEN
-} STqSubscribeRsp;
-
typedef struct STqHeartbeatReq {
} STqHeartbeatReq;
typedef struct STqHeartbeatRsp {
} STqHeartbeatRsp;
-typedef struct STqTopicVhandle {
- int64_t topicId;
- // executor for filter
- void* filterExec;
- // callback for mnode
- // trigger when vnode list associated topic change
- void* (*mCallback)(void*, void*);
-} STqTopicVhandle;
-
#define TQ_BUFFER_SIZE 8
typedef struct STqExec {
@@ -165,7 +151,7 @@ typedef struct STqTaskItem {
int8_t status;
int64_t offset;
void* dst;
- SSubQueryMsg* pMsg;
+ qTaskInfo_t task;
} STqTaskItem;
// new version
diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c
index c34c64214d..52c541dcfd 100644
--- a/source/dnode/vnode/src/tq/tq.c
+++ b/source/dnode/vnode/src/tq/tq.c
@@ -13,7 +13,6 @@
* along with this program. If not, see .
*/
-#include "../../../../../include/libs/executor/executor.h"
#include "tqInt.h"
#include "tqMetaStore.h"
@@ -634,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
// read until find TDMT_VND_SUBMIT
}
SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body;
+ void* task = pHandle->buffer.output[pos].task;
- SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
+ qStreamExecTaskSetInput(task, pCont);
+ SSDataBlock* pDataBlock;
+ uint64_t ts;
+ if (qExecTask(task, &pDataBlock, &ts) < 0) {
+ }
// TODO: launch query and get output data
- void* outputData;
- pHandle->buffer.output[pos].dst = outputData;
+ pHandle->buffer.output[pos].dst = pDataBlock;
if (pHandle->buffer.firstOffset == -1
|| pReq->offset < pHandle->buffer.firstOffset) {
pHandle->buffer.firstOffset = pReq->offset;
@@ -675,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) {
strcpy(pTopic->sql, pReq->sql);
strcpy(pTopic->logicalPlan, pReq->logicalPlan);
strcpy(pTopic->physicalPlan, pReq->physicalPlan);
- SArray *pArray;
- //TODO: deserialize to SQueryDag
- SQueryDag *pDag;
- // convert to task
- if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
- // TODO: handle error
- }
- STaskInfo *pInfo = taosArrayGet(pArray, 0);
- SArray* pTasks;
- schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
+
pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
- SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
- pTopic->buffer.output[i].pMsg = pMsg;
pTopic->buffer.output[i].status = 0;
+ pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL);
}
pTopic->pReadhandle = walOpenReadHandle(pTq->pWal);
// write mq meta
@@ -763,32 +756,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
taosArrayPush(pArray, &colInfo);
return pArray;
}
-
-static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) {
- if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
- return NULL;
- }
-
- // print those info into log
- pMsg->sId = be64toh(pMsg->sId);
- pMsg->queryId = be64toh(pMsg->queryId);
- pMsg->taskId = be64toh(pMsg->taskId);
- pMsg->contentLen = ntohl(pMsg->contentLen);
-
- struct SSubplan *plan = NULL;
- int32_t code = qStringToSubplan(pMsg->msg, &plan);
- if (code != TSDB_CODE_SUCCESS) {
- terrno = code;
- return NULL;
- }
-
- qTaskInfo_t pTaskInfo = NULL;
- code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
- if (code != TSDB_CODE_SUCCESS) {
- // TODO: destroy SSubplan & pTaskInfo
- terrno = code;
- return NULL;
- }
-
- return pTaskInfo;
-}
\ No newline at end of file
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 6dea4a4e57..49bf42f383 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -11,4 +11,38 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
- */
\ No newline at end of file
+ */
+
+#include "executor.h"
+#include "planner.h"
+
+void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {}
+
+qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) {
+ if (pMsg == NULL || pStreamBlockReadHandle == NULL) {
+ return NULL;
+ }
+
+ // print those info into log
+ pMsg->sId = be64toh(pMsg->sId);
+ pMsg->queryId = be64toh(pMsg->queryId);
+ pMsg->taskId = be64toh(pMsg->taskId);
+ pMsg->contentLen = ntohl(pMsg->contentLen);
+
+ struct SSubplan* plan = NULL;
+ int32_t code = qStringToSubplan(pMsg->msg, &plan);
+ if (code != TSDB_CODE_SUCCESS) {
+ terrno = code;
+ return NULL;
+ }
+
+ qTaskInfo_t pTaskInfo = NULL;
+ code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL);
+ if (code != TSDB_CODE_SUCCESS) {
+ // TODO: destroy SSubplan & pTaskInfo
+ terrno = code;
+ return NULL;
+ }
+
+ return pTaskInfo;
+}