From d1363d9cb2283974826fbc4a37ba73e309063313 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 23 Mar 2022 14:47:24 +0800 Subject: [PATCH] task running --- example/src/tstream.c | 3 +- include/common/tmsg.h | 2 + source/common/src/tmsg.c | 31 ++++++------- source/dnode/mgmt/mnode/src/mmMsg.c | 2 + source/dnode/mnode/impl/src/mndScheduler.c | 17 ++++---- source/dnode/mnode/impl/src/mndStream.c | 51 +++++++++++++++++++--- source/dnode/mnode/impl/src/mndTopic.c | 2 +- source/dnode/vnode/src/inc/vnd.h | 6 --- source/dnode/vnode/src/tq/tq.c | 6 ++- 9 files changed, 82 insertions(+), 38 deletions(-) diff --git a/example/src/tstream.c b/example/src/tstream.c index 56650634c5..17073e14a6 100644 --- a/example/src/tstream.c +++ b/example/src/tstream.c @@ -77,7 +77,8 @@ int32_t create_stream() { } taos_free_result(pRes); - const char* sql = "select ts,sum(k) from tu1"; + const char* sql = "select sum(k) from tu1"; + /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ pRes = tmq_create_stream(pConn, "stream1", "out1", sql); if (taos_errno(pRes) != 0) { printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f14fb3d246..812025e8e4 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2345,6 +2345,8 @@ static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level) return NULL; } pTask->taskId = tGenIdPI32(); + pTask->streamId = streamId; + pTask->level = level; pTask->status = STREAM_TASK_STATUS__RUNNING; pTask->qmsg = NULL; return pTask; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5b0c01a3a2..41c37cb7f2 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -313,12 +313,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); } - if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { + if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->nFuncIds); - for(int8_t i=0; i< param->nFuncIds; ++i) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); } tlen += taosEncodeFixedI64(buf, param->delay); @@ -340,12 +340,12 @@ int32_t tSerializeSVCreateTbReq(void **buf, SVCreateTbReq *pReq) { for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { tlen += taosEncodeFixedI16(buf, pReq->stbCfg.pBSmaCols[i]); } - if(pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { + if (pReq->rollup && NULL != pReq->stbCfg.pRSmaParam) { SRSmaParam *param = pReq->stbCfg.pRSmaParam; tlen += taosEncodeFixedU32(buf, (uint32_t)param->xFilesFactor); tlen += taosEncodeFixedI8(buf, param->delayUnit); tlen += taosEncodeFixedI8(buf, param->nFuncIds); - for(int8_t i=0; i< param->nFuncIds; ++i) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { tlen += taosEncodeFixedI32(buf, param->pFuncIds[i]); } tlen += taosEncodeFixedI64(buf, param->delay); @@ -385,7 +385,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeStringTo(buf, pReq->stbCfg.pTagSchema[i].name); } buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); - if(pReq->stbCfg.nBSmaCols > 0) { + if (pReq->stbCfg.nBSmaCols > 0) { pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); @@ -393,14 +393,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { } else { pReq->stbCfg.pBSmaCols = NULL; } - if(pReq->rollup) { + if (pReq->rollup) { pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); SRSmaParam *param = pReq->stbCfg.pRSmaParam; - buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); + buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor); buf = taosDecodeFixedI8(buf, ¶m->delayUnit); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); - if(param->nFuncIds > 0) { - for (int8_t i = 0; i< param->nFuncIds; ++i) { + if (param->nFuncIds > 0) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { buf = taosDecodeFixedI32(buf, param->pFuncIds + i); } } else { @@ -425,7 +425,7 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { buf = taosDecodeStringTo(buf, pReq->ntbCfg.pSchema[i].name); } buf = taosDecodeFixedI16(buf, &(pReq->stbCfg.nBSmaCols)); - if(pReq->stbCfg.nBSmaCols > 0) { + if (pReq->stbCfg.nBSmaCols > 0) { pReq->stbCfg.pBSmaCols = (col_id_t *)malloc(pReq->stbCfg.nBSmaCols * sizeof(col_id_t)); for (col_id_t i = 0; i < pReq->stbCfg.nBSmaCols; ++i) { buf = taosDecodeFixedI16(buf, pReq->stbCfg.pBSmaCols + i); @@ -433,14 +433,14 @@ void *tDeserializeSVCreateTbReq(void *buf, SVCreateTbReq *pReq) { } else { pReq->stbCfg.pBSmaCols = NULL; } - if(pReq->rollup) { + if (pReq->rollup) { pReq->stbCfg.pRSmaParam = (SRSmaParam *)malloc(sizeof(SRSmaParam)); SRSmaParam *param = pReq->stbCfg.pRSmaParam; - buf = taosDecodeFixedU32(buf, (uint32_t*)¶m->xFilesFactor); + buf = taosDecodeFixedU32(buf, (uint32_t *)¶m->xFilesFactor); buf = taosDecodeFixedI8(buf, ¶m->delayUnit); buf = taosDecodeFixedI8(buf, ¶m->nFuncIds); - if(param->nFuncIds > 0) { - for (int8_t i = 0; i< param->nFuncIds; ++i) { + if (param->nFuncIds > 0) { + for (int8_t i = 0; i < param->nFuncIds; ++i) { buf = taosDecodeFixedI32(buf, param->pFuncIds + i); } } else { @@ -2823,7 +2823,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; - if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; + if (tEncodeI32(&encoder, sqlLen) < 0) return -1; + if (tEncodeI32(&encoder, astLen) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; diff --git a/source/dnode/mgmt/mnode/src/mmMsg.c b/source/dnode/mgmt/mnode/src/mmMsg.c index 56a580ed93..fee568d20b 100644 --- a/source/dnode/mgmt/mnode/src/mmMsg.c +++ b/source/dnode/mgmt/mnode/src/mmMsg.c @@ -142,6 +142,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) { dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, 0); + dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); // Requests handled by VNODE dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a6d27ffa44..3793d10537 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -34,20 +34,21 @@ extern bool tsStreamSchedV; -int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) { +int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) { SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tEncodeSStreamTask(&encoder, pTask); - int32_t tlen = sizeof(SMsgHead) + encoder.pos; + int32_t size = encoder.pos; + int32_t tlen = sizeof(SMsgHead) + size; tCoderClear(&encoder); void* buf = malloc(tlen); if (buf == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - ((SMsgHead*)buf)->streamTaskId = pTask->taskId; + ((SMsgHead*)buf)->streamTaskId = htonl(nodeId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); - tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); + tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER); tEncodeSStreamTask(&encoder, pTask); tCoderClear(&encoder); @@ -72,7 +73,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId); return 0; } @@ -92,7 +93,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY); + mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0); return 0; } @@ -118,7 +119,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SSubplan* plan = nodesListGetNode(inner->pNodeList, level); if (level == 0) { - ASSERT(plan->type == SUBPLAN_TYPE_SCAN); + ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN); void* pIter = NULL; while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -148,7 +149,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { SStreamTask* pTask = streamTaskNew(pStream->uid, level); pTask->pipeSource = 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0; - pTask->parallelizable = plan->type == SUBPLAN_TYPE_SCAN; + pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN; pTask->nextOpDst = STREAM_NEXT_OP_DST__VND; if (tsStreamSchedV) { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 99aded0292..2ca4e10f68 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -21,6 +21,7 @@ #include "mndScheduler.h" #include "mndShow.h" #include "mndStb.h" +#include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" @@ -33,6 +34,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq); +static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp); /*static int32_t mndProcessDropStreamReq(SNodeMsg *pReq);*/ /*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/ static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq); @@ -50,6 +52,8 @@ int32_t mndInitStream(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndStreamActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); + mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); + mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp); /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ @@ -68,7 +72,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { SCoder encoder; tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); - if (tEncodeSStreamObj(NULL, pStream) < 0) { + if (tEncodeSStreamObj(&encoder, pStream) < 0) { tCoderClear(&encoder); goto STREAM_ENCODE_OVER; } @@ -83,7 +87,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) { if (buf == NULL) goto STREAM_ENCODE_OVER; tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); - if (tEncodeSStreamObj(NULL, pStream) < 0) { + if (tEncodeSStreamObj(&encoder, pStream) < 0) { tCoderClear(&encoder); goto STREAM_ENCODE_OVER; } @@ -135,7 +139,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SCoder decoder; - tCoderInit(&decoder, TD_LITTLE_ENDIAN, NULL, 0, TD_DECODER); + tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER); if (tDecodeSStreamObj(&decoder, pStream) < 0) { goto STREAM_DECODE_OVER; } @@ -191,6 +195,11 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) { sdbRelease(pSdb, pStream); } +static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp) { + mndTransProcessRsp(pRsp); + return 0; +} + static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { SName name = {0}; tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); @@ -209,6 +218,33 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) { return 0; } +static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) { + if (NULL == pCreate->ast) { + return TSDB_CODE_SUCCESS; + } + + SNode *pAst = NULL; + int32_t code = nodesStringToNode(pCreate->ast, &pAst); + + SQueryPlan *pPlan = NULL; + if (TSDB_CODE_SUCCESS == code) { + SPlanContext cxt = { + .pAstRoot = pAst, + .topicQuery = false, + .streamQuery = true, + }; + code = qCreateQueryPlan(&cxt, &pPlan, NULL); + } + + if (TSDB_CODE_SUCCESS == code) { + code = nodesNodeToString(pPlan, false, pStr, NULL); + } + nodesDestroyNode(pAst); + nodesDestroyNode(pPlan); + terrno = code; + return code; +} + static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { mDebug("stream:%s to create", pCreate->name); SStreamObj streamObj = {0}; @@ -220,8 +256,13 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe streamObj.dbUid = pDb->uid; streamObj.version = 1; streamObj.sql = pCreate->sql; - streamObj.physicalPlan = ""; - streamObj.logicalPlan = ""; + /*streamObj.physicalPlan = "";*/ + streamObj.logicalPlan = "not implemented"; + + if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) { + mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); + return -1; + } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); if (pTrans == NULL) { diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index bd0fd0e612..95dce5d8f1 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -236,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) { return 0; } -static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { +static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) { if (NULL == pCreate->ast) { return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index bad5b7c6a2..2c9a5368ac 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -82,12 +82,6 @@ struct SVnode { int vnodeScheduleTask(SVnodeTask* task); -int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); -int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq); -int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq); -int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq); -void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp); - #define vFatal(...) \ do { \ if (vDebugFlag & DEBUG_FATAL) { \ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 0615ea31d4..099f6896b1 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -120,7 +120,7 @@ int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { .code = 0, .msgType = type, }; - /*vnodeSendReq(pTq->pVnode, &pTask->NextOpEp, &msg);*/ + tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &msg); } } @@ -498,7 +498,9 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { } SCoder decoder; tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); - tDecodeSStreamTask(&decoder, pTask); + if (tDecodeSStreamTask(&decoder, pTask) < 0) { + ASSERT(0); + } tCoderClear(&decoder); tqExpandTask(pTq, pTask, 8);