drop tsma

This commit is contained in:
wangjiaming0909 2023-11-24 14:21:12 +08:00
parent 8e2977d083
commit 6324d1293a
6 changed files with 127 additions and 12 deletions

View File

@ -225,6 +225,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TSMA, "create-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TSMA, "drop-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STB_DROP, "drop-stb", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
TD_NEW_MSG_SEG(TDMT_VND_MSG) // 2<<8

View File

@ -167,6 +167,8 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_GRANT_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_TSMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STB_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_RETRIEVE_IP_WHITE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_GET_USER_WHITELIST, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -58,7 +58,10 @@ static void mndCancelRetrieveIdx(SMnode *pMnode, void *pIter);
typedef struct SCreateTSMACxt {
SMnode * pMnode;
const SRpcMsg * pRpcReq;
union {
const SMCreateSmaReq *pCreateSmaReq;
const SMDropSmaReq * pDropSmaReq;
};
const SDbObj * pDb;
const SStbObj * pSrcStb;
// TODO normal table
@ -1419,7 +1422,7 @@ static void initStreamObj(SStreamObj *pStream, const char *streamName, const SMC
pStream->ast = taosStrdup(pSma->ast);
}
static void mndCreateTSMABuildStreamOpReq(SCreateTSMACxt *pCxt) {
static void mndCreateTSMABuildCreateStreamReq(SCreateTSMACxt *pCxt) {
tstrncpy(pCxt->pCreateStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
tstrncpy(pCxt->pCreateStreamReq->sourceDB, pCxt->pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(pCxt->pCreateStreamReq->targetStbFullName, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
@ -1451,11 +1454,13 @@ static void mndCreateTSMABuildStreamOpReq(SCreateTSMACxt *pCxt) {
tstrncpy(f.name, pSchema->name, TSDB_COL_NAME_LEN);
taosArrayPush(pCxt->pCreateStreamReq->pTags, &f);
}
}
static void mndCreateTSMABuildDropStreamReq(SCreateTSMACxt* pCxt) {
tstrncpy(pCxt->pDropStreamReq->name, pCxt->streamName, TSDB_STREAM_FNAME_LEN);
pCxt->pDropStreamReq->igNotExists = false;
// TODO fill sql
pCxt->pDropStreamReq->sql = strdup(pCxt->pCreateSmaReq->sql);
pCxt->pDropStreamReq->sql = strdup(pCxt->pDropSmaReq->name);
pCxt->pDropStreamReq->sqlLen = strlen(pCxt->pDropStreamReq->sql);
}
@ -1471,14 +1476,14 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
int32_t code = -1;
STransAction redoAction = {0};
STransAction undoAction = {0};
// TODO trans conflicting setting
// TODO trans conflicting setting, maybe conflict with myself
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
if (!pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
if ((mndTransCheckConflict(pCxt->pMnode, pTrans)) != 0) goto _OVER;
if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER;
mndTransSetParallel(pTrans);
mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
@ -1538,7 +1543,8 @@ static int32_t mndCreateTSMA(SCreateTSMACxt *pCxt) {
initSMAObj(&sma, pCxt->pCreateSmaReq, pCxt->pSrcStb, pCxt->pDb);
pCxt->pCreateStreamReq = &createStreamReq;
pCxt->pDropStreamReq = &dropStreamReq;
mndCreateTSMABuildStreamOpReq(pCxt);
mndCreateTSMABuildCreateStreamReq(pCxt);
mndCreateTSMABuildDropStreamReq(pCxt);
if (TSDB_CODE_SUCCESS != mndCreateTSMATxnPrepare(pCxt)) {
code = -1;
@ -1580,6 +1586,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
pStb = mndAcquireStb(pMnode, createReq.stb);
if (!pStb) {
mError("tsma:%s, failed to create since stb:%s not exist", createReq.name, createReq.stb);
terrno = TSDB_CODE_MND_STB_NOT_EXIST;
goto _OVER;
}
@ -1592,6 +1599,7 @@ static int32_t mndProcessCreateTSMAReq(SRpcMsg* pReq) {
SStbObj *pTargetStb = mndAcquireStb(pMnode, streamTargetStbFullName);
if (pTargetStb) {
terrno = TSDB_CODE_TDB_STB_ALREADY_EXIST;
mError("tsma: %s, failed to create since output stable already exists: %s", createReq.name,
streamTargetStbFullName);
goto _OVER;
@ -1654,6 +1662,71 @@ _OVER:
return code;
}
static int32_t mndDropTSMA(SCreateTSMACxt* pCxt) {
int32_t code = -1;
STransAction dropStreamRedoAction = {0};
STrans *pTrans = mndTransCreate(pCxt->pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "drop-tsma");
if (!pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
SMDropStreamReq dropStreamReq = {0};
pCxt->pDropStreamReq = &dropStreamReq;
mndCreateTSMABuildDropStreamReq(pCxt);
mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER;
mndTransSetSerial(pTrans);
mndGetMnodeEpSet(pCxt->pMnode, &dropStreamRedoAction.epSet);
dropStreamRedoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
dropStreamRedoAction.msgType = TDMT_STREAM_DROP;
dropStreamRedoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
dropStreamRedoAction.pCont = taosMemoryCalloc(1, dropStreamRedoAction.contLen);
if (!dropStreamRedoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dropStreamRedoAction.contLen !=
tSerializeSMDropStreamReq(dropStreamRedoAction.pCont, dropStreamRedoAction.contLen, pCxt->pDropStreamReq)) {
mError("tsma: %s, failed to drop due to drop stream req encode failure", pCxt->pDropSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
// output stable is not drop when dropping stream, dropping it when dropping tsma
SMDropStbReq dropStbReq = {0};
dropStbReq.igNotExists = false;
tstrncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
// TODO fill sql
dropStbReq.sql = "drop";
dropStbReq.sqlLen = 5;
STransAction dropStbRedoAction = {0};
mndGetMnodeEpSet(pCxt->pMnode, &dropStbRedoAction.epSet);
dropStbRedoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
dropStbRedoAction.msgType = TDMT_MND_STB_DROP;
dropStbRedoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
dropStbRedoAction.pCont = taosMemoryCalloc(1, dropStbRedoAction.contLen);
if (!dropStbRedoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dropStbRedoAction.contLen != tSerializeSMDropStbReq(dropStbRedoAction.pCont, dropStbRedoAction.contLen, &dropStbReq)) {
mError("tsma: %s, failedto drop due to drop stb req encode failure", pCxt->pDropSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
if (mndSetDropSmaRedoLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndSetDropSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndTransAppendRedoAction(pTrans, &dropStreamRedoAction) != 0) goto _OVER;
if (mndTransAppendRedoAction(pTrans, &dropStbRedoAction) != 0) goto _OVER;
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
code = TSDB_CODE_SUCCESS;
_OVER:
mndTransDrop(pTrans);
return code;
}
static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
int32_t code = -1;
SMDropSmaReq dropReq = {0};
@ -1665,6 +1738,15 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
goto _OVER;
}
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
char streamTargetStbFullName[TSDB_TABLE_FNAME_LEN] = {0};
SName smaName;
tNameFromString(&smaName, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
snprintf(streamTargetStbFullName, TSDB_TABLE_FNAME_LEN, "%s.tsma_result_stb", dropReq.name);
SStbObj* pStb = mndAcquireStb(pMnode, streamTargetStbFullName);
pSma = mndAcquireSma(pMnode, dropReq.name);
if (!pSma && dropReq.igNotExists) {
code = 0;
@ -1684,9 +1766,23 @@ static int32_t mndProcessDropTSMAReq(SRpcMsg* pReq) {
goto _OVER;
}
code = mndDropSma(pMnode, pReq, pDb, pSma);
SCreateTSMACxt cxt = {
.pDb = pDb,
.pMnode = pMnode,
.pRpcReq = pReq,
.pSma = pSma,
.streamName = streamName,
.targetStbFullName = streamTargetStbFullName,
.pDropSmaReq = &dropReq,
};
code = TSDB_CODE_SUCCESS;
code = mndDropTSMA(&cxt);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
_OVER:
mndReleaseStb(pMnode, pStb);
mndReleaseSma(pMnode, pSma);
mndReleaseDb(pMnode, pDb);
return code;
}

View File

@ -62,6 +62,8 @@ static int32_t mndAlterStbAndUpdateTagIdxImp(SMnode *pMnode, SRpcMsg *pReq, SDb
static int32_t mndProcessCreateIndexReq(SRpcMsg *pReq);
static int32_t mndProcessDropIndexReq(SRpcMsg *pReq);
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq);
int32_t mndInitStb(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_STB,
@ -87,6 +89,8 @@ int32_t mndInitStb(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_S3MIGRATE_RSP, mndProcessS3MigrateDbRsp);
mndSetMsgHandle(pMnode, TDMT_MND_S3MIGRATE_DB_TIMER, mndProcessS3MigrateDbTimer);
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP, mndProcessDropStbReqFromMNode);
mndSetMsgHandle(pMnode, TDMT_MND_STB_DROP_RSP, mndTransProcessRsp);
// mndSetMsgHandle(pMnode, TDMT_MND_SYSTABLE_RETRIEVE, mndProcessRetrieveStbReq);
// mndSetMsgHandle(pMnode, TDMT_MND_CREATE_INDEX, mndProcessCreateIndexReq);
@ -3693,3 +3697,14 @@ static int32_t mndProcessDropIndexReq(SRpcMsg *pReq) {
_OVER:
return code;
}*/
static int32_t mndProcessDropStbReqFromMNode(SRpcMsg *pReq) {
int32_t code = mndProcessDropStbReq(pReq);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
pReq->info.rsp = rpcMallocCont(1);
pReq->info.rspLen = 1;
pReq->info.noResp = false;
pReq->code = code;
}
return code;
}

View File

@ -112,7 +112,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP, mndProcessDropStreamReqFromMNode);
mndSetMsgHandle(pMnode, TDMT_STREAM_DROP_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
@ -2362,7 +2362,7 @@ static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
int32_t code = mndProcessDropStreamReq(pReq);
if (code != 0) {
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
pReq->info.rsp = rpcMallocCont(1);
pReq->info.rspLen = 1;
pReq->info.noResp = false;

View File

@ -10724,9 +10724,10 @@ static int32_t translateQuery(STranslateContext* pCxt, SNode* pNode) {
code = translateCreateTSMA(pCxt, (SCreateTSMAStmt*)pNode);
break;
case QUERY_NODE_SHOW_CREATE_TSMA_STMT:
case QUERY_NODE_DROP_TSMA_STMT:
break;
case QUERY_NODE_DROP_TSMA_STMT:
code =translateDropTSMA(pCxt, (SDropTSMAStmt*)pNode);
break;
default:
break;
}