Merge pull request #12102 from taosdata/feature/dnode

refactor: adjust transaction unitest
This commit is contained in:
Shengliang Guan 2022-05-04 16:42:50 +08:00 committed by GitHub
commit 4c43929e39
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 122 additions and 102 deletions

View File

@ -264,7 +264,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0) #define TSDB_CODE_MND_TRANS_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D0)
#define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1) #define TSDB_CODE_MND_TRANS_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03D1)
#define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2) #define TSDB_CODE_MND_TRANS_INVALID_STAGE TAOS_DEF_ERROR_CODE(0, 0x03D2)
#define TSDB_CODE_MND_TRANS_CANT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4) #define TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL TAOS_DEF_ERROR_CODE(0, 0x03D4)
// mnode-mq // mnode-mq
#define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0) #define TSDB_CODE_MND_TOPIC_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E0)

View File

@ -57,11 +57,11 @@ typedef enum {
TRN_STAGE_PREPARE = 0, TRN_STAGE_PREPARE = 0,
TRN_STAGE_REDO_LOG = 1, TRN_STAGE_REDO_LOG = 1,
TRN_STAGE_REDO_ACTION = 2, TRN_STAGE_REDO_ACTION = 2,
TRN_STAGE_COMMIT = 3, TRN_STAGE_ROLLBACK = 3,
TRN_STAGE_COMMIT_LOG = 4, TRN_STAGE_UNDO_ACTION = 4,
TRN_STAGE_UNDO_ACTION = 5, TRN_STAGE_UNDO_LOG = 5,
TRN_STAGE_UNDO_LOG = 6, TRN_STAGE_COMMIT = 6,
TRN_STAGE_ROLLBACK = 7, TRN_STAGE_COMMIT_LOG = 7,
TRN_STAGE_FINISHED = 8 TRN_STAGE_FINISHED = 8
} ETrnStage; } ETrnStage;
@ -72,6 +72,7 @@ typedef enum {
TRN_TYPE_DROP_USER = 1003, TRN_TYPE_DROP_USER = 1003,
TRN_TYPE_CREATE_FUNC = 1004, TRN_TYPE_CREATE_FUNC = 1004,
TRN_TYPE_DROP_FUNC = 1005, TRN_TYPE_DROP_FUNC = 1005,
TRN_TYPE_CREATE_SNODE = 1006, TRN_TYPE_CREATE_SNODE = 1006,
TRN_TYPE_DROP_SNODE = 1007, TRN_TYPE_DROP_SNODE = 1007,
TRN_TYPE_CREATE_QNODE = 1008, TRN_TYPE_CREATE_QNODE = 1008,
@ -91,10 +92,12 @@ typedef enum {
TRN_TYPE_CONSUMER_LOST = 1022, TRN_TYPE_CONSUMER_LOST = 1022,
TRN_TYPE_CONSUMER_RECOVER = 1023, TRN_TYPE_CONSUMER_RECOVER = 1023,
TRN_TYPE_BASIC_SCOPE_END, TRN_TYPE_BASIC_SCOPE_END,
TRN_TYPE_GLOBAL_SCOPE = 2000, TRN_TYPE_GLOBAL_SCOPE = 2000,
TRN_TYPE_CREATE_DNODE = 2001, TRN_TYPE_CREATE_DNODE = 2001,
TRN_TYPE_DROP_DNODE = 2002, TRN_TYPE_DROP_DNODE = 2002,
TRN_TYPE_GLOBAL_SCOPE_END, TRN_TYPE_GLOBAL_SCOPE_END,
TRN_TYPE_DB_SCOPE = 3000, TRN_TYPE_DB_SCOPE = 3000,
TRN_TYPE_CREATE_DB = 3001, TRN_TYPE_CREATE_DB = 3001,
TRN_TYPE_ALTER_DB = 3002, TRN_TYPE_ALTER_DB = 3002,
@ -102,6 +105,7 @@ typedef enum {
TRN_TYPE_SPLIT_VGROUP = 3004, TRN_TYPE_SPLIT_VGROUP = 3004,
TRN_TYPE_MERGE_VGROUP = 3015, TRN_TYPE_MERGE_VGROUP = 3015,
TRN_TYPE_DB_SCOPE_END, TRN_TYPE_DB_SCOPE_END,
TRN_TYPE_STB_SCOPE = 4000, TRN_TYPE_STB_SCOPE = 4000,
TRN_TYPE_CREATE_STB = 4001, TRN_TYPE_CREATE_STB = 4001,
TRN_TYPE_ALTER_STB = 4002, TRN_TYPE_ALTER_STB = 4002,
@ -131,7 +135,7 @@ typedef struct {
int32_t id; int32_t id;
ETrnStage stage; ETrnStage stage;
ETrnPolicy policy; ETrnPolicy policy;
ETrnType transType; ETrnType type;
int32_t code; int32_t code;
int32_t failedTimes; int32_t failedTimes;
void* rpcHandle; void* rpcHandle;

View File

@ -63,13 +63,15 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter); static void mndCancelGetNextTrans(SMnode *pMnode, void *pIter);
int32_t mndInitTrans(SMnode *pMnode) { int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS, SSdbTable table = {
.keyType = SDB_KEY_INT32, .sdbType = SDB_TRANS,
.encodeFp = (SdbEncodeFp)mndTransActionEncode, .keyType = SDB_KEY_INT32,
.decodeFp = (SdbDecodeFp)mndTransActionDecode, .encodeFp = (SdbEncodeFp)mndTransActionEncode,
.insertFp = (SdbInsertFp)mndTransActionInsert, .decodeFp = (SdbDecodeFp)mndTransActionDecode,
.updateFp = (SdbUpdateFp)mndTransActionUpdate, .insertFp = (SdbInsertFp)mndTransActionInsert,
.deleteFp = (SdbDeleteFp)mndTransActionDelete}; .updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete,
};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq); mndSetMsgHandle(pMnode, TDMT_MND_TRANS_TIMER, mndProcessTransReq);
mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq); mndSetMsgHandle(pMnode, TDMT_MND_KILL_TRANS, mndProcessKillTransReq);
@ -123,71 +125,82 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
} }
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER) ETrnStage stage = pTrans->stage;
SDB_SET_INT16(pRaw, dataPos, pTrans->transType, TRANS_ENCODE_OVER) if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) {
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, TRANS_ENCODE_OVER) stage = TRN_STAGE_PREPARE;
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, TRANS_ENCODE_OVER) } else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) {
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_ENCODE_OVER) stage = TRN_STAGE_ROLLBACK;
SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER) } else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) {
SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER) stage = TRN_STAGE_COMMIT;
SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER) } else {
SDB_SET_INT32(pRaw, dataPos, redoActionNum, TRANS_ENCODE_OVER) }
SDB_SET_INT32(pRaw, dataPos, undoActionNum, TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
for (int32_t i = 0; i < redoLogNum; ++i) { for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp); int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
} }
for (int32_t i = 0; i < undoLogNum; ++i) { for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp); int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
} }
for (int32_t i = 0; i < commitLogNum; ++i) { for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp); int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
} }
for (int32_t i = 0; i < redoActionNum; ++i) { for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i); STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
} }
for (int32_t i = 0; i < undoActionNum; ++i) { for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i); STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER)
} }
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, TRANS_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, _OVER)
if (pTrans->param != NULL) { if (pTrans->param != NULL) {
SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, TRANS_ENCODE_OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, _OVER)
} }
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER) SDB_SET_DATALEN(pRaw, dataPos, _OVER)
terrno = 0; terrno = 0;
TRANS_ENCODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr()); mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
@ -229,15 +242,15 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->id, _OVER)
int16_t type = 0;
int16_t policy = 0;
int16_t stage = 0; int16_t stage = 0;
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER) int16_t policy = 0;
int16_t type = 0;
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER) SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
SDB_GET_INT16(pRaw, dataPos, &type, _OVER) SDB_GET_INT16(pRaw, dataPos, &type, _OVER)
pTrans->policy = policy;
pTrans->stage = stage; pTrans->stage = stage;
pTrans->transType = type; pTrans->policy = policy;
pTrans->type = type;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
@ -400,6 +413,16 @@ static const char *mndTransType(ETrnType type) {
return "subscribe"; return "subscribe";
case TRN_TYPE_REBALANCE: case TRN_TYPE_REBALANCE:
return "rebalance"; return "rebalance";
case TRN_TYPE_COMMIT_OFFSET:
return "commit-offset";
case TRN_TYPE_CREATE_STREAM:
return "create-stream";
case TRN_TYPE_DROP_STREAM:
return "drop-stream";
case TRN_TYPE_CONSUMER_LOST:
return "consumer-lost";
case TRN_TYPE_CONSUMER_RECOVER:
return "consumer-recover";
case TRN_TYPE_CREATE_DNODE: case TRN_TYPE_CREATE_DNODE:
return "create-qnode"; return "create-qnode";
case TRN_TYPE_DROP_DNODE: case TRN_TYPE_DROP_DNODE:
@ -453,7 +476,6 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) {
} }
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
// pTrans->stage = TRN_STAGE_PREPARE;
mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage)); mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
if (pTrans->startFunc > 0) { if (pTrans->startFunc > 0) {
@ -516,8 +538,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
} }
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
SSdb *pSdb = pMnode->pSdb; STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
if (pTrans == NULL) { if (pTrans == NULL) {
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST; terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
} }
@ -540,7 +561,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
pTrans->stage = TRN_STAGE_PREPARE; pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy; pTrans->policy = policy;
pTrans->transType = type; pTrans->type = type;
pTrans->createdTime = taosGetTimestampMs(); pTrans->createdTime = taosGetTimestampMs();
pTrans->rpcHandle = pReq->handle; pTrans->rpcHandle = pReq->handle;
pTrans->rpcAHandle = pReq->ahandle; pTrans->rpcAHandle = pReq->ahandle;
@ -558,7 +579,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
return NULL; return NULL;
} }
mDebug("trans:%d, local var is created, data:%p", pTrans->id, pTrans); mDebug("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
return pTrans; return pTrans;
} }
@ -585,14 +606,14 @@ static void mndTransDropActions(SArray *pArray) {
void mndTransDrop(STrans *pTrans) { void mndTransDrop(STrans *pTrans) {
if (pTrans != NULL) { if (pTrans != NULL) {
mndTransDropData(pTrans); mndTransDropData(pTrans);
mDebug("trans:%d, local var is freed, data:%p", pTrans->id, pTrans); mDebug("trans:%d, local object is freed, data:%p", pTrans->id, pTrans);
taosMemoryFreeClear(pTrans); taosMemoryFreeClear(pTrans);
} }
} }
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
if (pArray == NULL || pRaw == NULL) { if (pArray == NULL || pRaw == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_INVALID_PARA;
return -1; return -1;
} }
@ -674,27 +695,27 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
} }
static bool mndIsBasicTrans(STrans *pTrans) { static bool mndIsBasicTrans(STrans *pTrans) {
return pTrans->stage > TRN_TYPE_BASIC_SCOPE && pTrans->stage < TRN_TYPE_BASIC_SCOPE_END; return pTrans->type > TRN_TYPE_BASIC_SCOPE && pTrans->type < TRN_TYPE_BASIC_SCOPE_END;
} }
static bool mndIsGlobalTrans(STrans *pTrans) { static bool mndIsGlobalTrans(STrans *pTrans) {
return pTrans->stage > TRN_TYPE_GLOBAL_SCOPE && pTrans->stage < TRN_TYPE_GLOBAL_SCOPE_END; return pTrans->type > TRN_TYPE_GLOBAL_SCOPE && pTrans->type < TRN_TYPE_GLOBAL_SCOPE_END;
} }
static bool mndIsDbTrans(STrans *pTrans) { static bool mndIsDbTrans(STrans *pTrans) {
return pTrans->stage > TRN_TYPE_DB_SCOPE && pTrans->stage < TRN_TYPE_DB_SCOPE_END; return pTrans->type > TRN_TYPE_DB_SCOPE && pTrans->type < TRN_TYPE_DB_SCOPE_END;
} }
static bool mndIsStbTrans(STrans *pTrans) { static bool mndIsStbTrans(STrans *pTrans) {
return pTrans->stage > TRN_TYPE_STB_SCOPE && pTrans->stage < TRN_TYPE_STB_SCOPE_END; return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END;
} }
static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewTrans) { static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
if (mndIsBasicTrans(pNewTrans)) return 0;
STrans *pTrans = NULL; STrans *pTrans = NULL;
void *pIter = NULL; void *pIter = NULL;
int32_t code = 0; bool canParallel = true;
if (mndIsBasicTrans(pNewTrans)) return canParallel;
while (1) { while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans); pIter = sdbFetch(pMnode->pSdb, SDB_TRANS, pIter, (void **)&pTrans);
@ -703,42 +724,35 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT
if (mndIsGlobalTrans(pNewTrans)) { if (mndIsGlobalTrans(pNewTrans)) {
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
code = -1; canParallel = false;
break; } else {
} }
} }
if (mndIsDbTrans(pNewTrans)) { else if (mndIsDbTrans(pNewTrans)) {
if (mndIsBasicTrans(pTrans)) continue;
if (mndIsGlobalTrans(pTrans)) { if (mndIsGlobalTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
code = -1; canParallel = false;
break; } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
}
if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) { if (pNewTrans->dbUid == pTrans->dbUid) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
code = -1; canParallel = false;
break;
} }
} else {
} }
} }
if (mndIsStbTrans(pNewTrans)) { else if (mndIsStbTrans(pNewTrans)) {
if (mndIsBasicTrans(pTrans)) continue;
if (mndIsGlobalTrans(pTrans)) { if (mndIsGlobalTrans(pTrans)) {
mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id);
code = -1; canParallel = false;
break; } else if (mndIsDbTrans(pTrans)) {
}
if (mndIsDbTrans(pTrans)) {
if (pNewTrans->dbUid == pTrans->dbUid) { if (pNewTrans->dbUid == pTrans->dbUid) {
mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname);
code = -1; canParallel = false;
break;
} }
} else {
} }
if (mndIsStbTrans(pTrans)) continue;
} }
sdbRelease(pMnode->pSdb, pTrans); sdbRelease(pMnode->pSdb, pTrans);
@ -746,12 +760,12 @@ static int32_t mndCheckTransCanBeStartedInParallel(SMnode *pMnode, STrans *pNewT
sdbCancelFetch(pMnode->pSdb, pIter); sdbCancelFetch(pMnode->pSdb, pIter);
sdbRelease(pMnode->pSdb, pTrans); sdbRelease(pMnode->pSdb, pTrans);
return code; return canParallel;
} }
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
if (mndCheckTransCanBeStartedInParallel(pMnode, pTrans) != 0) { if (!mndCheckTransCanParallel(pMnode, pTrans)) {
terrno = TSDB_CODE_MND_TRANS_CANT_PARALLEL; terrno = TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1; return -1;
} }
@ -833,12 +847,14 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage, mDebug("trans:%d, send rsp, code:0x%04x stage:%d app:%p", pTrans->id, pTrans->code & 0xFFFF, pTrans->stage,
pTrans->rpcAHandle); pTrans->rpcAHandle);
SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, SRpcMsg rspMsg = {
.code = pTrans->code, .handle = pTrans->rpcHandle,
.ahandle = pTrans->rpcAHandle, .ahandle = pTrans->rpcAHandle,
.refId = pTrans->rpcRefId, .refId = pTrans->rpcRefId,
.pCont = rpcCont, .code = pTrans->code,
.contLen = pTrans->rpcRspLen}; .pCont = rpcCont,
.contLen = pTrans->rpcRspLen,
};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
pTrans->rpcHandle = NULL; pTrans->rpcHandle = NULL;
pTrans->rpcRsp = NULL; pTrans->rpcRsp = NULL;
@ -1360,10 +1376,10 @@ static int32_t mndRetrieveTrans(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *pB
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)dbname, false); colDataAppend(pColInfo, numOfRows, (const char *)dbname, false);
char transType[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0}; char type[TSDB_TRANS_TYPE_LEN + VARSTR_HEADER_SIZE] = {0};
STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->transType), pShow->pMeta->pSchemas[cols].bytes); STR_WITH_MAXSIZE_TO_VARSTR(dbname, mndTransType(pTrans->type), pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)transType, false); colDataAppend(pColInfo, numOfRows, (const char *)type, false);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false); colDataAppend(pColInfo, numOfRows, (const char *)&pTrans->lastExecTime, false);

View File

@ -22,7 +22,7 @@ class MndTestTrans2 : public ::testing::Test {
static void SetUpTestSuite() { static void SetUpTestSuite() {
dDebugFlag = 143; dDebugFlag = 143;
vDebugFlag = 0; vDebugFlag = 0;
mDebugFlag = 143; mDebugFlag = 207;
cDebugFlag = 0; cDebugFlag = 0;
jniDebugFlag = 0; jniDebugFlag = 0;
tmrDebugFlag = 135; tmrDebugFlag = 135;

View File

@ -271,7 +271,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_INVALID_FUNC_RETRIEVE, "Invalid func retrieve
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_ALREADY_EXIST, "Transaction already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NOT_EXIST, "Transaction not exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_INVALID_STAGE, "Invalid stage to kill")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CANT_PARALLEL, "Invalid stage to kill") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CAN_NOT_PARALLEL, "Conflicting transaction not completed")
// mnode-mq // mnode-mq
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOPIC_ALREADY_EXIST, "Topic already exists")