From 7361019e94c01d5302a74d4c0d8595f7df66c070 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 13:45:24 +0800 Subject: [PATCH 1/2] put createStreamExecTaskInfo into right place --- include/libs/executor/executor.h | 9 +++++- source/dnode/mnode/impl/inc/mndDef.h | 6 ++-- source/dnode/mnode/impl/src/mndSubscribe.c | 29 ++++++------------ source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/inc/tq.h | 3 +- source/dnode/vnode/src/tq/tq.c | 34 ++-------------------- source/libs/executor/src/executor.c | 34 +++++++++++++++++++++- 7 files changed, 58 insertions(+), 58 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 61970ff440..a98bb8f51a 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -26,6 +26,13 @@ typedef void* qTaskInfo_t; typedef void* DataSinkHandle; struct SSubplan; + /** + * Create the exec task for streaming mode + * @param pMsg + * @param pStreamBlockReadHandle + * @return + */ +qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); /** * Create the exec task object according to task json @@ -203,4 +210,4 @@ void** qDeregisterQInfo(void* pMgmt, void* pQInfo); } #endif -#endif /*_TD_EXECUTOR_H_*/ \ No newline at end of file +#endif /*_TD_EXECUTOR_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index aaedf280b5..1507e2a30d 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -328,7 +328,7 @@ typedef struct SMqTopicConsumer { typedef struct SMqConsumerEp { int32_t vgId; // -1 for unassigned - SEpSet epset; + SEpSet epSet; int64_t consumerId; // -1 for unassigned int64_t lastConsumerHbTs; int64_t lastVgHbTs; @@ -339,7 +339,7 @@ typedef struct SMqConsumerEp { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); - tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); + tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); return tlen; @@ -347,7 +347,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); - buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); + buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index a6634a9f01..817ca4f4be 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -117,7 +117,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { // persist msg STransAction action = {0}; - action.epSet = pCEp->epset; + action.epSet = pCEp->epSet; action.pCont = reqStr; action.contLen = tlen; action.msgType = TDMT_VND_MQ_SET_CONN; @@ -142,36 +142,25 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { - SMqConsumerEp CEp; - CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; //convert phyplan to dag SQueryDag *pDag = qStringToDag(pTopic->physicalPlan); SArray *pArray; if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { - + return -1; } int32_t sz = taosArrayGetSize(pArray); //convert dag to msg for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp CEp; + CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; STaskInfo* pTaskInfo = taosArrayGet(pArray, i); - int32_t vgId = pTaskInfo->addr.nodeId; - SEpSet epSet; - tConvertQueryAddrToEpSet(&epSet, &pTaskInfo->addr); + tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); + CEp.vgId = pTaskInfo->addr.nodeId; + taosArrayPush(unassignedVg, &CEp); } - /*pTopic->physicalPlan;*/ - SVgObj *pVgroup = NULL; - SSdb *pSdb = pMnode->pSdb; - void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup); - while (pIter != NULL) { - if (pVgroup->dbUid == pTopic->dbUid) { - CEp.epset = mndGetVgroupEpset(pMnode, pVgroup); - CEp.vgId = pVgroup->vgId; - taosArrayPush(unassignedVg, &CEp); - } - pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); - } - return 0; + qDestroyQueryDag(pDag); + return 0; } static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 75734d2d29..e625c56db1 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -26,6 +26,7 @@ target_link_libraries( PUBLIC tfs PUBLIC wal PUBLIC scheduler + PUBLIC executor PUBLIC qworker ) diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index ec71777882..8c4effa221 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -21,6 +21,7 @@ #include "meta.h" #include "os.h" #include "scheduler.h" +#include "executor.h" #include "taoserror.h" #include "tlist.h" #include "tmsg.h" @@ -165,7 +166,7 @@ typedef struct STqTaskItem { int8_t status; int64_t offset; void* dst; - SSubQueryMsg* pMsg; + qTaskInfo_t task; } STqTaskItem; // new version diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c34c64214d..89d4af48fd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "../../../../../include/libs/executor/executor.h" #include "tqInt.h" #include "tqMetaStore.h" @@ -635,7 +634,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { } SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; - SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; + /*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/ // TODO: launch query and get output data void* outputData; @@ -689,8 +688,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { pTopic->buffer.lastOffset = -1; for (int i = 0; i < TQ_BUFFER_SIZE; i++) { SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); - pTopic->buffer.output[i].pMsg = pMsg; pTopic->buffer.output[i].status = 0; + pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL); } pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); // write mq meta @@ -763,32 +762,3 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { taosArrayPush(pArray, &colInfo); return pArray; } - -static qTaskInfo_t createExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) { - if (pMsg == NULL || pStreamBlockReadHandle == NULL) { - return NULL; - } - - // print those info into log - pMsg->sId = be64toh(pMsg->sId); - pMsg->queryId = be64toh(pMsg->queryId); - pMsg->taskId = be64toh(pMsg->taskId); - pMsg->contentLen = ntohl(pMsg->contentLen); - - struct SSubplan *plan = NULL; - int32_t code = qStringToSubplan(pMsg->msg, &plan); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return NULL; - } - - qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL); - if (code != TSDB_CODE_SUCCESS) { - // TODO: destroy SSubplan & pTaskInfo - terrno = code; - return NULL; - } - - return pTaskInfo; -} \ No newline at end of file diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 6dea4a4e57..a933402296 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -11,4 +11,36 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "planner.h" +#include "executor.h" + +qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) { + if (pMsg == NULL || pStreamBlockReadHandle == NULL) { + return NULL; + } + + // print those info into log + pMsg->sId = be64toh(pMsg->sId); + pMsg->queryId = be64toh(pMsg->queryId); + pMsg->taskId = be64toh(pMsg->taskId); + pMsg->contentLen = ntohl(pMsg->contentLen); + + struct SSubplan *plan = NULL; + int32_t code = qStringToSubplan(pMsg->msg, &plan); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + return NULL; + } + + qTaskInfo_t pTaskInfo = NULL; + code = qCreateExecTask(pStreamBlockReadHandle, 0, plan, &pTaskInfo, NULL); + if (code != TSDB_CODE_SUCCESS) { + // TODO: destroy SSubplan & pTaskInfo + terrno = code; + return NULL; + } + + return pTaskInfo; +} From 4c4be3c575bdccd492afb3abc1603488786a377c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 21 Jan 2022 14:21:13 +0800 Subject: [PATCH 2/2] put createStreamExecTaskInfo into right place --- include/libs/executor/executor.h | 4 +++- source/dnode/vnode/inc/tq.h | 15 --------------- source/dnode/vnode/src/tq/tq.c | 24 +++++++++--------------- source/libs/executor/src/executor.c | 10 ++++++---- 4 files changed, 18 insertions(+), 35 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a98bb8f51a..36cc0f2665 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -32,7 +32,9 @@ struct SSubplan; * @param pStreamBlockReadHandle * @return */ -qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle); + +void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input); /** * Create the exec task object according to task json diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 8c4effa221..f49542b5ec 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -82,27 +82,12 @@ typedef struct STqSubscribeReq { int64_t topic[]; } STqSubscribeReq; -typedef struct STqSubscribeRsp { - STqMsgHead head; - int64_t vgId; - char ep[TSDB_EP_LEN]; // TSDB_EP_LEN -} STqSubscribeRsp; - typedef struct STqHeartbeatReq { } STqHeartbeatReq; typedef struct STqHeartbeatRsp { } STqHeartbeatRsp; -typedef struct STqTopicVhandle { - int64_t topicId; - // executor for filter - void* filterExec; - // callback for mnode - // trigger when vnode list associated topic change - void* (*mCallback)(void*, void*); -} STqTopicVhandle; - #define TQ_BUFFER_SIZE 8 typedef struct STqExec { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89d4af48fd..52c541dcfd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -633,12 +633,16 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { // read until find TDMT_VND_SUBMIT } SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + void* task = pHandle->buffer.output[pos].task; - /*SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;*/ + qStreamExecTaskSetInput(task, pCont); + SSDataBlock* pDataBlock; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + } // TODO: launch query and get output data - void* outputData; - pHandle->buffer.output[pos].dst = outputData; + pHandle->buffer.output[pos].dst = pDataBlock; if (pHandle->buffer.firstOffset == -1 || pReq->offset < pHandle->buffer.firstOffset) { pHandle->buffer.firstOffset = pReq->offset; @@ -674,22 +678,12 @@ int32_t tqProcessSetConnReq(STQ* pTq, SMqSetCVgReq* pReq) { strcpy(pTopic->sql, pReq->sql); strcpy(pTopic->logicalPlan, pReq->logicalPlan); strcpy(pTopic->physicalPlan, pReq->physicalPlan); - SArray *pArray; - //TODO: deserialize to SQueryDag - SQueryDag *pDag; - // convert to task - if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { - // TODO: handle error - } - STaskInfo *pInfo = taosArrayGet(pArray, 0); - SArray* pTasks; - schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); + pTopic->buffer.firstOffset = -1; pTopic->buffer.lastOffset = -1; for (int i = 0; i < TQ_BUFFER_SIZE; i++) { - SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); pTopic->buffer.output[i].status = 0; - pTopic->buffer.output[i].task = createStreamExecTaskInfo(pMsg, NULL); + pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&pReq->msg, NULL); } pTopic->pReadhandle = walOpenReadHandle(pTq->pWal); // write mq meta diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index a933402296..49bf42f383 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -13,10 +13,12 @@ * along with this program. If not, see . */ -#include "planner.h" #include "executor.h" +#include "planner.h" -qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadHandle) { +void qStreamExecTaskSetInput(qTaskInfo_t qHandle, void* input) {} + +qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* pStreamBlockReadHandle) { if (pMsg == NULL || pStreamBlockReadHandle == NULL) { return NULL; } @@ -27,8 +29,8 @@ qTaskInfo_t createStreamExecTaskInfo(SSubQueryMsg *pMsg, void* pStreamBlockReadH pMsg->taskId = be64toh(pMsg->taskId); pMsg->contentLen = ntohl(pMsg->contentLen); - struct SSubplan *plan = NULL; - int32_t code = qStringToSubplan(pMsg->msg, &plan); + struct SSubplan* plan = NULL; + int32_t code = qStringToSubplan(pMsg->msg, &plan); if (code != TSDB_CODE_SUCCESS) { terrno = code; return NULL;