From a3b4de6631a33cf8df4cbc4b6106aeb9fe1f7835 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 28 Mar 2022 14:40:22 +0800 Subject: [PATCH] stream support sma --- include/libs/stream/tstream.h | 9 +++++++-- source/client/src/tmq.c | 2 +- source/dnode/mnode/impl/inc/mndScheduler.h | 2 +- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndScheduler.c | 6 +++++- source/dnode/mnode/impl/src/mndSma.c | 7 ++++--- source/dnode/mnode/impl/src/mndStream.c | 6 +++--- source/dnode/vnode/src/inc/vnd.h | 3 +++ source/dnode/vnode/src/tq/tq.c | 1 + source/libs/stream/src/tstream.c | 5 +++-- 10 files changed, 29 insertions(+), 14 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 4efddde935..8be9bbbebd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -82,8 +82,12 @@ typedef struct { SHashObj* pHash; // groupId to tbuid } STaskSinkTb; +typedef void FSmaHandle(void* vnode, int64_t smaId, const SArray* data); + typedef struct { - int8_t reserved; + int64_t smaId; + // following are not applicable to encoder and decoder + FSmaHandle* smaHandle; } STaskSinkSma; typedef struct { @@ -156,7 +160,8 @@ typedef struct { STaskDispatcherShuffle shuffleDispatcher; }; - // state storage + // application storage + void* ahandle; } SStreamTask; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 8eaca6853d..5d00cca76e 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -877,7 +877,7 @@ WRITE_QUEUE_FAIL: } bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) { - printf("call update ep %d\n", epoch); + /*printf("call update ep %d\n", epoch);*/ bool set = false; int32_t sz = taosArrayGetSize(pRsp->topics); SArray* newTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); diff --git a/source/dnode/mnode/impl/inc/mndScheduler.h b/source/dnode/mnode/impl/inc/mndScheduler.h index 42951beca2..416061bf34 100644 --- a/source/dnode/mnode/impl/inc/mndScheduler.h +++ b/source/dnode/mnode/impl/inc/mndScheduler.h @@ -27,7 +27,7 @@ void mndCleanupScheduler(SMnode* pMnode); int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscribeObj* pSub); -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream); +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b5d22cb7a5..e7cdd34a7e 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -31,7 +31,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream); SSdbRaw *mndStreamActionEncode(SStreamObj *pStream); SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw); -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans); +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 69ee1a5696..697811cd04 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -119,7 +119,7 @@ SVgObj* mndSchedFetchOneVg(SMnode* pMnode, int64_t dbUid) { return pVgroup; } -int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { +int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream, int64_t smaId) { SSdb* pSdb = pMnode->pSdb; SQueryPlan* pPlan = qStringToQueryPlan(pStream->physicalPlan); if (pPlan == NULL) { @@ -164,6 +164,10 @@ int32_t mndScheduleStream(SMnode* pMnode, STrans* pTrans, SStreamObj* pStream) { // only for inplace pTask->sinkType = TASK_SINK__SHOW; pTask->showSink.reserved = 0; + if (smaId != -1) { + pTask->sinkType = TASK_SINK__SMA; + pTask->smaSink.smaId = smaId; + } } else { pTask->sinkType = TASK_SINK__NONE; } diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 146975aa38..5c62cfa0f2 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -69,7 +69,8 @@ void mndCleanupSma(SMnode *pMnode) {} static SSdbRaw *mndSmaActionEncode(SSmaObj *pSma) { terrno = TSDB_CODE_OUT_OF_MEMORY; - int32_t size = sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; + int32_t size = + sizeof(SSmaObj) + pSma->exprLen + pSma->tagsFilterLen + pSma->sqlLen + pSma->astLen + TSDB_SMA_RESERVE_SIZE; SSdbRaw *pRaw = sdbAllocRaw(SDB_SMA, TSDB_SMA_VER_NUMBER, size); if (pRaw == NULL) goto _OVER; @@ -427,7 +428,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SNodeMsg *pReq, SMCreateSmaReq *pCre if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaRedoActions(pMnode, pTrans, pDb, &smaObj) != 0) goto _OVER; - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) goto _OVER; + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, smaObj.uid) != 0) goto _OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = 0; @@ -491,7 +492,7 @@ static int32_t mndProcessMCreateSmaReq(SNodeMsg *pReq) { mError("sma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); goto _OVER; } - + pStream = mndAcquireStream(pMnode, createReq.name); if (pStream != NULL) { mError("sma:%s, failed to create since stream:%s already exist", createReq.name, createReq.name); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index c02fec0a5f..bbb2f64282 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -246,7 +246,7 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) { return code; } -int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans) { +int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast, STrans *pTrans, int64_t smaId) { SNode *pAst = NULL; if (nodesStringToNode(ast, &pAst) < 0) { @@ -271,7 +271,7 @@ int32_t mndAddStreamToTrans(SMnode *pMnode, SStreamObj *pStream, const char *ast return -1; } - if (mndScheduleStream(pMnode, pTrans, pStream) < 0) { + if (mndScheduleStream(pMnode, pTrans, pStream, smaId) < 0) { mError("stream:%ld, schedule stream since %s", pStream->uid, terrstr()); return -1; } @@ -310,7 +310,7 @@ static int32_t mndCreateStream(SMnode *pMnode, SNodeMsg *pReq, SCMCreateStreamRe } mDebug("trans:%d, used to create stream:%s", pTrans->id, pCreate->name); - if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans) != 0) { + if (mndAddStreamToTrans(pMnode, &streamObj, pCreate->ast, pTrans, -1) != 0) { mError("trans:%d, failed to add stream since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 3309686bb7..7b0606512c 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -202,6 +202,9 @@ int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen); +// sma +void smaHandleRes(SVnode* pVnode, int64_t smaId, const SArray* data); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c354b01501..55202335e0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -476,6 +476,7 @@ int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) { if (tqExpandTask(pTq, pTask, 4) < 0) { ASSERT(0); } + pTask->ahandle = pTq->pVnode; taosHashPut(pTq->pStreamTasks, &pTask->taskId, sizeof(int32_t), pTask, sizeof(SStreamTask)); diff --git a/source/libs/stream/src/tstream.c b/source/libs/stream/src/tstream.c index 6d69a8960e..028e310a25 100644 --- a/source/libs/stream/src/tstream.c +++ b/source/libs/stream/src/tstream.c @@ -72,6 +72,7 @@ int32_t streamExecTask(SStreamTask* pTask, SMsgCb* pMsgCb, const void* input, in if (pTask->sinkType == TASK_SINK__TABLE) { // } else if (pTask->sinkType == TASK_SINK__SMA) { + pTask->smaSink.smaHandle(pTask->ahandle, pTask->smaSink.smaId, pRes); // } else if (pTask->sinkType == TASK_SINK__FETCH) { // @@ -208,7 +209,7 @@ int32_t tEncodeSStreamTask(SCoder* pEncoder, const SStreamTask* pTask) { if (pTask->sinkType == TASK_SINK__TABLE) { if (tEncodeI8(pEncoder, pTask->tbSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { - if (tEncodeI8(pEncoder, pTask->smaSink.reserved) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SHOW) { @@ -254,7 +255,7 @@ int32_t tDecodeSStreamTask(SCoder* pDecoder, SStreamTask* pTask) { if (pTask->sinkType == TASK_SINK__TABLE) { if (tDecodeI8(pDecoder, &pTask->tbSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SMA) { - if (tDecodeI8(pDecoder, &pTask->smaSink.reserved) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__FETCH) { if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; } else if (pTask->sinkType == TASK_SINK__SHOW) {