From 53def5b77b0b4698dd0903fdb94aa169bd16de23 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 30 May 2022 20:38:46 +0800 Subject: [PATCH] refactor: make trans support multi steps --- include/util/tdef.h | 1 + source/dnode/mnode/impl/inc/mndDef.h | 20 +- source/dnode/mnode/impl/inc/mndTrans.h | 34 +- source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 584 ++++++++++----------- tests/test/c/sdbDump.c | 4 +- 6 files changed, 296 insertions(+), 349 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index ad7206f7bb..709e2b8a7f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -254,6 +254,7 @@ typedef enum ELogicConditionType { #define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_TYPE_LEN 16 #define TSDB_TRANS_ERROR_LEN 64 +#define TSDB_TRANS_DESC_LEN 128 #define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_DESC_LEN 128 diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4d5aab4590..fd0f54c66b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -60,14 +60,12 @@ typedef enum { typedef enum { TRN_STAGE_PREPARE = 0, - TRN_STAGE_REDO_LOG = 1, - TRN_STAGE_REDO_ACTION = 2, - TRN_STAGE_ROLLBACK = 3, - TRN_STAGE_UNDO_ACTION = 4, - TRN_STAGE_UNDO_LOG = 5, - TRN_STAGE_COMMIT = 6, - TRN_STAGE_COMMIT_LOG = 7, - TRN_STAGE_FINISHED = 8 + TRN_STAGE_REDO_ACTION = 1, + TRN_STAGE_ROLLBACK = 2, + TRN_STAGE_UNDO_ACTION = 3, + TRN_STAGE_COMMIT = 4, + TRN_STAGE_COMMIT_ACTION = 5, + TRN_STAGE_FINISHED = 6 } ETrnStage; typedef enum { @@ -168,16 +166,16 @@ typedef struct { SRpcHandleInfo rpcInfo; void* rpcRsp; int32_t rpcRspLen; - SArray* redoLogs; - SArray* undoLogs; - SArray* commitLogs; + int32_t redoActionPos; SArray* redoActions; SArray* undoActions; + SArray* commitActions; int64_t createdTime; int64_t lastExecTime; int64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; char lastError[TSDB_TRANS_ERROR_LEN]; + char desc[TSDB_TRANS_DESC_LEN]; int32_t startFunc; int32_t stopFunc; int32_t paramLen; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index ce302a88e3..d9408467ad 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -26,31 +26,23 @@ typedef enum { TRANS_START_FUNC_TEST = 1, TRANS_STOP_FUNC_TEST = 2, TRANS_START_FUNC_MQ_REB = 3, - TRANS_STOP_FUNC_TEST_MQ_REB = 4, + TRANS_STOP_FUNC_MQ_REB = 4, } ETrnFunc; typedef struct { - SEpSet epSet; - tmsg_t msgType; - int8_t msgSent; - int8_t msgReceived; - int32_t errCode; - int32_t acceptableCode; - int32_t contLen; - void *pCont; -} STransAction; - -typedef struct { + int32_t id; + tmsg_t msgType; + int8_t msgSent; + int8_t msgReceived; + int8_t isRaw; + int8_t rawWritten; SSdbRaw *pRaw; -} STransLog; - -typedef struct { - ETrnStep stepType; - STransAction redoAction; - STransAction undoAction; - STransLog redoLog; - STransLog undoLog; -} STransStep; + SEpSet epSet; + int32_t errCode; + int32_t acceptableCode; + int32_t contLen; + void *pCont; +} STransAction; typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e58630ddbf..7e72aa2425 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -493,7 +493,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 4. TODO commit log: modification log // 5. set cb - mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_TEST_MQ_REB, NULL, 0); + mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0); // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 9d392c64fb..239e1bf4b1 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -43,13 +43,13 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); @@ -83,40 +83,30 @@ int32_t mndInitTrans(SMnode *pMnode) { void mndCleanupTrans(SMnode *pMnode) {} +static int32_t mndTransGetActionsSize(SArray *pArray) { + int32_t actionNum = taosArrayGetSize(pArray); + int32_t rawDataLen = 0; + + for (int32_t i = 0; i < actionNum; ++i) { + STransAction *pAction = taosArrayGet(pArray, i); + if (pAction->isRaw) { + rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t)); + } else { + rawDataLen += (sizeof(STransAction) + pAction->contLen); + } + rawDataLen += sizeof(pAction->isRaw); + } + + return rawDataLen; +} + static SSdbRaw *mndTransActionEncode(STrans *pTrans) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE; - int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); - int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); - int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); - int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); - int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); - - for (int32_t i = 0; i < redoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); - } - - for (int32_t i = 0; i < undoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); - } - - for (int32_t i = 0; i < commitLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); - } - - for (int32_t i = 0; i < redoActionNum; ++i) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->contLen); - } - - for (int32_t i = 0; i < undoActionNum; ++i) { - STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->contLen); - } + rawDataLen += mndTransGetActionsSize(pTrans->redoActions); + rawDataLen += mndTransGetActionsSize(pTrans->undoActions); + rawDataLen += mndTransGetActionsSize(pTrans->commitActions); SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen); if (pRaw == NULL) { @@ -126,67 +116,67 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER) - - ETrnStage stage = pTrans->stage; - if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) { - stage = TRN_STAGE_PREPARE; - } else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) { - stage = TRN_STAGE_ROLLBACK; - } else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) { - stage = TRN_STAGE_COMMIT; - } else { - } - - SDB_SET_INT16(pRaw, dataPos, stage, _OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->stage, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) - SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER) - SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER) - SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER) + + int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); + int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); + int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions); SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER) SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER) - - for (int32_t i = 0; i < redoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); - int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) - } - - for (int32_t i = 0; i < undoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); - int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) - } - - for (int32_t i = 0; i < commitLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); - int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) - } + SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER) for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + if (pAction->isRaw) { + int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) + } else { + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + if (pAction->isRaw) { + int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) + } else { + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } + } + + for (int32_t i = 0; i < commitActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->commitActions, i); + SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + if (pAction->isRaw) { + int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) + } else { + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } } SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER) @@ -220,11 +210,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { char *pData = NULL; int32_t dataLen = 0; int8_t sver = 0; - int32_t redoLogNum = 0; - int32_t undoLogNum = 0; - int32_t commitLogNum = 0; int32_t redoActionNum = 0; int32_t undoActionNum = 0; + int32_t commitActionNum = 0; int32_t dataPos = 0; STransAction action = {0}; @@ -258,76 +246,85 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) - SDB_GET_INT32(pRaw, dataPos, &redoLogNum, _OVER) - SDB_GET_INT32(pRaw, dataPos, &undoLogNum, _OVER) - SDB_GET_INT32(pRaw, dataPos, &commitLogNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) + SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER) - pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *)); pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction)); + pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction)); - if (pTrans->redoLogs == NULL) goto _OVER; - if (pTrans->undoLogs == NULL) goto _OVER; - if (pTrans->commitLogs == NULL) goto _OVER; if (pTrans->redoActions == NULL) goto _OVER; if (pTrans->undoActions == NULL) goto _OVER; - - for (int32_t i = 0; i < redoLogNum; ++i) { - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto _OVER; - pData = NULL; - } - - for (int32_t i = 0; i < undoLogNum; ++i) { - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto _OVER; - pData = NULL; - } - - for (int32_t i = 0; i < commitLogNum; ++i) { - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto _OVER; - pData = NULL; - } + if (pTrans->commitActions == NULL) goto _OVER; for (int32_t i = 0; i < redoActionNum; ++i) { - SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); - SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) - action.pCont = taosMemoryMalloc(action.contLen); - if (action.pCont == NULL) goto _OVER; - SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); - if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; - action.pCont = NULL; + SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + if (action.isRaw) { + SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) + pData = taosMemoryMalloc(dataLen); + if (pData == NULL) goto _OVER; + mTrace("raw:%p, is created", pData); + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); + if (taosArrayPush(pTrans->redoActions, &pData) == NULL) goto _OVER; + pData = NULL; + } else { + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) + action.pCont = taosMemoryMalloc(action.contLen); + if (action.pCont == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; + action.pCont = NULL; + } } for (int32_t i = 0; i < undoActionNum; ++i) { - SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); - SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) - action.pCont = taosMemoryMalloc(action.contLen); - if (action.pCont == NULL) goto _OVER; - SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); - if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; - action.pCont = NULL; + SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + if (action.isRaw) { + SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) + pData = taosMemoryMalloc(dataLen); + if (pData == NULL) goto _OVER; + mTrace("raw:%p, is created", pData); + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); + if (taosArrayPush(pTrans->undoActions, &pData) == NULL) goto _OVER; + pData = NULL; + } else { + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) + action.pCont = taosMemoryMalloc(action.contLen); + if (action.pCont == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); + if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; + action.pCont = NULL; + } + } + + for (int32_t i = 0; i < commitActionNum; ++i) { + SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + if (action.isRaw) { + SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) + pData = taosMemoryMalloc(dataLen); + if (pData == NULL) goto _OVER; + mTrace("raw:%p, is created", pData); + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); + if (taosArrayPush(pTrans->commitActions, &pData) == NULL) goto _OVER; + pData = NULL; + } else { + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) + action.pCont = taosMemoryMalloc(action.contLen); + if (action.pCont == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); + if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; + action.pCont = NULL; + } } SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER) @@ -360,20 +357,16 @@ static const char *mndTransStr(ETrnStage stage) { switch (stage) { case TRN_STAGE_PREPARE: return "prepare"; - case TRN_STAGE_REDO_LOG: - return "redoLog"; case TRN_STAGE_REDO_ACTION: return "redoAction"; - case TRN_STAGE_COMMIT: - return "commit"; - case TRN_STAGE_COMMIT_LOG: - return "commitLog"; - case TRN_STAGE_UNDO_ACTION: - return "undoAction"; - case TRN_STAGE_UNDO_LOG: - return "undoLog"; case TRN_STAGE_ROLLBACK: return "rollback"; + case TRN_STAGE_UNDO_ACTION: + return "undoAction"; + case TRN_STAGE_COMMIT: + return "commit"; + case TRN_STAGE_COMMIT_ACTION: + return "commitAction"; case TRN_STAGE_FINISHED: return "finished"; default: @@ -472,7 +465,7 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) { return mndTransTestStopFunc; case TRANS_START_FUNC_MQ_REB: return mndRebCntInc; - case TRANS_STOP_FUNC_TEST_MQ_REB: + case TRANS_STOP_FUNC_MQ_REB: return mndRebCntDec; default: return NULL; @@ -493,11 +486,9 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { } static void mndTransDropData(STrans *pTrans) { - mndTransDropLogs(pTrans->redoLogs); - mndTransDropLogs(pTrans->undoLogs); - mndTransDropLogs(pTrans->commitLogs); mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); + mndTransDropActions(pTrans->commitActions); if (pTrans->rpcRsp != NULL) { taosMemoryFree(pTrans->rpcRsp); pTrans->rpcRsp = NULL; @@ -526,8 +517,8 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { if (pNew->stage == TRN_STAGE_COMMIT) { - pNew->stage = TRN_STAGE_COMMIT_LOG; - mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG)); + pNew->stage = TRN_STAGE_COMMIT_ACTION; + mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_ACTION)); } if (pNew->stage == TRN_STAGE_ROLLBACK) { @@ -568,14 +559,11 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S pTrans->type = type; pTrans->createdTime = taosGetTimestampMs(); if (pReq != NULL) pTrans->rpcInfo = pReq->info; - pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); + pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); - if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || - pTrans->redoActions == NULL || pTrans->undoActions == NULL) { + if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to create transaction since %s", terrstr()); return NULL; @@ -585,21 +573,15 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S return pTrans; } -static void mndTransDropLogs(SArray *pArray) { - int32_t size = taosArrayGetSize(pArray); - for (int32_t i = 0; i < size; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - sdbFreeRaw(pRaw); - } - - taosArrayDestroy(pArray); -} - static void mndTransDropActions(SArray *pArray) { int32_t size = taosArrayGetSize(pArray); for (int32_t i = 0; i < size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); - taosMemoryFreeClear(pAction->pCont); + if (pAction->isRaw) { + sdbFreeRaw(pAction->pRaw); + } else { + taosMemoryFreeClear(pAction->pCont); + } } taosArrayDestroy(pArray); @@ -613,27 +595,6 @@ void mndTransDrop(STrans *pTrans) { } } -static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { - if (pArray == NULL || pRaw == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - void *ptr = taosArrayPush(pArray, &pRaw); - if (ptr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); } - -int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); } - -int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); } - static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { void *ptr = taosArrayPush(pArray, pAction); if (ptr == NULL) { @@ -644,6 +605,21 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { return 0; } +int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = {.isRaw = true, .pRaw = pRaw}; + return mndTransAppendAction(pTrans->redoActions, &action); +} + +int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = {.isRaw = true, .pRaw = pRaw}; + return mndTransAppendAction(pTrans->undoActions, &action); +} + +int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = {.isRaw = true, .pRaw = pRaw}; + return mndTransAppendAction(pTrans->commitActions, &action); +} + int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { return mndTransAppendAction(pTrans->redoActions, pAction); } @@ -768,7 +744,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } - if (taosArrayGetSize(pTrans->commitLogs) <= 0) { + if (taosArrayGetSize(pTrans->commitActions) <= 0) { terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); return -1; @@ -799,8 +775,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0; - mDebug("trans:%d, commit transaction", pTrans->id); if (mndTransSync(pMnode, pTrans) != 0) { mError("trans:%d, failed to commit since %s", pTrans->id, terrstr()); @@ -829,8 +803,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } if (pTrans->policy == TRN_POLICY_ROLLBACK) { - if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION || - pTrans->stage == TRN_STAGE_ROLLBACK) { + if (pTrans->stage == pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; sendRsp = true; } @@ -930,30 +903,6 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { return code; } -static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->redoLogs); - if (code != 0) { - mError("failed to execute redoLogs since %s", terrstr()); - } - return code; -} - -static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->undoLogs); - if (code != 0) { - mError("failed to execute undoLogs since %s, return success", terrstr()); - } - - return 0; // return success in any case -} - -static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitLogs); - if (code != 0) { - mError("failed to execute commitLogs since %s", terrstr()); - } - return code; -} static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); @@ -962,6 +911,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; + if (pAction->rawWritten && pAction->errCode == 0) continue; pAction->msgSent = 0; pAction->msgReceived = 0; @@ -970,56 +920,71 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) } } +static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); + if (code == 0) { + mDebug("trans:%d, action:%d write to sdb", pTrans->id, pAction->id); + } else { + mError("trans:%d, action:%d failed to write sdb since %s", pTrans->id, pAction->id, terrstr()); + } + + return code; +} + +static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + if (pAction->msgSent) return 0; + if (!pMnode->deploy && !mndIsMaster(pMnode)) return -1; + + int64_t signature = pTrans->id; + signature = (signature << 32); + signature += pAction->id; + + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; + rpcMsg.pCont = rpcMallocCont(pAction->contLen); + if (rpcMsg.pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + + int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg); + if (code == 0) { + pAction->msgSent = 1; + pAction->msgReceived = 0; + pAction->errCode = 0; + mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, pAction->id, + pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port); + } else { + pAction->msgSent = 0; + pAction->msgReceived = 0; + pAction->errCode = (terrno != 0) ? terrno : code; + mError("trans:%d, action:%d not send since %s", pTrans->id, pAction->id, terrstr()); + } + + return code; +} + +static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + if (pAction->isRaw) { + return mndTransWriteSingleLog(pMnode, pTrans, pAction); + } else { + return mndTransSendSingleMsg(pMnode, pTrans, pAction); + } +} + static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); + int32_t code = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - if (pAction == NULL) continue; - - if (pAction->msgSent) { - if (pAction->msgReceived) { - continue; - } else { - if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { - break; - } else { - continue; - } - } - } - - int64_t signature = pTrans->id; - signature = (signature << 32); - signature += action; - - SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; - rpcMsg.pCont = rpcMallocCont(pAction->contLen); - if (rpcMsg.pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - - if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) { - mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, action, pAction->epSet.eps[pAction->epSet.inUse].fqdn, - pAction->epSet.eps[pAction->epSet.inUse].port); - pAction->msgSent = 1; - pAction->msgReceived = 0; - pAction->errCode = 0; - if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { - break; - } - } else { - pAction->msgSent = 0; - pAction->msgReceived = 0; - pAction->errCode = terrno; - mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); - return -1; + code = mndTransExecSingleAction(pMnode, pTrans, pAction); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + break; } } - return 0; + return code; } static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { @@ -1075,35 +1040,52 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { return code; } +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitActions); + if (code != 0) { + mError("failed to execute commitActions since %s", terrstr()); + } + return code; +} + static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; - pTrans->stage = TRN_STAGE_REDO_LOG; - mDebug("trans:%d, stage from prepare to redoLog", pTrans->id); + pTrans->stage = TRN_STAGE_REDO_ACTION; + mDebug("trans:%d, stage from prepare to redoAction", pTrans->id); return continueExec; } -static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; - int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); +static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + if (pTrans->redoActionPos >= taosArrayGetSize(pTrans->redoActions)) return continueExec; + STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); + int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction); if (code == 0) { - pTrans->code = 0; - pTrans->stage = TRN_STAGE_REDO_ACTION; - mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id); + pTrans->redoActionPos++; + mDebug("trans:%d, redo action:%d is executed and need sync to other mnodes", pTrans->id, pAction->id); + + // todo sync these infos + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + mDebug("trans:%d, redo action:%d is in progress and wait it finish", pTrans->id, pAction->id); + continueExec = false; } else { - pTrans->code = terrno; - pTrans->stage = TRN_STAGE_UNDO_LOG; - mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr()); + mError("trans:%d, redo action:%d failed to execute since %s", pTrans->id, pAction->id, terrstr()); + continueExec = false; } return continueExec; } static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { - if (!pMnode->deploy && !mndIsMaster(pMnode)) return false; - bool continueExec = true; - int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); + int32_t code = 0; + + if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { + code = mndTransExecuteRedoActionsOneByOne(pMnode, pTrans); + } else { + code = mndTransExecuteRedoActions(pMnode, pTrans); + } if (code == 0) { pTrans->code = 0; @@ -1135,8 +1117,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { if (code == 0) { pTrans->code = 0; - pTrans->stage = TRN_STAGE_COMMIT_LOG; - mDebug("trans:%d, stage from commit to commitLog", pTrans->id); + pTrans->stage = TRN_STAGE_COMMIT_ACTION; + mDebug("trans:%d, stage from commit to commitAction", pTrans->id); continueExec = true; } else { pTrans->code = terrno; @@ -1155,35 +1137,19 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; - int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); + int32_t code = mndTransExecuteCommitActions(pMnode, pTrans); if (code == 0) { pTrans->code = 0; pTrans->stage = TRN_STAGE_FINISHED; - mDebug("trans:%d, stage from commitLog to finished", pTrans->id); + mDebug("trans:%d, stage from commitAction to finished", pTrans->id); continueExec = true; } else { pTrans->code = terrno; pTrans->failedTimes++; - mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); - continueExec = false; - } - - return continueExec; -} - -static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; - int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans); - - if (code == 0) { - pTrans->stage = TRN_STAGE_ROLLBACK; - mDebug("trans:%d, stage from undoLog to rollback", pTrans->id); - continueExec = true; - } else { - mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr()); + mError("trans:%d, stage keep on commitAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); continueExec = false; } @@ -1191,14 +1157,12 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { } static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { - if (!pMnode->deploy && !mndIsMaster(pMnode)) return false; - bool continueExec = true; int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); if (code == 0) { - pTrans->stage = TRN_STAGE_UNDO_LOG; - mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); + pTrans->stage = TRN_STAGE_ROLLBACK; + mDebug("trans:%d, stage from undoAction to rollback", pTrans->id); continueExec = true; } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); @@ -1257,24 +1221,18 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { case TRN_STAGE_PREPARE: continueExec = mndTransPerformPrepareStage(pMnode, pTrans); break; - case TRN_STAGE_REDO_LOG: - continueExec = mndTransPerformRedoLogStage(pMnode, pTrans); - break; case TRN_STAGE_REDO_ACTION: continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); break; - case TRN_STAGE_UNDO_LOG: - continueExec = mndTransPerformUndoLogStage(pMnode, pTrans); + case TRN_STAGE_COMMIT: + continueExec = mndTransPerformCommitStage(pMnode, pTrans); + break; + case TRN_STAGE_COMMIT_ACTION: + continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); break; case TRN_STAGE_UNDO_ACTION: continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); break; - case TRN_STAGE_COMMIT_LOG: - continueExec = mndTransPerformCommitLogStage(pMnode, pTrans); - break; - case TRN_STAGE_COMMIT: - continueExec = mndTransPerformCommitStage(pMnode, pTrans); - break; case TRN_STAGE_ROLLBACK: continueExec = mndTransPerformRollbackStage(pMnode, pTrans); break; diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index 8be2822c0a..7343b4f829 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -283,9 +283,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) { tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime)); tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid)); tjsonAddStringToObject(item, "dbname", pObj->dbname); - tjsonAddIntegerToObject(item, "redoLogNum", taosArrayGetSize(pObj->redoLogs)); - tjsonAddIntegerToObject(item, "undoLogNum", taosArrayGetSize(pObj->undoLogs)); - tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitLogs)); + tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions)); tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions)); tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));