fix create tsma transaction

This commit is contained in:
wangjiaming0909 2024-04-17 15:29:36 +08:00
parent 6aa09f4d8f
commit 2c0624a8ae
2 changed files with 23 additions and 2 deletions

View File

@ -1526,6 +1526,8 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
int32_t code = -1;
STransAction createStreamRedoAction = {0};
STransAction createStreamUndoAction = {0};
STransAction dropStbUndoAction = {0};
SMDropStbReq dropStbReq = {0};
STrans *pTrans =
mndTransCreate(pCxt->pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pCxt->pRpcReq, "create-tsma");
if (!pTrans) {
@ -1556,7 +1558,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
createStreamUndoAction.epSet = createStreamRedoAction.epSet;
createStreamUndoAction.acceptableCode = TSDB_CODE_MND_STREAM_NOT_EXIST;
createStreamUndoAction.actionType = TDMT_STREAM_DROP;
createStreamUndoAction.msgType = TDMT_STREAM_DROP;
createStreamUndoAction.contLen = tSerializeSMDropStreamReq(0, 0, pCxt->pDropStreamReq);
createStreamUndoAction.pCont = taosMemoryCalloc(1, createStreamUndoAction.contLen);
if (!createStreamUndoAction.pCont) {
@ -1569,6 +1571,24 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
goto _OVER;
}
dropStbReq.igNotExists = true;
strncpy(dropStbReq.name, pCxt->targetStbFullName, TSDB_TABLE_FNAME_LEN);
dropStbUndoAction.epSet = createStreamRedoAction.epSet;
dropStbUndoAction.acceptableCode = TSDB_CODE_MND_STB_NOT_EXIST;
dropStbUndoAction.retryCode = TSDB_CODE_MND_STREAM_MUST_BE_DELETED;
dropStbUndoAction.msgType = TDMT_MND_STB_DROP;
dropStbUndoAction.contLen = tSerializeSMDropStbReq(0, 0, &dropStbReq);
dropStbUndoAction.pCont = taosMemoryCalloc(1, dropStbUndoAction.contLen);
if (!dropStbUndoAction.pCont) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _OVER;
}
if (dropStbUndoAction.contLen != tSerializeSMDropStbReq(dropStbUndoAction.pCont, dropStbUndoAction.contLen, &dropStbReq)) {
mError("sma: %s, failed to create due to drop stb req encode failure", pCxt->pCreateSmaReq->name);
terrno = TSDB_CODE_INVALID_MSG;
goto _OVER;
}
SDbObj newDb = {0};
memcpy(&newDb, pCxt->pDb, sizeof(SDbObj));
newDb.tsmaVersion++;
@ -1579,6 +1599,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
if (mndSetCreateSmaCommitLogs(pCxt->pMnode, pTrans, pCxt->pSma) != 0) goto _OVER;
if (mndTransAppendRedoAction(pTrans, &createStreamRedoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &createStreamUndoAction) != 0) goto _OVER;
if (mndTransAppendUndoAction(pTrans, &dropStbUndoAction) != 0) goto _OVER;
if (mndTransPrepare(pCxt->pMnode, pTrans) != 0) goto _OVER;
code = TSDB_CODE_SUCCESS;

View File

@ -1109,7 +1109,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
goto _OVER;
}
int32_t actionNum = taosArrayGetSize(pTrans->redoActions);
int32_t actionNum = taosArrayGetSize(pArray);
if (action < 0 || action >= actionNum) {
mError("trans:%d, invalid action:%d", transId, action);
goto _OVER;