diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 5ed230c6d9..42e2de13e2 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -312,6 +312,8 @@ TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8 diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index 1d94936873..3236719ef2 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -615,10 +615,11 @@ typedef struct SShowCreateTSMAStmt { }SShowCreateTSMAStmt; typedef struct SDropTSMAStmt { - ENodeType type; - bool ignoreNotExists; - char dbName[TSDB_DB_NAME_LEN]; - char tsmaName[TSDB_INDEX_NAME_LEN]; + ENodeType type; + bool ignoreNotExists; + char dbName[TSDB_DB_NAME_LEN]; + char tsmaName[TSDB_INDEX_NAME_LEN]; + SMDropSmaReq* pReq; } SDropTSMAStmt; #ifdef __cplusplus diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index bd0c54b9ba..c8b00120a1 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -225,6 +225,10 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_STOP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_CREATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 8bd5929502..b47283ba88 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -55,6 +55,20 @@ static int32_t mndProcessDropIdxReq(SRpcMsg *pReq); static int32_t mndRetrieveIdx(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter); +typedef struct SCreateTSMACxt { + SMnode * pMnode; + const SRpcMsg * pRpcReq; + const SMCreateSmaReq *pCreateSmaReq; + const SDbObj * pDb; + const SStbObj * pSrcStb; + // TODO normal table + SSmaObj * pSma; + SCMCreateStreamReq *pCreateStreamReq; + SMDropStreamReq * pDropStreamReq; + const char * streamName; + const char * targetStbFullName; +} SCreateTSMACxt; + int32_t mndInitSma(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_SMA, @@ -382,6 +396,14 @@ static int32_t mndSetCreateSmaRedoLogs(SMnode *pMnode, STrans *pTrans, SSmaObj * return 0; } +static int32_t mndSetCreateSmaUndoLogs(SMnode* pMnode, STrans* pTrans, SSmaObj* pSma) { + SSdbRaw * pUndoRaw = mndSmaActionEncode(pSma); + if (!pUndoRaw) return -1; + if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1; + if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1; + return 0; +} + static int32_t mndSetCreateSmaCommitLogs(SMnode *pMnode, STrans *pTrans, SSmaObj *pSma) { SSdbRaw *pCommitRaw = mndSmaActionEncode(pSma); if (pCommitRaw == NULL) return -1; @@ -694,9 +716,7 @@ static int32_t mndCheckCreateSmaReq(SMCreateSmaReq *pCreate) { static void mndGetStreamNameFromSmaName(char *streamName, char *smaName) { SName n; tNameFromString(&n, smaName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); - streamName[0] = '1'; - streamName[1] = '.'; - strcpy(streamName + 2, tNameGetTableName(&n)); + sprintf(streamName, "%d.%s", n.acctId, n.tname); } static int32_t mndProcessCreateSmaReq(SRpcMsg *pReq) { @@ -1391,31 +1411,116 @@ static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMC pStream->version = 1; pStream->sql = taosStrdup(pCreateReq->sql); pStream->smaId = pSma->uid; - //pStream->conf.watermark = 0; - //pStream->deleteMark = 0; + pStream->conf.watermark = 0; + pStream->deleteMark = 0; pStream->conf.fillHistory = STREAM_FILL_HISTORY_ON; pStream->conf.trigger = STREAM_TRIGGER_WINDOW_CLOSE; pStream->conf.triggerParam = 10000; pStream->ast = taosStrdup(pSma->ast); } -static int32_t mndCreateTSMATxnPrepare(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SMCreateSmaReq *pCreate, - SStreamObj *pStream, SSmaObj *pSma) { - int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "create-tsma"); - if (!pTrans) return TSDB_CODE_OUT_OF_MEMORY; - mndTransSetDbName(pTrans, pDb->name, NULL); - if ((mndTransCheckConflict(pMnode, pTrans)) != 0) goto _OVER; +static void mndCreateTSMABuildStreamOpReq(SCreateTSMACxt *pCxt) { + tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN); + tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN); + tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); + pCxt->pCreateStreamReq->igExists = false; + pCxt->pCreateStreamReq->triggerType = STREAM_TRIGGER_MAX_DELAY; + pCxt->pCreateStreamReq->igExpired = false; + pCxt->pCreateStreamReq->fillHistory = STREAM_FILL_HISTORY_ON; + pCxt->pCreateStreamReq->maxDelay = 10000; + pCxt->pCreateStreamReq->watermark = 0; + pCxt->pCreateStreamReq->numOfTags = pCxt->pSrcStb->numOfTags; + pCxt->pCreateStreamReq->checkpointFreq = 0; + pCxt->pCreateStreamReq->createStb = 1; + pCxt->pCreateStreamReq->targetStbUid = 0; + pCxt->pCreateStreamReq->fillNullCols = NULL; + pCxt->pCreateStreamReq->igUpdate = 0; + // TODO what's this tiemstamp? + pCxt->pCreateStreamReq->lastTs = 1685959190000; + pCxt->pCreateStreamReq->ast = strdup(pCxt->pCreateSmaReq->ast); + pCxt->pCreateStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); - mndTransSetSerial(pTrans); - mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCreate->name, pStream->name); + // construct tags + pCxt->pCreateStreamReq->pTags = taosArrayInit(pCxt->pSrcStb->numOfTags, sizeof(SField)); + for (int32_t idx = 0; idx < pCxt->pSrcStb->numOfTags; ++idx) { + SField f = {0}; + SSchema *pSchema = &pCxt->pSrcStb->pTags[idx]; + f.bytes = pSchema->bytes; + f.type = pSchema->type; + f.flags = pSchema->flags; + tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN); + taosArrayPush(pCxt->pCreateStreamReq->pTags, &f); + } - if (mndSetCreateSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER; - if (mndSetCreateSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER; + tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN); + pCxt->pDropStreamReq->igNotExists = false; + // TODO fill sql + pCxt->pDropStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); + pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql); +} + +static int32_t mndCreateTSMASetCreateStreamRedoAction(SMnode* pMnode) { + return TSDB_CODE_SUCCESS; +} + +static int32_t mndCreateTSMASetCreateStreamUndoAction(SMnode* pMnode) { + return TSDB_CODE_SUCCESS; +} + +static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { + int32_t code = -1; + STransAction redoAction = {0}; + STransAction undoAction = {0}; + // TODO trans conflicting setting + STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma"); + if (!pTrans) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + mndTransSetDbName(pTrans, pCxt->pDb->name, NULL); + if ((mndTransCheckConflict(pCxt->pMnode, pTrans)) != 0) goto _OVER; + + mndTransSetParallel(pTrans); + mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name, + pCxt->pCreateStreamReq->name); + + mndGetMnodeEpSet(pCxt->pMnode, &redoAction.epSet); + redoAction.acceptableCode = TSDB_CODE_MND_STREAM_ALREADY_EXIST; + redoAction.msgType = TDMT_STREAM_CREATE; + redoAction.contLen = tSerializeSCMCreateStreamReq(0, 0, pCxt->pCreateStreamReq); + redoAction.pCont = taosMemoryCalloc(1, redoAction.contLen); + if (!redoAction.pCont) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + if (redoAction.contLen != tSerializeSCMCreateStreamReq(redoAction.pCont, redoAction.contLen, pCxt->pCreateStreamReq)) { + mError("sma: %s, failed to create due to create stream req encode failure", pCxt->pCreateSmaReq->name); + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + undoAction.epSet = redoAction.epSet; + undoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST; + undoAction.actionType = TDMT_STREAM_DROP; + undoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq); + undoAction.pCont = taosMemoryCalloc(1, undoAction.contLen); + if (!undoAction.pCont) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + if (undoAction.contLen != tSerializeSMDropStreamReq(undoAction.pCont, undoAction.contLen, pCxt->pDropStreamReq)) { + mError("sma: %s, failed to create due to drop stream req encode failure", pCxt->pCreateSmaReq->name); + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + if (mndSetCreateSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; + if (mndSetCreateSmaUndoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; + if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; + if (mndTransAppendRedoAction(pTrans, &redoAction) != 0) goto _OVER; + if (mndTransAppendUndoAction(pTrans, &undoAction) != 0) goto _OVER; + if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER; - if (mndScheduleStream(pMnode, pStream, 1685959190000) != 0) goto _OVER; - if (mndPersistStream(pMnode, pTrans, pStream) != 0) goto _OVER; - if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; code = TSDB_CODE_SUCCESS; _OVER: @@ -1423,60 +1528,29 @@ _OVER: return code; } -static int32_t mndCreateTSMA(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCreateReq, SDbObj *pDb, SStbObj *pStb, - const char *streamName) { - SSmaObj pSma = {0}; - SStreamObj pStream = {0}; - initSMAObj(&pSma, pCreateReq, pStb, pDb); - initStreamObj(&pStream, streamName, pCreateReq, pDb, &pSma); +static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) { + int32_t code; + SSmaObj sma = {0}; + SCMCreateStreamReq createStreamReq = {0}; + SMDropStreamReq dropStreamReq = {0}; - SNode* pAst = NULL; - if (nodesStringToNode(pCreateReq->ast, &pAst) < 0) { - terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; - mError("tsma:%s, failed to create since parse ast error", pSma.name); - return -1; - } + pCxt->pSma = &sma; + initSMAObj(&sma, pCxt->pCreateSmaReq, pCxt->pSrcStb, pCxt->pDb); + pCxt->pCreateStreamReq = &createStreamReq; + pCxt->pDropStreamReq = &dropStreamReq; + mndCreateTSMABuildStreamOpReq(pCxt); - if (qExtractResultSchema(pAst, (int32_t *)&pStream.outputSchema.nCols, &pStream.outputSchema.pSchema) != 0) { - terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; - mError("sma:%s, failed to create since extract result schema error", pSma.name); - return -1; - } - - SQueryPlan* pPlan = NULL; - SPlanContext cxt = { - .pAstRoot = pAst, - .topicQuery = false, - .streamQuery = true, - .triggerType = pStream.conf.trigger, - .watermark = pStream.conf.watermark, - .deleteMark = pStream.deleteMark, - }; - - if (qCreateQueryPlan(&cxt, &pPlan, NULL) < 0) { - terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; - mError("sma:%s, failed to create since create query plan error", pSma.name); - return -1; - } - - if (nodesNodeToString((SNode *)pPlan, false, &pStream.physicalPlan, NULL) != 0) { - terrno = TSDB_CODE_MND_INVALID_SMA_OPTION; - mError("sma:%s, failed to create since save physcial plan error", pSma.name); - return -1; - } - - if (pAst) nodesDestroyNode(pAst); - nodesDestroyNode((SNode*)pPlan); - - int32_t code; - if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pMnode, pReq, pDb, pCreateReq, &pStream, &pSma)) { + if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pCxt)) { code = -1; + goto _OVER; } else { - mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", pCreateReq->name, - pSma.uid, pSma.stbUid, pSma.dstTbUid, pSma.dstTbName, pSma.dstVgId); + mInfo("sma:%s, uid:%" PRIi64 " create on stb:%" PRIi64 ", dstSuid:%" PRIi64 " dstTb:%s dstVg:%d", + pCxt->pCreateSmaReq->name, sma.uid, sma.stbUid, sma.dstTbUid, sma.dstTbName, sma.dstVgId); code = 0; } +_OVER: + pCxt->pCreateStreamReq = NULL; return code; } @@ -1486,7 +1560,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { goto _OVER; #endif SMnode * pMnode = pReq->info.node; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = -1; SDbObj * pDb = NULL; SStbObj * pStb = NULL; SSmaObj * pSma = NULL; @@ -1510,7 +1584,18 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { } char streamName[TSDB_TABLE_FNAME_LEN] = {0}; - mndGetStreamNameFromSmaName(streamName, createReq.name); + char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0}; + SName smaName; + tNameFromString(&smaName, createReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname); + snprintf(streamTargetStbFullName, TSDB_TABLE_FNAME_LEN, "%s.tsma_result_stb", createReq.name); + + SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName); + if (pTargetStb) { + mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name, + streamTargetStbFullName); + goto _OVER; + } pStream = mndAcquireStream(pMnode, streamName); if (pStream != NULL) { @@ -1540,7 +1625,19 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { goto _OVER; } - code = mndCreateTSMA(pMnode, pReq, &createReq, pDb, pStb, streamName); + SCreateTSMACxt cxt = { + .pMnode = pMnode, + .pCreateSmaReq = &createReq, + .pCreateStreamReq = NULL, + .streamName = streamName, + .targetStbFullName = streamTargetStbFullName, + .pDb = pDb, + .pRpcReq = pReq, + .pSma = NULL, + .pSrcStb = pStb, + }; + + code = mndCreateTSMA(&cxt); if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: @@ -1554,10 +1651,42 @@ _OVER: mndReleaseDb(pMnode, pDb); tFreeSMCreateSmaReq(&createReq); - return TSDB_CODE_MND_SMA_ALREADY_EXIST; + return code; } static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = -1; + SMDropSmaReq dropReq = {0}; + SSmaObj * pSma; + SDbObj * pDb; + SMnode * pMnode = pReq->info.node; + if (tDeserializeSMDropSmaReq(pReq->pCont, pReq->contLen, &dropReq) != TSDB_CODE_SUCCESS) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + pSma = mndAcquireSma(pMnode, dropReq.name); + if (!pSma && dropReq.igNotExists) { + code = 0; + goto _OVER; + } + if (!pSma) { + terrno = TSDB_CODE_MND_SMA_NOT_EXIST; + goto _OVER; + } + pDb = mndAcquireDbBySma(pMnode, dropReq.name); + if (!pDb) { + terrno = TSDB_CODE_MND_DB_NOT_EXIST; + goto _OVER; + } + + if (mndCheckDbPrivilege(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pDb) != 0) { + goto _OVER; + } + + code = mndDropSma(pMnode, pReq, pDb, pSma); + + code = TSDB_CODE_SUCCESS; +_OVER: return code; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 566e1a28c3..0f826cd85f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -41,6 +41,10 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); + +static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq); +static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq); + static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq); static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq); static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq); @@ -102,7 +106,13 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_UPDATE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_VND_STREAM_TASK_RESET_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); + // for msgs inside mnode + mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE, mndProcessCreateStreamReqFromMNode); + mndSetMsgHandle(pMnode, TDMT_STREAM_CREATE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode); + mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp); + + mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); @@ -2338,3 +2348,25 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { return 0; } + +static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { + int32_t code = mndProcessCreateStreamReq(pReq); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + pReq->info.rsp = rpcMallocCont(1); + pReq->info.rspLen = 1; + pReq->info.noResp = false; + pReq->code = code; + } + return code; +} + +static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { + int32_t code = mndProcessDropStreamReq(pReq); + if (code != 0) { + pReq->info.rsp = rpcMallocCont(1); + pReq->info.rspLen = 1; + pReq->info.noResp = false; + pReq->code = code; + } + return code; +} diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y old mode 100755 new mode 100644 diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 07e1a27218..9a2380698d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7434,6 +7434,7 @@ typedef struct SSampleAstInfo { SNodeList* pPartitionByList; STableMeta* pRollupTableMeta; bool createSmaIndex; + SNodeList* pTags; } SSampleAstInfo; static int32_t buildTableForSampleAst(SSampleAstInfo* pInfo, SNode** pOutput) { @@ -7530,6 +7531,7 @@ static int32_t buildSampleAst(STranslateContext* pCxt, SSampleAstInfo* pInfo, ch code = buildProjectsForSampleAst(pInfo, &pSelect->pProjectionList); } if (TSDB_CODE_SUCCESS == code) { + TSWAP(pInfo->pTags, pSelect->pTags); TSWAP(pSelect->pPartitionByList, pInfo->pPartitionByList); code = buildIntervalForSampleAst(pInfo, &pSelect->pWindow); } @@ -10353,8 +10355,17 @@ static int32_t translateShowCreateView(STranslateContext* pCxt, SShowCreateViewS #endif } -static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, char** pAst, int32_t* pLen, char** pExpr, - int32_t* pExprLen) { +SNode* createColumnNodeWithName(const char* name) { + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (!pCol) return NULL; + tstrncpy(pCol->colName, name, TSDB_COL_NAME_LEN); + tstrncpy(pCol->node.aliasName, name, TSDB_COL_NAME_LEN); + tstrncpy(pCol->node.userAlias, name, TSDB_COL_NAME_LEN); + return (SNode*)pCol; +} + +static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, SMCreateSmaReq* pReq, + STableMeta* pTableMeta) { int32_t code = TSDB_CODE_SUCCESS; SSampleAstInfo info = {0}; info.createSmaIndex = true; @@ -10363,8 +10374,32 @@ static int32_t buildTSMAAst(STranslateContext* pCxt, SCreateTSMAStmt* pStmt, cha info.pFuncs = nodesCloneList(pStmt->pOptions->pFuncs); info.pInterval = nodesCloneNode(pStmt->pOptions->pInterval); if (!info.pFuncs || !info.pInterval) code = TSDB_CODE_OUT_OF_MEMORY; + + if (TSDB_CODE_SUCCESS == code) { + // append partition by tbname + SNode* pTbnameFunc = createTbnameFunction(); + if (pTbnameFunc) { + nodesListMakeAppend(&info.pPartitionByList, pTbnameFunc); + } else { + code = TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS == code) { + // append partition by tags + int32_t numOfCols = pTableMeta->tableInfo.numOfColumns, numOfTags = pTableMeta->tableInfo.numOfTags; + for (int32_t idx = numOfCols; idx < numOfCols + numOfTags; ++idx) { + SNode* pTagCol = createColumnNodeWithName(pTableMeta->schema[idx].name); + if (!pTagCol) { + code = TSDB_CODE_OUT_OF_MEMORY; + break; + } + nodesListAppend(info.pPartitionByList, pTagCol); + nodesListMakeAppend(&info.pTags, nodesCloneNode(pTagCol)); + } + } + if (code == TSDB_CODE_SUCCESS) { - code = buildSampleAst(pCxt, &info, pAst, pLen, pExpr, pExprLen); + code = buildSampleAst(pCxt, &info, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen); } clearSampleAstInfo(&info); return code; @@ -10454,7 +10489,7 @@ static int32_t buildCreateTSMAReq(STranslateContext* pCxt, SCreateTSMAStmt* pStm code = translateTSMAFuncs(pCxt, pStmt, pTableMeta); } if (TSDB_CODE_SUCCESS == code) { - code = buildTSMAAst(pCxt, pStmt, &pReq->ast, &pReq->astLen, &pReq->expr, &pReq->exprLen); + code = buildTSMAAst(pCxt, pStmt, pReq, pTableMeta); } /* if (TSDB_CODE_SUCCESS == code) { @@ -10489,6 +10524,20 @@ static int32_t translateCreateTSMA(STranslateContext* pCxt, SCreateTSMAStmt* pSt return code; } +static int32_t translateDropTSMA(STranslateContext* pCxt, SDropTSMAStmt* pStmt) { + int32_t code = TSDB_CODE_SUCCESS; + pStmt->pReq = taosMemoryCalloc(1, sizeof(SMDropSmaReq)); + if (!pStmt->pReq) code = TSDB_CODE_OUT_OF_MEMORY; + if (code == TSDB_CODE_SUCCESS) { + SName name; + tNameExtractFullName(toName(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->tsmaName, &name), pStmt->pReq->name); + pStmt->pReq->igNotExists = pStmt->ignoreNotExists; + } + if (TSDB_CODE_SUCCESS == code) + code = buildCmdMsg(pCxt, TDMT_MND_DROP_TSMA, (FSerializeFunc)tSerializeSMDropSmaReq, pStmt->pReq); + return code; +} + static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { int32_t code = TSDB_CODE_SUCCESS; switch (nodeType(pNode)) {