diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 27af7eb27e..b8baf99552 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -264,7 +264,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) #define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2) -#define TSDB_CODE_MND_TRANS_CANT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4) +#define TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4) // mnode-mq #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 263fd0bad5..d516b0bf26 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -57,11 +57,11 @@ typedef enum { TRN_STAGE_PREPARE = 0, TRN_STAGE_REDO_LOG = 1, TRN_STAGE_REDO_ACTION = 2, - TRN_STAGE_COMMIT = 3, - TRN_STAGE_COMMIT_LOG = 4, - TRN_STAGE_UNDO_ACTION = 5, - TRN_STAGE_UNDO_LOG = 6, - TRN_STAGE_ROLLBACK = 7, + TRN_STAGE_ROLLBACK = 3, + TRN_STAGE_UNDO_ACTION = 4, + TRN_STAGE_UNDO_LOG = 5, + TRN_STAGE_COMMIT = 6, + TRN_STAGE_COMMIT_LOG = 7, TRN_STAGE_FINISHED = 8 } ETrnStage; @@ -72,6 +72,7 @@ typedef enum { TRN_TYPE_DROP_USER = 1003, TRN_TYPE_CREATE_FUNC = 1004, TRN_TYPE_DROP_FUNC = 1005, + TRN_TYPE_CREATE_SNODE = 1006, TRN_TYPE_DROP_SNODE = 1007, TRN_TYPE_CREATE_QNODE = 1008, @@ -91,10 +92,12 @@ typedef enum { TRN_TYPE_CONSUMER_LOST = 1022, TRN_TYPE_CONSUMER_RECOVER = 1023, TRN_TYPE_BASIC_SCOPE_END, + TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_CREATE_DNODE = 2001, TRN_TYPE_DROP_DNODE = 2002, TRN_TYPE_GLOBAL_SCOPE_END, + TRN_TYPE_DB_SCOPE = 3000, TRN_TYPE_CREATE_DB = 3001, TRN_TYPE_ALTER_DB = 3002, @@ -102,6 +105,7 @@ typedef enum { TRN_TYPE_SPLIT_VGROUP = 3004, TRN_TYPE_MERGE_VGROUP = 3015, TRN_TYPE_DB_SCOPE_END, + TRN_TYPE_STB_SCOPE = 4000, TRN_TYPE_CREATE_STB = 4001, TRN_TYPE_ALTER_STB = 4002, @@ -131,7 +135,7 @@ typedef struct { int32_t id; ETrnStage stage; ETrnPolicy policy; - ETrnType transType; + ETrnType type; int32_t code; int32_t failedTimes; void* rpcHandle; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 5a2f37429a..ce2a0efd2f 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -63,13 +63,15 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); int32_t mndInitTrans(SMnode *pMnode) { - SSdbTable table = {.sdbType = SDB_TRANS, - .keyType = SDB_KEY_INT32, - .encodeFp = (SdbEncodeFp)mndTransActionEncode, - .decodeFp = (SdbDecodeFp)mndTransActionDecode, - .insertFp = (SdbInsertFp)mndTransActionInsert, - .updateFp = (SdbUpdateFp)mndTransActionUpdate, - .deleteFp = (SdbDeleteFp)mndTransActionDelete}; + SSdbTable table = { + .sdbType = SDB_TRANS, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndTransActionEncode, + .decodeFp = (SdbDecodeFp)mndTransActionDecode, + .insertFp = (SdbInsertFp)mndTransActionInsert, + .updateFp = (SdbUpdateFp)mndTransActionUpdate, + .deleteFp = (SdbDeleteFp)mndTransActionDelete, + }; mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq); @@ -123,71 +125,82 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { } int32_t dataPos = 0; - SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER) - SDB_SET_INT16(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER) - SDB_SET_INT16(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER) - SDB_SET_INT16(pRaw, dataPos, pTrans->transType, TRANS_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, TRANS_ENCODE_OVER) - SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, TRANS_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, redoActionNum, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, undoActionNum, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER) + + ETrnStage stage = pTrans->stage; + if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) { + stage = TRN_STAGE_PREPARE; + } else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) { + stage = TRN_STAGE_ROLLBACK; + } else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) { + stage = TRN_STAGE_COMMIT; + } else { + } + + SDB_SET_INT16(pRaw, dataPos, stage, _OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) + SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) + SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER) + SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER) + SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER) + SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER) + SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER) for (int32_t i = 0; i < redoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) } for (int32_t i = 0; i < undoLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) } for (int32_t i = 0; i < commitLogNum; ++i) { SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) } for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER) } - SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, TRANS_ENCODE_OVER) - SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, TRANS_ENCODE_OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, _OVER) if (pTrans->param != NULL) { - SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, TRANS_ENCODE_OVER) + SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, _OVER) } - SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER) - SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER) + SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER) + SDB_SET_DATALEN(pRaw, dataPos, _OVER) terrno = 0; -TRANS_ENCODE_OVER: +_OVER: if (terrno != 0) { mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr()); sdbFreeRaw(pRaw); @@ -229,15 +242,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, dataPos, &pTrans->id, _OVER) - int16_t type = 0; - int16_t policy = 0; int16_t stage = 0; - SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) + int16_t policy = 0; + int16_t type = 0; SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) + SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) SDB_GET_INT16(pRaw, dataPos, &type, _OVER) - pTrans->policy = policy; pTrans->stage = stage; - pTrans->transType = type; + pTrans->policy = policy; + pTrans->type = type; SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) @@ -400,6 +413,16 @@ static const char *mndTransType(ETrnType type) { return "subscribe"; case TRN_TYPE_REBALANCE: return "rebalance"; + case TRN_TYPE_COMMIT_OFFSET: + return "commit-offset"; + case TRN_TYPE_CREATE_STREAM: + return "create-stream"; + case TRN_TYPE_DROP_STREAM: + return "drop-stream"; + case TRN_TYPE_CONSUMER_LOST: + return "consumer-lost"; + case TRN_TYPE_CONSUMER_RECOVER: + return "consumer-recover"; case TRN_TYPE_CREATE_DNODE: return "create-qnode"; case TRN_TYPE_DROP_DNODE: @@ -453,7 +476,6 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) { } static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { - // pTrans->stage = TRN_STAGE_PREPARE; mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); if (pTrans->startFunc > 0) { @@ -516,8 +538,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { } static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { - SSdb *pSdb = pMnode->pSdb; - STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId); + STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId); if (pTrans == NULL) { terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; } @@ -540,7 +561,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; - pTrans->transType = type; + pTrans->type = type; pTrans->createdTime = taosGetTimestampMs(); pTrans->rpcHandle = pReq->handle; pTrans->rpcAHandle = pReq->ahandle; @@ -558,7 +579,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S return NULL; } - mDebug("trans:%d, local var is created, data:%p", pTrans->id, pTrans); + mDebug("trans:%d, local object is created, data:%p", pTrans->id, pTrans); return pTrans; } @@ -585,14 +606,14 @@ static void mndTransDropActions(SArray *pArray) { void mndTransDrop(STrans *pTrans) { if (pTrans != NULL) { mndTransDropData(pTrans); - mDebug("trans:%d, local var is freed, data:%p", pTrans->id, pTrans); + mDebug("trans:%d, local object is freed, data:%p", pTrans->id, pTrans); taosMemoryFreeClear(pTrans); } } static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { if (pArray == NULL || pRaw == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + terrno = TSDB_CODE_INVALID_PARA; return -1; } @@ -674,27 +695,27 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } static bool mndIsBasicTrans(STrans *pTrans) { - return pTrans->stage > TRN_TYPE_BASIC_SCOPE && pTrans->stage < TRN_TYPE_BASIC_SCOPE_END; + return pTrans->type > TRN_TYPE_BASIC_SCOPE && pTrans->type < TRN_TYPE_BASIC_SCOPE_END; } static bool mndIsGlobalTrans(STrans *pTrans) { - return pTrans->stage > TRN_TYPE_GLOBAL_SCOPE && pTrans->stage < TRN_TYPE_GLOBAL_SCOPE_END; + return pTrans->type > TRN_TYPE_GLOBAL_SCOPE && pTrans->type < TRN_TYPE_GLOBAL_SCOPE_END; } static bool mndIsDbTrans(STrans *pTrans) { - return pTrans->stage > TRN_TYPE_DB_SCOPE && pTrans->stage < TRN_TYPE_DB_SCOPE_END; + return pTrans->type > TRN_TYPE_DB_SCOPE && pTrans->type < TRN_TYPE_DB_SCOPE_END; } static bool mndIsStbTrans(STrans *pTrans) { - return pTrans->stage > TRN_TYPE_STB_SCOPE && pTrans->stage < TRN_TYPE_STB_SCOPE_END; + return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END; } -static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewTrans) { - if (mndIsBasicTrans(pNewTrans)) return 0; - +static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) { STrans *pTrans = NULL; void *pIter = NULL; - int32_t code = 0; + bool canParallel = true; + + if (mndIsBasicTrans(pNewTrans)) return canParallel; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); @@ -703,42 +724,35 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT if (mndIsGlobalTrans(pNewTrans)) { if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - code = -1; - break; + canParallel = false; + } else { } } - if (mndIsDbTrans(pNewTrans)) { - if (mndIsBasicTrans(pTrans)) continue; + else if (mndIsDbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); - code = -1; - break; - } - if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { + canParallel = false; + } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - code = -1; - break; + canParallel = false; } + } else { } } - if (mndIsStbTrans(pNewTrans)) { - if (mndIsBasicTrans(pTrans)) continue; + else if (mndIsStbTrans(pNewTrans)) { if (mndIsGlobalTrans(pTrans)) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); - code = -1; - break; - } - if (mndIsDbTrans(pTrans)) { + canParallel = false; + } else if (mndIsDbTrans(pTrans)) { if (pNewTrans->dbUid == pTrans->dbUid) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); - code = -1; - break; + canParallel = false; } + } else { } - if (mndIsStbTrans(pTrans)) continue; } sdbRelease(pMnode->pSdb, pTrans); @@ -746,12 +760,12 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT sdbCancelFetch(pMnode->pSdb, pIter); sdbRelease(pMnode->pSdb, pTrans); - return code; + return canParallel; } int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { - if (mndCheckTransCanBeStartedInParallel(pMnode, pTrans) != 0) { - terrno = TSDB_CODE_MND_TRANS_CANT_PARALLEL; + if (!mndCheckTransCanParallel(pMnode, pTrans)) { + terrno = TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); return -1; } @@ -833,12 +847,14 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, pTrans->rpcAHandle); - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, - .code = pTrans->code, - .ahandle = pTrans->rpcAHandle, - .refId = pTrans->rpcRefId, - .pCont = rpcCont, - .contLen = pTrans->rpcRspLen}; + SRpcMsg rspMsg = { + .handle = pTrans->rpcHandle, + .ahandle = pTrans->rpcAHandle, + .refId = pTrans->rpcRefId, + .code = pTrans->code, + .pCont = rpcCont, + .contLen = pTrans->rpcRspLen, + }; tmsgSendRsp(&rspMsg); pTrans->rpcHandle = NULL; pTrans->rpcRsp = NULL; @@ -1360,10 +1376,10 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); - char transType[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0}; - STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->transType), pShow->pMeta->pSchemas[cols].bytes); + char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0}; + STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)transType, false); + colDataAppend(pColInfo, numOfRows, (const char *)type, false); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); diff --git a/source/dnode/mnode/impl/test/trans/trans2.cpp b/source/dnode/mnode/impl/test/trans/trans2.cpp index abab24723f..252cd105a0 100644 --- a/source/dnode/mnode/impl/test/trans/trans2.cpp +++ b/source/dnode/mnode/impl/test/trans/trans2.cpp @@ -22,7 +22,7 @@ class MndTestTrans2 : public ::testing::Test { static void SetUpTestSuite() { dDebugFlag = 143; vDebugFlag = 0; - mDebugFlag = 143; + mDebugFlag = 207; cDebugFlag = 0; jniDebugFlag = 0; tmrDebugFlag = 135; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 709efa3125..bd787d50b0 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -271,7 +271,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") -TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill") +TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL, "Conflicting transaction not completed") // mnode-mq TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")