diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index b9f31b8f1f..5ed230c6d9 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -223,6 +223,8 @@ TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB, "s3migrate-db", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_S3MIGRATE_DB_TIMER, "s3migrate-db-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TSMA, "create-tsma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 86b58c3fa7..bd0c54b9ba 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -164,6 +164,9 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_PAUSE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RESUME_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 05189d5a53..8bd5929502 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -47,6 +47,9 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq); static int32_t mndRetrieveSma(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndDestroySmaObj(SSmaObj *pSmaObj); +static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq); +static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq); + // sma and tag index comm func static int32_t mndProcessDropIdxReq(SRpcMsg *pReq); static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -72,6 +75,10 @@ int32_t mndInitSma(SMnode *pMnode) { mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndRetrieveIdx); mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_INDEX, mndCancelRetrieveIdx); + + mndSetMsgHandle(pMnode, TDMT_MND_CREATE_TSMA, mndProcessCreateTSMAReq); + mndSetMsgHandle(pMnode, TDMT_MND_DROP_TSMA, mndProcessDropTSMAReq); + return sdbSetTable(pMnode->pSdb, table); } @@ -1345,3 +1352,212 @@ static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter) { } taosMemoryFree(p); } + +static void initSMAObj(SSmaObj *pSma, const SMCreateSmaReq *pCreateReq, const SStbObj *pStb, const SDbObj *pDb) { + memcpy(pSma->name, pCreateReq->name, TSDB_TABLE_FNAME_LEN); + memcpy(pSma->stb, pStb->name, TSDB_TABLE_FNAME_LEN); + memcpy(pSma->db, pDb->name, TSDB_DB_FNAME_LEN); + pSma->createdTime = taosGetTimestampMs(); + pSma->uid = mndGenerateUid(pCreateReq->name, TSDB_TABLE_FNAME_LEN); + + char resultTbName[TSDB_TABLE_FNAME_LEN + 16] = {0}; + snprintf(resultTbName, TSDB_TABLE_FNAME_LEN + 16, "%s_td_tsma_rst_tb", pCreateReq->name); + memcpy(pSma->dstTbName, resultTbName, TSDB_TABLE_FNAME_LEN); + pSma->dstTbUid = mndGenerateUid(pSma->dstTbName, TSDB_TABLE_FNAME_LEN); + pSma->stbUid = pStb->uid; + pSma->dbUid = pDb->uid; + pSma->interval = pCreateReq->interval; + pSma->intervalUnit = pCreateReq->intervalUnit; + pSma->timezone = tsTimezone; + + pSma->exprLen = pCreateReq->exprLen; + pSma->sqlLen = pCreateReq->sqlLen; + pSma->astLen = pCreateReq->astLen; + pSma->expr = pCreateReq->expr; + pSma->sql = pCreateReq->sql; + pSma->ast = pCreateReq->ast; +} + +static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMCreateSmaReq *pCreateReq, + const SDbObj *pDb, SSmaObj *pSma) { + tstrncpy(pStream->name, streamName, TSDB_STREAM_FNAME_LEN); + tstrncpy(pStream->sourceDb, pDb->name, TSDB_DB_FNAME_LEN); + tstrncpy(pStream->targetDb, pDb->name, TSDB_DB_FNAME_LEN); + pStream->createTime = taosGetTimestampMs(); + pStream->updateTime = pStream->createTime; + pStream->uid = mndGenerateUid(streamName, strlen(streamName)); + pStream->sourceDbUid = pDb->uid; + pStream->targetDbUid = pDb->uid; + pStream->version = 1; + pStream->sql = taosStrdup(pCreateReq->sql); + pStream->smaId = pSma->uid; + //pStream->conf.watermark = 0; + //pStream->deleteMark = 0; + pStream->conf.fillHistory = STREAM_FILL_HISTORY_ON; + pStream->conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE; + pStream->conf.triggerParam = 10000; + pStream->ast = taosStrdup(pSma->ast); +} + +static int32_t mndCreateTSMATxnPrepare(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SMCreateSmaReq *pCreate, + SStreamObj *pStream, SSmaObj *pSma) { + int32_t code = -1; + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-tsma"); + if (!pTrans) return TSDB_CODE_OUT_OF_MEMORY; + mndTransSetDbName(pTrans, pDb->name, NULL); + if ((mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER; + + mndTransSetSerial(pTrans); + mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCreate->name, pStream->name); + + if (mndSetCreateSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + if (mndSetCreateSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + + if (mndScheduleStream(pMnode, pStream, 1685959190000) != 0) goto _OVER; + if (mndPersistStream(pMnode, pTrans, pStream) != 0) goto _OVER; + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + code = TSDB_CODE_SUCCESS; + +_OVER: + mndTransDrop(pTrans); + return code; +} + +static int32_t mndCreateTSMA(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreateReq, SDbObj *pDb, SStbObj *pStb, + const char *streamName) { + SSmaObj pSma = {0}; + SStreamObj pStream = {0}; + initSMAObj(&pSma, pCreateReq, pStb, pDb); + initStreamObj(&pStream, streamName, pCreateReq, pDb, &pSma); + + SNode* pAst = NULL; + if (nodesStringToNode(pCreateReq->ast, &pAst) < 0) { + terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; + mError("tsma:%s, failed to create since parse ast error", pSma.name); + return -1; + } + + if (qExtractResultSchema(pAst, (int32_t *)&pStream.outputSchema.nCols, &pStream.outputSchema.pSchema) != 0) { + terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; + mError("sma:%s, failed to create since extract result schema error", pSma.name); + return -1; + } + + SQueryPlan* pPlan = NULL; + SPlanContext cxt = { + .pAstRoot = pAst, + .topicQuery = false, + .streamQuery = true, + .triggerType = pStream.conf.trigger, + .watermark = pStream.conf.watermark, + .deleteMark = pStream.deleteMark, + }; + + if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { + terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; + mError("sma:%s, failed to create since create query plan error", pSma.name); + return -1; + } + + if (nodesNodeToString((SNode *)pPlan, false, &pStream.physicalPlan, NULL) != 0) { + terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; + mError("sma:%s, failed to create since save physcial plan error", pSma.name); + return -1; + } + + if (pAst) nodesDestroyNode(pAst); + nodesDestroyNode((SNode*)pPlan); + + int32_t code; + if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pMnode, pReq, pDb, pCreateReq, &pStream, &pSma)) { + code = -1; + } else { + mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreateReq->name, + pSma.uid, pSma.stbUid, pSma.dstTbUid, pSma.dstTbName, pSma.dstVgId); + code = 0; + } + + return code; +} + +static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { +#ifdef WINDOWS + terrno = TSDB_CODE_MND_INVALID_PLATFORM; + goto _OVER; +#endif + SMnode * pMnode = pReq->info.node; + int32_t code = TSDB_CODE_SUCCESS; + SDbObj * pDb = NULL; + SStbObj * pStb = NULL; + SSmaObj * pSma = NULL; + SStreamObj * pStream = NULL; + int64_t mTraceId = TRACE_GET_ROOTID(&pReq->info.traceId); + SMCreateSmaReq createReq = {0}; + + if (tDeserializeSMCreateSmaReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + mInfo("start to create tsma: %s", createReq.name); + if (mndCheckCreateSmaReq(&createReq)) + goto _OVER; + + pStb = mndAcquireStb(pMnode, createReq.stb); + if (!pStb) { + mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); + goto _OVER; + } + + char streamName[TSDB_TABLE_FNAME_LEN] = {0}; + mndGetStreamNameFromSmaName(streamName, createReq.name); + + pStream = mndAcquireStream(pMnode, streamName); + if (pStream != NULL) { + mError("tsma:%s, failed to create since stream:%s already exist", createReq.name, streamName); + terrno = TSDB_CODE_MND_STREAM_ALREADY_EXIST; + goto _OVER; + } + + pSma = sdbAcquire(pMnode->pSdb, SDB_SMA, createReq.name); + if (pSma && createReq.igExists) { + mInfo("tsma:%s, already exists in sma:%s, ignore exist is set", createReq.name, pSma->name); + code = 0; + goto _OVER; + } + if (pSma) { + terrno = TSDB_CODE_MND_SMA_ALREADY_EXIST; + goto _OVER; + } + + pDb = mndAcquireDbBySma(pMnode, createReq.name); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + goto _OVER; + } + + if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + goto _OVER; + } + + code = mndCreateTSMA(pMnode, pReq, &createReq, pDb, pStb, streamName); + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("tsma:%s, failed to create since %s", createReq.name, terrstr()); + } + + mndReleaseStb(pMnode, pStb); + mndReleaseSma(pMnode, pSma); + mndReleaseStream(pMnode, pStream); + mndReleaseDb(pMnode, pDb); + tFreeSMCreateSmaReq(&createReq); + + return TSDB_CODE_MND_SMA_ALREADY_EXIST; +} + +static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { + int32_t code = TSDB_CODE_SUCCESS; + return code; +} diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index b7d18c9eb6..da4647af0c 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -2848,7 +2848,7 @@ SNode* createCreateTSMAStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStmt->ignoreExists = ignoreExists; pStmt->pOptions = (STSMAOptions*)pOptions; pStmt->pOptions->pInterval = pInterval; - COPY_STRING_FORM_STR_TOKEN(pStmt->tsmaName, tsmaName); + COPY_STRING_FORM_ID_TOKEN(pStmt->tsmaName, tsmaName); SRealTableNode* pTable = (SRealTableNode*)pRealTable; memcpy(pStmt->dbName, pTable->table.dbName, TSDB_DB_NAME_LEN); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1e3596af63..07e1a27218 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10482,7 +10482,8 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt code = buildCreateTSMAReq(pCxt, pStmt, pStmt->pReq); } if (TSDB_CODE_SUCCESS == code) { - code = buildCmdMsg(pCxt, TDMT_MND_CREATE_SMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq); + // TODO replace with tsma serialization func + code = buildCmdMsg(pCxt, TDMT_MND_CREATE_TSMA, (FSerializeFunc)tSerializeSMCreateSmaReq, pStmt->pReq); } tFreeSMCreateSmaReq(pStmt->pReq); return code; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2e9499b77d..4230abc8b8 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -322,7 +322,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_MANY_STREAMS, "Too many streams") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_TARGET_TABLE, "Cannot write the same stable as other stream") // mnode-sma -TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "index already exists in db") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_ALREADY_EXIST, "SMA already exists in db") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SMA_NOT_EXIST, "index not exist") TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_SMA_OPTION, "Invalid sma index option") diff --git a/tests/system-test/6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py b/tests/system-test/6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py index 51da6fc723..cecce37633 100644 --- a/tests/system-test/6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py +++ b/tests/system-test/6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py @@ -167,6 +167,7 @@ class TDTestCase: def run(self): self.check_setup_cluster_status() + sleep(9999999) self.create_db_check_vgroups() self.create_db_replica_3_insertdatas(self.db_name , self.replica , self.vgroups , self.tb_nums , self.row_nums) self.check_insert_status(self.db_name , self.tb_nums , self.row_nums)