task running

This commit is contained in:
Liu Jicong 2022-03-23 14:47:24 +08:00
parent c3b3779713
commit d1363d9cb2
9 changed files with 82 additions and 38 deletions

View File

@ -77,7 +77,8 @@ int32_t create_stream() {
} }
taos_free_result(pRes); taos_free_result(pRes);
const char* sql = "select ts,sum(k) from tu1"; const char* sql = "select sum(k) from tu1";
/*const char* sql = "select sum(k) from tu1 interval(10m)";*/
pRes = tmq_create_stream(pConn, "stream1", "out1", sql); pRes = tmq_create_stream(pConn, "stream1", "out1", sql);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes)); printf("failed to create stream out1, reason:%s\n", taos_errstr(pRes));

View File

@ -2345,6 +2345,8 @@ static FORCE_INLINE SStreamTask* streamTaskNew(int64_t streamId, int32_t level)
return NULL; return NULL;
} }
pTask->taskId = tGenIdPI32(); pTask->taskId = tGenIdPI32();
pTask->streamId = streamId;
pTask->level = level;
pTask->status = STREAM_TASK_STATUS__RUNNING; pTask->status = STREAM_TASK_STATUS__RUNNING;
pTask->qmsg = NULL; pTask->qmsg = NULL;
return pTask; return pTask;

View File

@ -2823,7 +2823,8 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1; if (tEncodeCStr(&encoder, pReq->outputTbName) < 0) return -1;
if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1; if (tEncodeI8(&encoder, pReq->igExists) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (tEncodeI32(&encoder, sqlLen) < 0) return -1;
if (tEncodeI32(&encoder, astLen) < 0) return -1;
if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1; if (sqlLen > 0 && tEncodeCStr(&encoder, pReq->sql) < 0) return -1;
if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1; if (astLen > 0 && tEncodeCStr(&encoder, pReq->ast) < 0) return -1;

View File

@ -142,6 +142,8 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_SUBSCRIBE, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_MQ_COMMIT_OFFSET, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0); dndSetMsgHandle(pWrapper, TDMT_MND_GET_SUB_EP, (NodeMsgFp)mmProcessReadMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_MND_CREATE_STREAM, (NodeMsgFp)mmProcessWriteMsg, 0);
dndSetMsgHandle(pWrapper, TDMT_VND_TASK_DEPLOY_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);
// Requests handled by VNODE // Requests handled by VNODE
dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0); dndSetMsgHandle(pWrapper, TDMT_VND_MQ_SET_CONN_RSP, (NodeMsgFp)mmProcessWriteMsg, 0);

View File

@ -34,20 +34,21 @@
extern bool tsStreamSchedV; extern bool tsStreamSchedV;
int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type) { int32_t mndPersistTaskDeployReq(STrans* pTrans, SStreamTask* pTask, const SEpSet* pEpSet, tmsg_t type, int32_t nodeId) {
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
int32_t tlen = sizeof(SMsgHead) + encoder.pos; int32_t size = encoder.pos;
int32_t tlen = sizeof(SMsgHead) + size;
tCoderClear(&encoder); tCoderClear(&encoder);
void* buf = malloc(tlen); void* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
((SMsgHead*)buf)->streamTaskId = pTask->taskId; ((SMsgHead*)buf)->streamTaskId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, tlen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, abuf, size, TD_ENCODER);
tEncodeSStreamTask(&encoder, pTask); tEncodeSStreamTask(&encoder, pTask);
tCoderClear(&encoder); tCoderClear(&encoder);
@ -72,7 +73,7 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask, SS
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_VND_TASK_DEPLOY, pVgroup->vgId);
return 0; return 0;
} }
@ -92,7 +93,7 @@ int32_t mndAssignTaskToSnode(SMnode* pMnode, STrans* pTrans, SStreamTask* pTask,
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY); mndPersistTaskDeployReq(pTrans, pTask, &plan->execNode.epSet, TDMT_SND_TASK_DEPLOY, 0);
return 0; return 0;
} }
@ -118,7 +119,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SSubplan* plan = nodesListGetNode(inner->pNodeList, level); SSubplan* plan = nodesListGetNode(inner->pNodeList, level);
if (level == 0) { if (level == 0) {
ASSERT(plan->type == SUBPLAN_TYPE_SCAN); ASSERT(plan->subplanType == SUBPLAN_TYPE_SCAN);
void* pIter = NULL; void* pIter = NULL;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup);
@ -148,7 +149,7 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) {
SStreamTask* pTask = streamTaskNew(pStream->uid, level); SStreamTask* pTask = streamTaskNew(pStream->uid, level);
pTask->pipeSource = 0; pTask->pipeSource = 0;
pTask->pipeSink = level == totLevel - 1 ? 1 : 0; pTask->pipeSink = level == totLevel - 1 ? 1 : 0;
pTask->parallelizable = plan->type == SUBPLAN_TYPE_SCAN; pTask->parallelizable = plan->subplanType == SUBPLAN_TYPE_SCAN;
pTask->nextOpDst = STREAM_NEXT_OP_DST__VND; pTask->nextOpDst = STREAM_NEXT_OP_DST__VND;
if (tsStreamSchedV) { if (tsStreamSchedV) {

View File

@ -21,6 +21,7 @@
#include "mndScheduler.h" #include "mndScheduler.h"
#include "mndShow.h" #include "mndShow.h"
#include "mndStb.h" #include "mndStb.h"
#include "mndTopic.h"
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
@ -33,6 +34,7 @@ static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq); static int32_t mndProcessCreateStreamReq(SNodeMsg *pReq);
static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp);
/*static int32_t mndProcessDropStreamReq(SNodeMsg *pReq);*/ /*static int32_t mndProcessDropStreamReq(SNodeMsg *pReq);*/
/*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/ /*static int32_t mndProcessDropStreamInRsp(SNodeMsg *pRsp);*/
static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq); static int32_t mndProcessStreamMetaReq(SNodeMsg *pReq);
@ -50,6 +52,8 @@ int32_t mndInitStream(SMnode *pMnode) {
.deleteFp = (SdbDeleteFp)mndStreamActionDelete}; .deleteFp = (SdbDeleteFp)mndStreamActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_VND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp);
mndSetMsgHandle(pMnode, TDMT_SND_TASK_DEPLOY_RSP, mndProcessTaskDeployInternalRsp);
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);*/
/*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM_RSP, mndProcessDropStreamInRsp);*/
@ -68,7 +72,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
SCoder encoder; SCoder encoder;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, NULL, 0, TD_ENCODER);
if (tEncodeSStreamObj(NULL, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tCoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
@ -83,7 +87,7 @@ SSdbRaw *mndStreamActionEncode(SStreamObj *pStream) {
if (buf == NULL) goto STREAM_ENCODE_OVER; if (buf == NULL) goto STREAM_ENCODE_OVER;
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER); tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, tlen, TD_ENCODER);
if (tEncodeSStreamObj(NULL, pStream) < 0) { if (tEncodeSStreamObj(&encoder, pStream) < 0) {
tCoderClear(&encoder); tCoderClear(&encoder);
goto STREAM_ENCODE_OVER; goto STREAM_ENCODE_OVER;
} }
@ -135,7 +139,7 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER); SDB_GET_BINARY(pRaw, dataPos, buf, tlen, STREAM_DECODE_OVER);
SCoder decoder; SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, NULL, 0, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, tlen + 1, TD_DECODER);
if (tDecodeSStreamObj(&decoder, pStream) < 0) { if (tDecodeSStreamObj(&decoder, pStream) < 0) {
goto STREAM_DECODE_OVER; goto STREAM_DECODE_OVER;
} }
@ -191,6 +195,11 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
} }
static int32_t mndProcessTaskDeployInternalRsp(SNodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) { static SDbObj *mndAcquireDbByStream(SMnode *pMnode, char *streamName) {
SName name = {0}; SName name = {0};
tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); tNameFromString(&name, streamName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
@ -209,6 +218,33 @@ static int32_t mndCheckCreateStreamReq(SCMCreateStreamReq *pCreate) {
return 0; return 0;
} }
static int32_t mndStreamGetPlanString(const SCMCreateStreamReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS;
}
SNode *pAst = NULL;
int32_t code = nodesStringToNode(pCreate->ast, &pAst);
SQueryPlan *pPlan = NULL;
if (TSDB_CODE_SUCCESS == code) {
SPlanContext cxt = {
.pAstRoot = pAst,
.topicQuery = false,
.streamQuery = true,
};
code = qCreateQueryPlan(&cxt, &pPlan, NULL);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesNodeToString(pPlan, false, pStr, NULL);
}
nodesDestroyNode(pAst);
nodesDestroyNode(pPlan);
terrno = code;
return code;
}
static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) { static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamReq *pCreate, SDbObj *pDb) {
mDebug("stream:%s to create", pCreate->name); mDebug("stream:%s to create", pCreate->name);
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
@ -220,8 +256,13 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe
streamObj.dbUid = pDb->uid; streamObj.dbUid = pDb->uid;
streamObj.version = 1; streamObj.version = 1;
streamObj.sql = pCreate->sql; streamObj.sql = pCreate->sql;
streamObj.physicalPlan = ""; /*streamObj.physicalPlan = "";*/
streamObj.logicalPlan = ""; streamObj.logicalPlan = "not implemented";
if (TSDB_CODE_SUCCESS != mndStreamGetPlanString(pCreate, &streamObj.physicalPlan)) {
mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr());
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_TYPE_CREATE_STREAM, &pReq->rpcMsg);
if (pTrans == NULL) { if (pTrans == NULL) {

View File

@ -236,7 +236,7 @@ static int32_t mndCheckCreateTopicReq(SCMCreateTopicReq *pCreate) {
return 0; return 0;
} }
static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { static int32_t mndGetPlanString(const SCMCreateTopicReq *pCreate, char **pStr) {
if (NULL == pCreate->ast) { if (NULL == pCreate->ast) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -82,12 +82,6 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
int32_t vnodePutToVFetchQ(SVnode* pVnode, struct SRpcMsg* pReq);
int32_t vnodeSendReq(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
int32_t vnodeSendMnodeReq(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendRsp(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pRsp);
#define vFatal(...) \ #define vFatal(...) \
do { \ do { \
if (vDebugFlag & DEBUG_FATAL) { \ if (vDebugFlag & DEBUG_FATAL) { \

View File

@ -120,7 +120,7 @@ int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) {
.code = 0, .code = 0,
.msgType = type, .msgType = type,
}; };
/*vnodeSendReq(pTq->pVnode, &pTask->NextOpEp, &msg);*/ tmsgSendReq(&pTq->pVnode->msgCb, &pTask->NextOpEp, &msg);
} }
} }
@ -498,7 +498,9 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
} }
SCoder decoder; SCoder decoder;
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER); tCoderInit(&decoder, TD_LITTLE_ENDIAN, (uint8_t*)msg, msgLen, TD_DECODER);
tDecodeSStreamTask(&decoder, pTask); if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
}
tCoderClear(&decoder); tCoderClear(&decoder);
tqExpandTask(pTq, pTask, 8); tqExpandTask(pTq, pTask, 8);