diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 42e2de13e2..5ecd32c083 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -225,6 +225,7 @@ TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TSMA, "create-tsma", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_STB_DROP, "drop-stb", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG) TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8 diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index c8b00120a1..e1beb9be8e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -167,6 +167,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index b47283ba88..1db07d6322 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -58,7 +58,10 @@ static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter); typedef struct SCreateTSMACxt { SMnode * pMnode; const SRpcMsg * pRpcReq; - const SMCreateSmaReq *pCreateSmaReq; + union { + const SMCreateSmaReq *pCreateSmaReq; + const SMDropSmaReq * pDropSmaReq; + }; const SDbObj * pDb; const SStbObj * pSrcStb; // TODO normal table @@ -1419,7 +1422,7 @@ static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMC pStream->ast = taosStrdup(pSma->ast); } -static void mndCreateTSMABuildStreamOpReq(SCreateTSMACxt *pCxt) { +static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) { tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN); tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); @@ -1451,11 +1454,13 @@ static void mndCreateTSMABuildStreamOpReq(SCreateTSMACxt *pCxt) { tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN); taosArrayPush(pCxt->pCreateStreamReq->pTags, &f); } +} +static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) { tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN); pCxt->pDropStreamReq->igNotExists = false; // TODO fill sql - pCxt->pDropStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql); + pCxt->pDropStreamReq->sql = strdup(pCxt->pDropSmaReq->name); pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql); } @@ -1471,14 +1476,14 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) { int32_t code = -1; STransAction redoAction = {0}; STransAction undoAction = {0}; - // TODO trans conflicting setting + // TODO trans conflicting setting, maybe conflict with myself STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma"); if (!pTrans) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto _OVER; } mndTransSetDbName(pTrans, pCxt->pDb->name, NULL); - if ((mndTransCheckConflict(pCxt->pMnode, pTrans)) != 0) goto _OVER; + if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER; mndTransSetParallel(pTrans); mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name, @@ -1538,7 +1543,8 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) { initSMAObj(&sma, pCxt->pCreateSmaReq, pCxt->pSrcStb, pCxt->pDb); pCxt->pCreateStreamReq = &createStreamReq; pCxt->pDropStreamReq = &dropStreamReq; - mndCreateTSMABuildStreamOpReq(pCxt); + mndCreateTSMABuildCreateStreamReq(pCxt); + mndCreateTSMABuildDropStreamReq(pCxt); if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pCxt)) { code = -1; @@ -1580,6 +1586,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { pStb = mndAcquireStb(pMnode, createReq.stb); if (!pStb) { mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb); + terrno = TSDB_CODE_MND_STB_NOT_EXIST; goto _OVER; } @@ -1592,6 +1599,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) { SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName); if (pTargetStb) { + terrno = TSDB_CODE_TDB_STB_ALREADY_EXIST; mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name, streamTargetStbFullName); goto _OVER; @@ -1654,6 +1662,71 @@ _OVER: return code; } +static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) { + int32_t code = -1; + STransAction dropStreamRedoAction = {0}; + STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "drop-tsma"); + if (!pTrans) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + SMDropStreamReq dropStreamReq = {0}; + pCxt->pDropStreamReq = &dropStreamReq; + mndCreateTSMABuildDropStreamReq(pCxt); + mndTransSetDbName(pTrans, pCxt->pDb->name, NULL); + if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER; + mndTransSetSerial(pTrans); + mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet); + dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST; + dropStreamRedoAction.msgType = TDMT_STREAM_DROP; + dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq); + dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen); + if (!dropStreamRedoAction.pCont) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + if (dropStreamRedoAction.contLen != + tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) { + mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name); + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + // output stable is not drop when dropping stream, dropping it when dropping tsma + SMDropStbReq dropStbReq = {0}; + dropStbReq.igNotExists = false; + tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN); + // TODO fill sql + dropStbReq.sql = "drop"; + dropStbReq.sqlLen = 5; + + STransAction dropStbRedoAction = {0}; + mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet); + dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST; + dropStbRedoAction.msgType = TDMT_MND_STB_DROP; + dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq); + dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen); + if (!dropStbRedoAction.pCont) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } + if (dropStbRedoAction.contLen != tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) { + mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name); + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + if (mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; + if (mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER; + if (mndTransAppendRedoAction(pTrans, &dropStreamRedoAction) != 0) goto _OVER; + if (mndTransAppendRedoAction(pTrans, &dropStbRedoAction) != 0) goto _OVER; + if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER; + code = TSDB_CODE_SUCCESS; +_OVER: + mndTransDrop(pTrans); + return code; +} + static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { int32_t code = -1; SMDropSmaReq dropReq = {0}; @@ -1665,6 +1738,15 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { goto _OVER; } + char streamName[TSDB_TABLE_FNAME_LEN] = {0}; + char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0}; + SName smaName; + tNameFromString(&smaName, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname); + snprintf(streamTargetStbFullName, TSDB_TABLE_FNAME_LEN, "%s.tsma_result_stb", dropReq.name); + + SStbObj* pStb = mndAcquireStb(pMnode, streamTargetStbFullName); + pSma = mndAcquireSma(pMnode, dropReq.name); if (!pSma && dropReq.igNotExists) { code = 0; @@ -1684,9 +1766,23 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) { goto _OVER; } - code = mndDropSma(pMnode, pReq, pDb, pSma); + SCreateTSMACxt cxt = { + .pDb = pDb, + .pMnode = pMnode, + .pRpcReq = pReq, + .pSma = pSma, + .streamName = streamName, + .targetStbFullName = streamTargetStbFullName, + .pDropSmaReq = &dropReq, + }; - code = TSDB_CODE_SUCCESS; + code = mndDropTSMA(&cxt); + + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; _OVER: + + mndReleaseStb(pMnode, pStb); + mndReleaseSma(pMnode, pSma); + mndReleaseDb(pMnode, pDb); return code; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c418241605..d68e1006e9 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -62,6 +62,8 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDb static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq); static int32_t mndProcessDropIndexReq(SRpcMsg *pReq); +static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq); + int32_t mndInitStb(SMnode *pMnode) { SSdbTable table = { .sdbType = SDB_STB, @@ -87,6 +89,8 @@ int32_t mndInitStb(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_VND_S3MIGRATE_RSP, mndProcessS3MigrateDbRsp); mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB_TIMER, mndProcessS3MigrateDbTimer); mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq); + mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode); + mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp); // mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq); // mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq); @@ -3693,3 +3697,14 @@ static int32_t mndProcessDropIndexReq(SRpcMsg *pReq) { _OVER: return code; }*/ + +static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) { + int32_t code = mndProcessDropStbReq(pReq); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + pReq->info.rsp = rpcMallocCont(1); + pReq->info.rspLen = 1; + pReq->info.noResp = false; + pReq->code = code; + } + return code; +} diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0f826cd85f..bf6b9f862e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -112,7 +112,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode); mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp); - mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint); @@ -2362,7 +2362,7 @@ static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) { static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) { int32_t code = mndProcessDropStreamReq(pReq); - if (code != 0) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { pReq->info.rsp = rpcMallocCont(1); pReq->info.rspLen = 1; pReq->info.noResp = false; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 9a2380698d..6b7debc4b6 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -10724,9 +10724,10 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) { code = translateCreateTSMA(pCxt, (SCreateTSMAStmt*)pNode); break; case QUERY_NODE_SHOW_CREATE_TSMA_STMT: - case QUERY_NODE_DROP_TSMA_STMT: break; - + case QUERY_NODE_DROP_TSMA_STMT: + code =translateDropTSMA(pCxt, (SDropTSMAStmt*)pNode); + break; default: break; }