From 53def5b77b0b4698dd0903fdb94aa169bd16de23 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 30 May 2022 20:38:46 +0800 Subject: [PATCH 1/3] refactor: make trans support multi steps --- include/util/tdef.h | 1 + source/dnode/mnode/impl/inc/mndDef.h | 20 +- source/dnode/mnode/impl/inc/mndTrans.h | 34 +- source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 584 ++++++++++----------- tests/test/c/sdbDump.c | 4 +- 6 files changed, 296 insertions(+), 349 deletions(-) diff --git a/include/util/tdef.h b/include/util/tdef.h index ad7206f7bb..709e2b8a7f 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -254,6 +254,7 @@ typedef enum ELogicConditionType { #define TSDB_TRANS_STAGE_LEN 12 #define TSDB_TRANS_TYPE_LEN 16 #define TSDB_TRANS_ERROR_LEN 64 +#define TSDB_TRANS_DESC_LEN 128 #define TSDB_STEP_NAME_LEN 32 #define TSDB_STEP_DESC_LEN 128 diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 4d5aab4590..fd0f54c66b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -60,14 +60,12 @@ typedef enum { typedef enum { TRN_STAGE_PREPARE = 0, - TRN_STAGE_REDO_LOG = 1, - TRN_STAGE_REDO_ACTION = 2, - TRN_STAGE_ROLLBACK = 3, - TRN_STAGE_UNDO_ACTION = 4, - TRN_STAGE_UNDO_LOG = 5, - TRN_STAGE_COMMIT = 6, - TRN_STAGE_COMMIT_LOG = 7, - TRN_STAGE_FINISHED = 8 + TRN_STAGE_REDO_ACTION = 1, + TRN_STAGE_ROLLBACK = 2, + TRN_STAGE_UNDO_ACTION = 3, + TRN_STAGE_COMMIT = 4, + TRN_STAGE_COMMIT_ACTION = 5, + TRN_STAGE_FINISHED = 6 } ETrnStage; typedef enum { @@ -168,16 +166,16 @@ typedef struct { SRpcHandleInfo rpcInfo; void* rpcRsp; int32_t rpcRspLen; - SArray* redoLogs; - SArray* undoLogs; - SArray* commitLogs; + int32_t redoActionPos; SArray* redoActions; SArray* undoActions; + SArray* commitActions; int64_t createdTime; int64_t lastExecTime; int64_t dbUid; char dbname[TSDB_DB_FNAME_LEN]; char lastError[TSDB_TRANS_ERROR_LEN]; + char desc[TSDB_TRANS_DESC_LEN]; int32_t startFunc; int32_t stopFunc; int32_t paramLen; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index ce302a88e3..d9408467ad 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -26,31 +26,23 @@ typedef enum { TRANS_START_FUNC_TEST = 1, TRANS_STOP_FUNC_TEST = 2, TRANS_START_FUNC_MQ_REB = 3, - TRANS_STOP_FUNC_TEST_MQ_REB = 4, + TRANS_STOP_FUNC_MQ_REB = 4, } ETrnFunc; typedef struct { - SEpSet epSet; - tmsg_t msgType; - int8_t msgSent; - int8_t msgReceived; - int32_t errCode; - int32_t acceptableCode; - int32_t contLen; - void *pCont; -} STransAction; - -typedef struct { + int32_t id; + tmsg_t msgType; + int8_t msgSent; + int8_t msgReceived; + int8_t isRaw; + int8_t rawWritten; SSdbRaw *pRaw; -} STransLog; - -typedef struct { - ETrnStep stepType; - STransAction redoAction; - STransAction undoAction; - STransLog redoLog; - STransLog undoLog; -} STransStep; + SEpSet epSet; + int32_t errCode; + int32_t acceptableCode; + int32_t contLen; + void *pCont; +} STransAction; typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e58630ddbf..7e72aa2425 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -493,7 +493,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu // 4. TODO commit log: modification log // 5. set cb - mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_TEST_MQ_REB, NULL, 0); + mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0); // 6. execution if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 9d392c64fb..239e1bf4b1 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -43,13 +43,13 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); -static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); -static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans); +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); @@ -83,40 +83,30 @@ int32_t mndInitTrans(SMnode *pMnode) { void mndCleanupTrans(SMnode *pMnode) {} +static int32_t mndTransGetActionsSize(SArray *pArray) { + int32_t actionNum = taosArrayGetSize(pArray); + int32_t rawDataLen = 0; + + for (int32_t i = 0; i < actionNum; ++i) { + STransAction *pAction = taosArrayGet(pArray, i); + if (pAction->isRaw) { + rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t)); + } else { + rawDataLen += (sizeof(STransAction) + pAction->contLen); + } + rawDataLen += sizeof(pAction->isRaw); + } + + return rawDataLen; +} + static SSdbRaw *mndTransActionEncode(STrans *pTrans) { terrno = TSDB_CODE_OUT_OF_MEMORY; int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE; - int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); - int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); - int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); - int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); - int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); - - for (int32_t i = 0; i < redoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); - } - - for (int32_t i = 0; i < undoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); - } - - for (int32_t i = 0; i < commitLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); - rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t)); - } - - for (int32_t i = 0; i < redoActionNum; ++i) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->contLen); - } - - for (int32_t i = 0; i < undoActionNum; ++i) { - STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->contLen); - } + rawDataLen += mndTransGetActionsSize(pTrans->redoActions); + rawDataLen += mndTransGetActionsSize(pTrans->undoActions); + rawDataLen += mndTransGetActionsSize(pTrans->commitActions); SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen); if (pRaw == NULL) { @@ -126,67 +116,67 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { int32_t dataPos = 0; SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER) - - ETrnStage stage = pTrans->stage; - if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) { - stage = TRN_STAGE_PREPARE; - } else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) { - stage = TRN_STAGE_ROLLBACK; - } else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) { - stage = TRN_STAGE_COMMIT; - } else { - } - - SDB_SET_INT16(pRaw, dataPos, stage, _OVER) + SDB_SET_INT16(pRaw, dataPos, pTrans->stage, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) - SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER) - SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER) - SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER) + + int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); + int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); + int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions); SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER) SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER) - - for (int32_t i = 0; i < redoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); - int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) - } - - for (int32_t i = 0; i < undoLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); - int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) - } - - for (int32_t i = 0; i < commitLogNum; ++i) { - SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); - int32_t len = sdbGetRawTotalSize(pTmp); - SDB_SET_INT32(pRaw, dataPos, len, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER) - } + SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER) for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + if (pAction->isRaw) { + int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) + } else { + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) - SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + if (pAction->isRaw) { + int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) + } else { + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } + } + + for (int32_t i = 0; i < commitActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->commitActions, i); + SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + if (pAction->isRaw) { + int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT32(pRaw, dataPos, len, _OVER) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) + } else { + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) + SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) + } } SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER) @@ -220,11 +210,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { char *pData = NULL; int32_t dataLen = 0; int8_t sver = 0; - int32_t redoLogNum = 0; - int32_t undoLogNum = 0; - int32_t commitLogNum = 0; int32_t redoActionNum = 0; int32_t undoActionNum = 0; + int32_t commitActionNum = 0; int32_t dataPos = 0; STransAction action = {0}; @@ -258,76 +246,85 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) - SDB_GET_INT32(pRaw, dataPos, &redoLogNum, _OVER) - SDB_GET_INT32(pRaw, dataPos, &undoLogNum, _OVER) - SDB_GET_INT32(pRaw, dataPos, &commitLogNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) + SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER) - pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *)); pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction)); + pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction)); - if (pTrans->redoLogs == NULL) goto _OVER; - if (pTrans->undoLogs == NULL) goto _OVER; - if (pTrans->commitLogs == NULL) goto _OVER; if (pTrans->redoActions == NULL) goto _OVER; if (pTrans->undoActions == NULL) goto _OVER; - - for (int32_t i = 0; i < redoLogNum; ++i) { - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto _OVER; - pData = NULL; - } - - for (int32_t i = 0; i < undoLogNum; ++i) { - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto _OVER; - pData = NULL; - } - - for (int32_t i = 0; i < commitLogNum; ++i) { - SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; - mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto _OVER; - pData = NULL; - } + if (pTrans->commitActions == NULL) goto _OVER; for (int32_t i = 0; i < redoActionNum; ++i) { - SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); - SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) - action.pCont = taosMemoryMalloc(action.contLen); - if (action.pCont == NULL) goto _OVER; - SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); - if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; - action.pCont = NULL; + SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + if (action.isRaw) { + SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) + pData = taosMemoryMalloc(dataLen); + if (pData == NULL) goto _OVER; + mTrace("raw:%p, is created", pData); + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); + if (taosArrayPush(pTrans->redoActions, &pData) == NULL) goto _OVER; + pData = NULL; + } else { + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) + action.pCont = taosMemoryMalloc(action.contLen); + if (action.pCont == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; + action.pCont = NULL; + } } for (int32_t i = 0; i < undoActionNum; ++i) { - SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); - SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) - action.pCont = taosMemoryMalloc(action.contLen); - if (action.pCont == NULL) goto _OVER; - SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); - if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; - action.pCont = NULL; + SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + if (action.isRaw) { + SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) + pData = taosMemoryMalloc(dataLen); + if (pData == NULL) goto _OVER; + mTrace("raw:%p, is created", pData); + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); + if (taosArrayPush(pTrans->undoActions, &pData) == NULL) goto _OVER; + pData = NULL; + } else { + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) + action.pCont = taosMemoryMalloc(action.contLen); + if (action.pCont == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); + if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; + action.pCont = NULL; + } + } + + for (int32_t i = 0; i < commitActionNum; ++i) { + SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + if (action.isRaw) { + SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) + pData = taosMemoryMalloc(dataLen); + if (pData == NULL) goto _OVER; + mTrace("raw:%p, is created", pData); + SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); + if (taosArrayPush(pTrans->commitActions, &pData) == NULL) goto _OVER; + pData = NULL; + } else { + SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); + SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) + action.pCont = taosMemoryMalloc(action.contLen); + if (action.pCont == NULL) goto _OVER; + SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER); + if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; + action.pCont = NULL; + } } SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER) @@ -360,20 +357,16 @@ static const char *mndTransStr(ETrnStage stage) { switch (stage) { case TRN_STAGE_PREPARE: return "prepare"; - case TRN_STAGE_REDO_LOG: - return "redoLog"; case TRN_STAGE_REDO_ACTION: return "redoAction"; - case TRN_STAGE_COMMIT: - return "commit"; - case TRN_STAGE_COMMIT_LOG: - return "commitLog"; - case TRN_STAGE_UNDO_ACTION: - return "undoAction"; - case TRN_STAGE_UNDO_LOG: - return "undoLog"; case TRN_STAGE_ROLLBACK: return "rollback"; + case TRN_STAGE_UNDO_ACTION: + return "undoAction"; + case TRN_STAGE_COMMIT: + return "commit"; + case TRN_STAGE_COMMIT_ACTION: + return "commitAction"; case TRN_STAGE_FINISHED: return "finished"; default: @@ -472,7 +465,7 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) { return mndTransTestStopFunc; case TRANS_START_FUNC_MQ_REB: return mndRebCntInc; - case TRANS_STOP_FUNC_TEST_MQ_REB: + case TRANS_STOP_FUNC_MQ_REB: return mndRebCntDec; default: return NULL; @@ -493,11 +486,9 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { } static void mndTransDropData(STrans *pTrans) { - mndTransDropLogs(pTrans->redoLogs); - mndTransDropLogs(pTrans->undoLogs); - mndTransDropLogs(pTrans->commitLogs); mndTransDropActions(pTrans->redoActions); mndTransDropActions(pTrans->undoActions); + mndTransDropActions(pTrans->commitActions); if (pTrans->rpcRsp != NULL) { taosMemoryFree(pTrans->rpcRsp); pTrans->rpcRsp = NULL; @@ -526,8 +517,8 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { if (pNew->stage == TRN_STAGE_COMMIT) { - pNew->stage = TRN_STAGE_COMMIT_LOG; - mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG)); + pNew->stage = TRN_STAGE_COMMIT_ACTION; + mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_ACTION)); } if (pNew->stage == TRN_STAGE_ROLLBACK) { @@ -568,14 +559,11 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S pTrans->type = type; pTrans->createdTime = taosGetTimestampMs(); if (pReq != NULL) pTrans->rpcInfo = pReq->info; - pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); + pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); - if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || - pTrans->redoActions == NULL || pTrans->undoActions == NULL) { + if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; mError("failed to create transaction since %s", terrstr()); return NULL; @@ -585,21 +573,15 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S return pTrans; } -static void mndTransDropLogs(SArray *pArray) { - int32_t size = taosArrayGetSize(pArray); - for (int32_t i = 0; i < size; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - sdbFreeRaw(pRaw); - } - - taosArrayDestroy(pArray); -} - static void mndTransDropActions(SArray *pArray) { int32_t size = taosArrayGetSize(pArray); for (int32_t i = 0; i < size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); - taosMemoryFreeClear(pAction->pCont); + if (pAction->isRaw) { + sdbFreeRaw(pAction->pRaw); + } else { + taosMemoryFreeClear(pAction->pCont); + } } taosArrayDestroy(pArray); @@ -613,27 +595,6 @@ void mndTransDrop(STrans *pTrans) { } } -static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { - if (pArray == NULL || pRaw == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - void *ptr = taosArrayPush(pArray, &pRaw); - if (ptr == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); } - -int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); } - -int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); } - static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { void *ptr = taosArrayPush(pArray, pAction); if (ptr == NULL) { @@ -644,6 +605,21 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { return 0; } +int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = {.isRaw = true, .pRaw = pRaw}; + return mndTransAppendAction(pTrans->redoActions, &action); +} + +int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = {.isRaw = true, .pRaw = pRaw}; + return mndTransAppendAction(pTrans->undoActions, &action); +} + +int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = {.isRaw = true, .pRaw = pRaw}; + return mndTransAppendAction(pTrans->commitActions, &action); +} + int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { return mndTransAppendAction(pTrans->redoActions, pAction); } @@ -768,7 +744,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { return -1; } - if (taosArrayGetSize(pTrans->commitLogs) <= 0) { + if (taosArrayGetSize(pTrans->commitActions) <= 0) { terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL; mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); return -1; @@ -799,8 +775,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0; - mDebug("trans:%d, commit transaction", pTrans->id); if (mndTransSync(pMnode, pTrans) != 0) { mError("trans:%d, failed to commit since %s", pTrans->id, terrstr()); @@ -829,8 +803,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } if (pTrans->policy == TRN_POLICY_ROLLBACK) { - if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION || - pTrans->stage == TRN_STAGE_ROLLBACK) { + if (pTrans->stage == pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; sendRsp = true; } @@ -930,30 +903,6 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { return code; } -static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->redoLogs); - if (code != 0) { - mError("failed to execute redoLogs since %s", terrstr()); - } - return code; -} - -static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->undoLogs); - if (code != 0) { - mError("failed to execute undoLogs since %s, return success", terrstr()); - } - - return 0; // return success in any case -} - -static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitLogs); - if (code != 0) { - mError("failed to execute commitLogs since %s", terrstr()); - } - return code; -} static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); @@ -962,6 +911,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; + if (pAction->rawWritten && pAction->errCode == 0) continue; pAction->msgSent = 0; pAction->msgReceived = 0; @@ -970,56 +920,71 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) } } +static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); + if (code == 0) { + mDebug("trans:%d, action:%d write to sdb", pTrans->id, pAction->id); + } else { + mError("trans:%d, action:%d failed to write sdb since %s", pTrans->id, pAction->id, terrstr()); + } + + return code; +} + +static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + if (pAction->msgSent) return 0; + if (!pMnode->deploy && !mndIsMaster(pMnode)) return -1; + + int64_t signature = pTrans->id; + signature = (signature << 32); + signature += pAction->id; + + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; + rpcMsg.pCont = rpcMallocCont(pAction->contLen); + if (rpcMsg.pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + + int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg); + if (code == 0) { + pAction->msgSent = 1; + pAction->msgReceived = 0; + pAction->errCode = 0; + mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, pAction->id, + pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port); + } else { + pAction->msgSent = 0; + pAction->msgReceived = 0; + pAction->errCode = (terrno != 0) ? terrno : code; + mError("trans:%d, action:%d not send since %s", pTrans->id, pAction->id, terrstr()); + } + + return code; +} + +static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + if (pAction->isRaw) { + return mndTransWriteSingleLog(pMnode, pTrans, pAction); + } else { + return mndTransSendSingleMsg(pMnode, pTrans, pAction); + } +} + static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); + int32_t code = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - if (pAction == NULL) continue; - - if (pAction->msgSent) { - if (pAction->msgReceived) { - continue; - } else { - if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { - break; - } else { - continue; - } - } - } - - int64_t signature = pTrans->id; - signature = (signature << 32); - signature += action; - - SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; - rpcMsg.pCont = rpcMallocCont(pAction->contLen); - if (rpcMsg.pCont == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); - - if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) { - mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, action, pAction->epSet.eps[pAction->epSet.inUse].fqdn, - pAction->epSet.eps[pAction->epSet.inUse].port); - pAction->msgSent = 1; - pAction->msgReceived = 0; - pAction->errCode = 0; - if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { - break; - } - } else { - pAction->msgSent = 0; - pAction->msgReceived = 0; - pAction->errCode = terrno; - mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr()); - return -1; + code = mndTransExecSingleAction(pMnode, pTrans, pAction); + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + break; } } - return 0; + return code; } static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { @@ -1075,35 +1040,52 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { return code; } +static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitActions); + if (code != 0) { + mError("failed to execute commitActions since %s", terrstr()); + } + return code; +} + static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; - pTrans->stage = TRN_STAGE_REDO_LOG; - mDebug("trans:%d, stage from prepare to redoLog", pTrans->id); + pTrans->stage = TRN_STAGE_REDO_ACTION; + mDebug("trans:%d, stage from prepare to redoAction", pTrans->id); return continueExec; } -static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; - int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); +static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) { + bool continueExec = true; + if (pTrans->redoActionPos >= taosArrayGetSize(pTrans->redoActions)) return continueExec; + STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); + int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction); if (code == 0) { - pTrans->code = 0; - pTrans->stage = TRN_STAGE_REDO_ACTION; - mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id); + pTrans->redoActionPos++; + mDebug("trans:%d, redo action:%d is executed and need sync to other mnodes", pTrans->id, pAction->id); + + // todo sync these infos + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + mDebug("trans:%d, redo action:%d is in progress and wait it finish", pTrans->id, pAction->id); + continueExec = false; } else { - pTrans->code = terrno; - pTrans->stage = TRN_STAGE_UNDO_LOG; - mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr()); + mError("trans:%d, redo action:%d failed to execute since %s", pTrans->id, pAction->id, terrstr()); + continueExec = false; } return continueExec; } static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { - if (!pMnode->deploy && !mndIsMaster(pMnode)) return false; - bool continueExec = true; - int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); + int32_t code = 0; + + if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { + code = mndTransExecuteRedoActionsOneByOne(pMnode, pTrans); + } else { + code = mndTransExecuteRedoActions(pMnode, pTrans); + } if (code == 0) { pTrans->code = 0; @@ -1135,8 +1117,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { if (code == 0) { pTrans->code = 0; - pTrans->stage = TRN_STAGE_COMMIT_LOG; - mDebug("trans:%d, stage from commit to commitLog", pTrans->id); + pTrans->stage = TRN_STAGE_COMMIT_ACTION; + mDebug("trans:%d, stage from commit to commitAction", pTrans->id); continueExec = true; } else { pTrans->code = terrno; @@ -1155,35 +1137,19 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) { +static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; - int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); + int32_t code = mndTransExecuteCommitActions(pMnode, pTrans); if (code == 0) { pTrans->code = 0; pTrans->stage = TRN_STAGE_FINISHED; - mDebug("trans:%d, stage from commitLog to finished", pTrans->id); + mDebug("trans:%d, stage from commitAction to finished", pTrans->id); continueExec = true; } else { pTrans->code = terrno; pTrans->failedTimes++; - mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); - continueExec = false; - } - - return continueExec; -} - -static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; - int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans); - - if (code == 0) { - pTrans->stage = TRN_STAGE_ROLLBACK; - mDebug("trans:%d, stage from undoLog to rollback", pTrans->id); - continueExec = true; - } else { - mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr()); + mError("trans:%d, stage keep on commitAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); continueExec = false; } @@ -1191,14 +1157,12 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) { } static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { - if (!pMnode->deploy && !mndIsMaster(pMnode)) return false; - bool continueExec = true; int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); if (code == 0) { - pTrans->stage = TRN_STAGE_UNDO_LOG; - mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id); + pTrans->stage = TRN_STAGE_ROLLBACK; + mDebug("trans:%d, stage from undoAction to rollback", pTrans->id); continueExec = true; } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); @@ -1257,24 +1221,18 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { case TRN_STAGE_PREPARE: continueExec = mndTransPerformPrepareStage(pMnode, pTrans); break; - case TRN_STAGE_REDO_LOG: - continueExec = mndTransPerformRedoLogStage(pMnode, pTrans); - break; case TRN_STAGE_REDO_ACTION: continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); break; - case TRN_STAGE_UNDO_LOG: - continueExec = mndTransPerformUndoLogStage(pMnode, pTrans); + case TRN_STAGE_COMMIT: + continueExec = mndTransPerformCommitStage(pMnode, pTrans); + break; + case TRN_STAGE_COMMIT_ACTION: + continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); break; case TRN_STAGE_UNDO_ACTION: continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); break; - case TRN_STAGE_COMMIT_LOG: - continueExec = mndTransPerformCommitLogStage(pMnode, pTrans); - break; - case TRN_STAGE_COMMIT: - continueExec = mndTransPerformCommitStage(pMnode, pTrans); - break; case TRN_STAGE_ROLLBACK: continueExec = mndTransPerformRollbackStage(pMnode, pTrans); break; diff --git a/tests/test/c/sdbDump.c b/tests/test/c/sdbDump.c index 8be2822c0a..7343b4f829 100644 --- a/tests/test/c/sdbDump.c +++ b/tests/test/c/sdbDump.c @@ -283,9 +283,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) { tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime)); tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid)); tjsonAddStringToObject(item, "dbname", pObj->dbname); - tjsonAddIntegerToObject(item, "redoLogNum", taosArrayGetSize(pObj->redoLogs)); - tjsonAddIntegerToObject(item, "undoLogNum", taosArrayGetSize(pObj->undoLogs)); - tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitLogs)); + tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions)); tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions)); tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions)); From 86f8bf6cb10832bd311737f6dff6398f49b51354 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 31 May 2022 14:38:15 +0800 Subject: [PATCH 2/3] refactor: make trans support multi steps --- source/dnode/mgmt/node_mgmt/src/dmTransport.c | 2 +- source/dnode/mnode/impl/inc/mndTrans.h | 15 +- source/dnode/mnode/impl/src/mndAcct.c | 8 +- source/dnode/mnode/impl/src/mndCluster.c | 4 +- source/dnode/mnode/impl/src/mndDb.c | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 10 +- source/dnode/mnode/impl/src/mndMain.c | 2 +- source/dnode/mnode/impl/src/mndMnode.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 2 +- source/dnode/mnode/impl/src/mndSync.c | 4 +- source/dnode/mnode/impl/src/mndTrans.c | 216 +++++++++++------- source/dnode/mnode/impl/src/mndUser.c | 2 +- source/dnode/mnode/impl/src/mndVgroup.c | 4 +- source/dnode/mnode/sdb/inc/sdb.h | 1 + source/dnode/mnode/sdb/src/sdb.c | 1 + source/dnode/mnode/sdb/src/sdbFile.c | 54 +++-- source/libs/qworker/src/qworker.c | 6 +- 17 files changed, 196 insertions(+), 139 deletions(-) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 987fc54416..e5893fd947 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -130,7 +130,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { _OVER: if (code != 0) { - dError("msg:%p, failed to process since %s", pMsg, terrstr()); + dTrace("msg:%p, failed to process since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pRpc->msgType)); if (terrno != 0) code = terrno; if (IsReq(pRpc)) { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index d9408467ad..a7e1f7cd02 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -31,17 +31,18 @@ typedef enum { typedef struct { int32_t id; - tmsg_t msgType; - int8_t msgSent; - int8_t msgReceived; - int8_t isRaw; - int8_t rawWritten; - SSdbRaw *pRaw; - SEpSet epSet; int32_t errCode; int32_t acceptableCode; + int8_t stage; + int8_t isRaw; + int8_t rawWritten; + int8_t msgSent; + int8_t msgReceived; + tmsg_t msgType; + SEpSet epSet; int32_t contLen; void *pCont; + SSdbRaw *pRaw; } STransAction; typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen); diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index a4fde4b706..f3ec3a421b 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -78,10 +78,8 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("acct:%s, will be created while deploy sdb, raw:%p", acctObj.acct, pRaw); -#if 0 - return sdbWrite(pMnode->pSdb, pRaw); -#else + mDebug("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_ACCT, NULL); if (pTrans == NULL) { mError("acct:%s, failed to create since %s", acctObj.acct, terrstr()); @@ -94,7 +92,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { mndTransDrop(pTrans); return -1; } - sdbSetRawStatus(pRaw, SDB_STATUS_READY); if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); @@ -104,7 +101,6 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) { mndTransDrop(pTrans); return 0; -#endif } static SSdbRaw *mndAcctActionEncode(SAcctObj *pAcct) { diff --git a/source/dnode/mnode/impl/src/mndCluster.c b/source/dnode/mnode/impl/src/mndCluster.c index a421be5c06..76c8acf407 100644 --- a/source/dnode/mnode/impl/src/mndCluster.c +++ b/source/dnode/mnode/impl/src/mndCluster.c @@ -172,13 +172,13 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) { clusterObj.id = mndGenerateUid(clusterObj.name, TSDB_CLUSTER_ID_LEN); clusterObj.id = (clusterObj.id >= 0 ? clusterObj.id : -clusterObj.id); pMnode->clusterId = clusterObj.id; - mDebug("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); + mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name); SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj); if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("cluster:%" PRId64 ", will be created while deploy sdb, raw:%p", clusterObj.id, pRaw); + mDebug("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); #else diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 95d3383ee1..e3f843f0c7 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1314,7 +1314,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, SDbObj *pDb = mndAcquireDb(pMnode, pDbVgVersion->dbFName); if (pDb == NULL) { - mDebug("db:%s, no exist", pDbVgVersion->dbFName); + mTrace("db:%s, no exist", pDbVgVersion->dbFName); memcpy(usedbRsp.db, pDbVgVersion->dbFName, TSDB_DB_FNAME_LEN); usedbRsp.uid = pDbVgVersion->dbId; usedbRsp.vgVersion = -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 22f858c60b..8e06139c8c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -98,7 +98,7 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) { if (pRaw == NULL) return -1; if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1; - mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw); + mDebug("dnode:%d, will be created when deploying, raw:%p", dnodeObj.id, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); @@ -388,9 +388,10 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mndReleaseMnode(pMnode, pObj); } + int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - bool dnodeChanged = (statusReq.dnodeVer != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); + bool dnodeChanged = (statusReq.dnodeVer != dnodeVer); bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; @@ -433,7 +434,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { if (!online) { mInfo("dnode:%d, from offline to online", pDnode->id); } else { - mDebug("dnode:%d, send dnode eps", pDnode->id); + mDebug("dnode:%d, send dnode epset, online:%d ver:% " PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online, + statusReq.dnodeVer, dnodeVer, reboot); } pDnode->rebootTime = statusReq.rebootTime; @@ -441,7 +443,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes; SStatusRsp statusRsp = {0}; - statusRsp.dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); + statusRsp.dnodeVer = dnodeVer; statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.clusterId = pMnode->clusterId; statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 0ac36c20ed..2a2a45a45d 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -472,7 +472,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { } else if (code == 0) { mTrace("msg:%p, successfully processed and response", pMsg); } else { - mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, + mDebug("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 23634be77b..8c5ea840af 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -90,7 +90,7 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw); + mDebug("mnode:%d, will be created when deploying, raw:%p", mnodeObj.id, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index b33c09a0f9..81c3b24d97 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -1597,7 +1597,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) { pReq->info.rspLen = rspLen; code = 0; - mDebug("stb:%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName); + mTrace("%s.%s, meta is retrieved", infoReq.dbFName, infoReq.tbName); _OVER: if (code != 0) { diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 8b602d796c..245f0938b9 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -65,7 +65,7 @@ int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { void mndRestoreFinish(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; if (!pMnode->deploy) { - mInfo("mnode sync restore finished"); + mInfo("mnode sync restore finished, and will handle outstanding transactions"); mndTransPullup(pMnode); mndSetRestore(pMnode, true); } else { @@ -244,7 +244,7 @@ void mndSyncStart(SMnode *pMnode) { } else { syncStart(pMgmt->sync); } - mDebug("sync:%" PRId64 " is started, standby:%d", pMgmt->sync, pMgmt->standby); + mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby); } void mndSyncStop(SMnode *pMnode) {} diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 239e1bf4b1..c5a1e0ba5a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -37,7 +37,6 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static void mndTransDropLogs(SArray *pArray); static void mndTransDropActions(SArray *pArray); static void mndTransDropData(STrans *pTrans); -static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); @@ -133,15 +132,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) if (pAction->isRaw) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } @@ -149,15 +154,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) if (pAction->isRaw) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } @@ -165,15 +176,21 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < commitActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->commitActions, i); + SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->errCode, _OVER) + SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->stage, _OVER) if (pAction->isRaw) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); + SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) + SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } @@ -259,19 +276,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { if (pTrans->commitActions == NULL) goto _OVER; for (int32_t i = 0; i < redoActionNum; ++i) { + SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) if (action.isRaw) { + SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; + action.pRaw = taosMemoryMalloc(dataLen); + if (action.pRaw == NULL) goto _OVER; mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->redoActions, &pData) == NULL) goto _OVER; - pData = NULL; + SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); + if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER; + action.pRaw = NULL; } else { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -282,19 +305,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } for (int32_t i = 0; i < undoActionNum; ++i) { + SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) if (action.isRaw) { + SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; + action.pRaw = taosMemoryMalloc(dataLen); + if (action.pRaw == NULL) goto _OVER; mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->undoActions, &pData) == NULL) goto _OVER; - pData = NULL; + SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); + if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER; + action.pRaw = NULL; } else { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -305,19 +334,25 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } for (int32_t i = 0; i < commitActionNum; ++i) { + SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.errCode, _OVER) + SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.stage, _OVER) if (action.isRaw) { + SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) - pData = taosMemoryMalloc(dataLen); - if (pData == NULL) goto _OVER; + action.pRaw = taosMemoryMalloc(dataLen); + if (action.pRaw == NULL) goto _OVER; mTrace("raw:%p, is created", pData); - SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER); - if (taosArrayPush(pTrans->commitActions, &pData) == NULL) goto _OVER; - pData = NULL; + SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER); + if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER; + action.pRaw = NULL; } else { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) + SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -344,7 +379,6 @@ _OVER: mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, terrstr()); mndTransDropData(pTrans); taosMemoryFreeClear(pRow); - taosMemoryFreeClear(pData); taosMemoryFreeClear(action.pCont); return NULL; } @@ -502,7 +536,7 @@ static void mndTransDropData(STrans *pTrans) { } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { - mDebug("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), + mTrace("trans:%d, perform delete action, row:%p stage:%s callfunc:%d", pTrans->id, pTrans, mndTransStr(pTrans->stage), callFunc); if (pTrans->stopFunc > 0 && callFunc) { TransCbFp fp = mndTransGetCbFp(pTrans->stopFunc); @@ -515,20 +549,34 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { return 0; } +static void mndTransUpdateActions(SArray *pOldArray, SArray *pNewArray) { + for (int32_t i = 0; i < taosArrayGetSize(pOldArray); ++i) { + STransAction *pOldAction = taosArrayGet(pOldArray, i); + STransAction *pNewAction = taosArrayGet(pNewArray, i); + pOldAction->rawWritten = pNewAction->rawWritten; + pOldAction->msgSent = pNewAction->msgSent; + pOldAction->msgReceived = pNewAction->msgReceived; + pOldAction->errCode = pNewAction->errCode; + } +} + static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { - if (pNew->stage == TRN_STAGE_COMMIT) { - pNew->stage = TRN_STAGE_COMMIT_ACTION; - mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_ACTION)); - } - - if (pNew->stage == TRN_STAGE_ROLLBACK) { - pNew->stage = TRN_STAGE_FINISHED; - mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_ROLLBACK), mndTransStr(TRN_STAGE_FINISHED)); - } - mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld, mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage)); + mndTransUpdateActions(pOld->redoActions, pNew->redoActions); + mndTransUpdateActions(pOld->undoActions, pNew->undoActions); + mndTransUpdateActions(pOld->commitActions, pNew->commitActions); pOld->stage = pNew->stage; + + if (pOld->stage == TRN_STAGE_COMMIT) { + pOld->stage = TRN_STAGE_COMMIT_ACTION; + mTrace("trans:%d, stage from commit to commitAction", pNew->id); + } + + if (pOld->stage == TRN_STAGE_ROLLBACK) { + pOld->stage = TRN_STAGE_FINISHED; + mTrace("trans:%d, stage from rollback to finished", pNew->id); + } return 0; } @@ -557,8 +605,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; pTrans->type = type; + pTrans->parallel = TRN_EXEC_PARALLEL; pTrans->createdTime = taosGetTimestampMs(); - if (pReq != NULL) pTrans->rpcInfo = pReq->info; pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction)); @@ -569,7 +617,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S return NULL; } - mDebug("trans:%d, local object is created, data:%p", pTrans->id, pTrans); + if (pReq != NULL) pTrans->rpcInfo = pReq->info; + mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans); return pTrans; } @@ -578,7 +627,7 @@ static void mndTransDropActions(SArray *pArray) { for (int32_t i = 0; i < size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); if (pAction->isRaw) { - sdbFreeRaw(pAction->pRaw); + taosMemoryFreeClear(pAction->pRaw); } else { taosMemoryFreeClear(pAction->pCont); } @@ -590,12 +639,14 @@ static void mndTransDropActions(SArray *pArray) { void mndTransDrop(STrans *pTrans) { if (pTrans != NULL) { mndTransDropData(pTrans); - mDebug("trans:%d, local object is freed, data:%p", pTrans->id, pTrans); + mTrace("trans:%d, local object is freed, data:%p", pTrans->id, pTrans); taosMemoryFreeClear(pTrans); } } static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { + pAction->id = taosArrayGetSize(pArray); + void *ptr = taosArrayPush(pArray, pAction); if (ptr == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -606,25 +657,27 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) { } int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.isRaw = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_REDO_ACTION, .isRaw = true, .pRaw = pRaw}; return mndTransAppendAction(pTrans->redoActions, &action); } int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.isRaw = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_UNDO_ACTION, .isRaw = true, .pRaw = pRaw}; return mndTransAppendAction(pTrans->undoActions, &action); } int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { - STransAction action = {.isRaw = true, .pRaw = pRaw}; + STransAction action = {.stage = TRN_STAGE_COMMIT_ACTION, .isRaw = true, .pRaw = pRaw}; return mndTransAppendAction(pTrans->commitActions, &action); } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { + pAction->stage = TRN_STAGE_REDO_ACTION; return mndTransAppendAction(pTrans->redoActions, pAction); } int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction) { + pAction->stage = TRN_STAGE_UNDO_ACTION; return mndTransAppendAction(pTrans->undoActions, pAction); } @@ -821,7 +874,8 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } taosMemoryFree(pTrans->rpcRsp); - mDebug("trans:%d, send rsp, code:0x%x stage:%d app:%p", pTrans->id, code, pTrans->stage, pTrans->rpcInfo.ahandle); + mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage), + pTrans->rpcInfo.ahandle); SRpcMsg rspMsg = { .code = code, .pCont = rpcCont, @@ -877,55 +931,46 @@ void mndTransProcessRsp(SRpcMsg *pRsp) { } } - mDebug("trans:%d, action:%d response is received, code:0x%x, accept:0x%04x", transId, action, pRsp->code, - pAction->acceptableCode); + mDebug("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x", transId, mndTransStr(pAction->stage), action, + pRsp->code, pAction->acceptableCode); mndTransExecute(pMnode, pTrans); _OVER: mndReleaseTrans(pMnode, pTrans); } -static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { - SSdb *pSdb = pMnode->pSdb; - int32_t arraySize = taosArrayGetSize(pArray); - - if (arraySize == 0) return 0; - - int32_t code = 0; - for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - if (sdbWriteWithoutFree(pSdb, pRaw) != 0) { - code = ((terrno != 0) ? terrno : -1); - } - } - - terrno = code; - return code; -} - - static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue; - if (pAction->rawWritten && pAction->errCode == 0) continue; + if (pAction->msgSent && pAction->msgReceived && + (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) + continue; + if (pAction->rawWritten && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue; + pAction->rawWritten = 0; pAction->msgSent = 0; pAction->msgReceived = 0; pAction->errCode = 0; - mDebug("trans:%d, action:%d execute status is reset", pTrans->id, action); + mDebug("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), action); } } static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + if (pAction->rawWritten) return 0; + int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); - if (code == 0) { - mDebug("trans:%d, action:%d write to sdb", pTrans->id, pAction->id); + if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + pAction->rawWritten = true; + pAction->errCode = 0; + code = 0; + mDebug("trans:%d, %s:%d write to sdb", pTrans->id, mndTransStr(pAction->stage), pAction->id); } else { - mError("trans:%d, action:%d failed to write sdb since %s", pTrans->id, pAction->id, terrstr()); + pAction->errCode = (terrno != 0) ? terrno : code; + mError("trans:%d, %s:%d failed to write sdb since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, + terrstr()); } return code; @@ -952,13 +997,13 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio pAction->msgSent = 1; pAction->msgReceived = 0; pAction->errCode = 0; - mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, pAction->id, + mDebug("trans:%d, %s:%d is sent to %s:%u", pTrans->id, mndTransStr(pAction->stage), pAction->id, pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port); } else { pAction->msgSent = 0; pAction->msgReceived = 0; pAction->errCode = (terrno != 0) ? terrno : code; - mError("trans:%d, action:%d not send since %s", pTrans->id, pAction->id, terrstr()); + mError("trans:%d, %s:%d not send since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr()); } return code; @@ -995,20 +1040,20 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA return -1; } - int32_t numOfReceived = 0; + int32_t numOfExecuted = 0; int32_t errCode = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; - if (pAction->msgSent && pAction->msgReceived) { - numOfReceived++; + if ((pAction->msgSent && pAction->msgReceived) || pAction->rawWritten) { + numOfExecuted++; if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { errCode = pAction->errCode; } } } - if (numOfReceived == numOfActions) { + if (numOfExecuted == numOfActions) { if (errCode == 0) { mDebug("trans:%d, all %d actions execute successfully", pTrans->id, numOfActions); return 0; @@ -1019,7 +1064,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA return errCode; } } else { - mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfReceived, numOfActions); + mDebug("trans:%d, %d of %d actions executed", pTrans->id, numOfExecuted, numOfActions); return TSDB_CODE_ACTION_IN_PROGRESS; } } @@ -1041,7 +1086,7 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitActions); + int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); if (code != 0) { mError("failed to execute commitActions since %s", terrstr()); } @@ -1063,14 +1108,16 @@ static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction); if (code == 0) { pTrans->redoActionPos++; - mDebug("trans:%d, redo action:%d is executed and need sync to other mnodes", pTrans->id, pAction->id); + mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), + pAction->id); // todo sync these infos } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - mDebug("trans:%d, redo action:%d is in progress and wait it finish", pTrans->id, pAction->id); + mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); continueExec = false; } else { - mError("trans:%d, redo action:%d failed to execute since %s", pTrans->id, pAction->id, terrstr()); + mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, + terrstr()); continueExec = false; } @@ -1207,8 +1254,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans) { mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); } - mDebug("trans:%d, finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); - + mDebug("trans:%d, execute finished, code:0x%x, failedTimes:%d", pTrans->id, pTrans->code, pTrans->failedTimes); return continueExec; } @@ -1271,15 +1317,15 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) { if (pAction == NULL) continue; if (pAction->msgReceived == 0) { - mInfo("trans:%d, action:%d set processed for kill msg received", pTrans->id, i); + mInfo("trans:%d, %s:%d set processed for kill msg received", pTrans->id, mndTransStr(pAction->stage), i); pAction->msgSent = 1; pAction->msgReceived = 1; pAction->errCode = 0; } if (pAction->errCode != 0) { - mInfo("trans:%d, action:%d set processed for kill msg received, errCode from %s to success", pTrans->id, i, - tstrerror(pAction->errCode)); + mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id, + mndTransStr(pAction->stage), i, tstrerror(pAction->errCode)); pAction->msgSent = 1; pAction->msgReceived = 1; pAction->errCode = 0; diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index cc6364c457..83d00c86e3 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -77,7 +77,7 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("user:%s, will be created while deploy sdb, raw:%p", userObj.user, pRaw); + mDebug("user:%s, will be created when deploying, raw:%p", userObj.user, pRaw); #if 0 return sdbWrite(pMnode->pSdb, pRaw); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index e05b38a7c0..161fc5379c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -501,7 +501,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups) { *ppVgroups = pVgroups; code = 0; - mInfo("db:%s, %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications); + mInfo("db:%s, total %d vgroups is alloced, replica:%d", pDb->name, pDb->cfg.numOfVgroups, pDb->cfg.replications); _OVER: if (code != 0) taosMemoryFree(pVgroups); @@ -539,7 +539,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) { pVgid->role = TAOS_SYNC_STATE_FOLLOWER; pDnode->numOfVnodes++; - mInfo("db:%s, vgId:%d, vn:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId); + mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId); maxPos++; if (maxPos == 3) return 0; } diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index c66b47a24b..1fd0260d0d 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -168,6 +168,7 @@ typedef struct SSdb { char *currDir; char *tmpDir; int64_t lastCommitVer; + int64_t lastCommitTerm; int64_t curVer; int64_t curTerm; int64_t tableVer[SDB_MAX]; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 485b729deb..0526ea5c2d 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -55,6 +55,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { pSdb->curVer = -1; pSdb->curTerm = -1; pSdb->lastCommitVer = -1; + pSdb->lastCommitTerm = -1; pSdb->pMnode = pOption->pMnode; taosThreadMutexInit(&pSdb->filelock, NULL); mDebug("sdb init successfully"); diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 834e7a00c8..83135491a9 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -70,6 +70,7 @@ static void sdbResetData(SSdb *pSdb) { pSdb->curVer = -1; pSdb->curTerm = -1; pSdb->lastCommitVer = -1; + pSdb->lastCommitTerm = -1; mDebug("sdb reset successfully"); } @@ -211,12 +212,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { char file[PATH_MAX] = {0}; snprintf(file, sizeof(file), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to read file:%s", file); + mDebug("start to read sdb file:%s", file); SSdbRaw *pRaw = taosMemoryMalloc(WAL_MAX_SIZE + 100); if (pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - mError("failed read file since %s", terrstr()); + mError("failed read sdb file since %s", terrstr()); return -1; } @@ -224,12 +225,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { if (pFile == NULL) { taosMemoryFree(pRaw); terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, terrstr()); + mError("failed to read sdb file:%s since %s", file, terrstr()); return 0; } if (sdbReadFileHead(pSdb, pFile) != 0) { - mError("failed to read file:%s head since %s", file, terrstr()); + mError("failed to read sdb file:%s head since %s", file, terrstr()); taosMemoryFree(pRaw); taosCloseFile(&pFile); return -1; @@ -245,13 +246,13 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { if (ret < 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } if (ret != readLen) { code = TSDB_CODE_FILE_CORRUPTED; - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } @@ -259,34 +260,36 @@ static int32_t sdbReadFileImp(SSdb *pSdb) { ret = taosReadFile(pFile, pRaw->pData, readLen); if (ret < 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } if (ret != readLen) { code = TSDB_CODE_FILE_CORRUPTED; - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } int32_t totalLen = sizeof(SSdbRaw) + pRaw->dataLen + sizeof(int32_t); if ((!taosCheckChecksumWhole((const uint8_t *)pRaw, totalLen)) != 0) { code = TSDB_CODE_CHECKSUM_ERROR; - mError("failed to read file:%s since %s", file, tstrerror(code)); + mError("failed to read sdb file:%s since %s", file, tstrerror(code)); break; } code = sdbWriteWithoutFree(pSdb, pRaw); if (code != 0) { - mError("failed to read file:%s since %s", file, terrstr()); + mError("failed to read sdb file:%s since %s", file, terrstr()); goto _OVER; } } code = 0; pSdb->lastCommitVer = pSdb->curVer; + pSdb->lastCommitTerm = pSdb->curTerm; memcpy(pSdb->tableVer, tableVer, sizeof(tableVer)); - mDebug("read file:%s successfully, ver:%" PRId64, file, pSdb->lastCommitVer); + mDebug("read sdb file:%s successfully, ver:%" PRId64 " term:%" PRId64, file, pSdb->lastCommitVer, + pSdb->lastCommitTerm); _OVER: taosCloseFile(&pFile); @@ -302,7 +305,7 @@ int32_t sdbReadFile(SSdb *pSdb) { sdbResetData(pSdb); int32_t code = sdbReadFileImp(pSdb); if (code != 0) { - mError("failed to read sdb since %s", terrstr()); + mError("failed to read sdb file since %s", terrstr()); sdbResetData(pSdb); } @@ -318,18 +321,19 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { char curfile[PATH_MAX] = {0}; snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP); - mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer, - pSdb->curTerm, pSdb->lastCommitVer); + mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64 + " file:%s", + pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile); TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); if (pFile == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s for write since %s", tmpfile, terrstr()); + mError("failed to open sdb file:%s for write since %s", tmpfile, terrstr()); return -1; } if (sdbWriteFileHead(pSdb, pFile) != 0) { - mError("failed to write file:%s head since %s", tmpfile, terrstr()); + mError("failed to write sdb file:%s head since %s", tmpfile, terrstr()); taosCloseFile(&pFile); return -1; } @@ -338,7 +342,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { SdbEncodeFp encodeFp = pSdb->encodeFps[i]; if (encodeFp == NULL) continue; - mTrace("write %s to file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); + mTrace("write %s to sdb file, total %d rows", sdbTableName(i), sdbGetSize(pSdb, i)); SHashObj *hash = pSdb->hashObjs[i]; TdThreadRwlock *pLock = &pSdb->locks[i]; @@ -394,7 +398,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { code = taosFsyncFile(pFile); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to sync file:%s since %s", tmpfile, tstrerror(code)); + mError("failed to sync sdb file:%s since %s", tmpfile, tstrerror(code)); } } @@ -404,15 +408,17 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { code = taosRenameFile(tmpfile, curfile); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); - mError("failed to write file:%s since %s", curfile, tstrerror(code)); + mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); } } if (code != 0) { - mError("failed to write file:%s since %s", curfile, tstrerror(code)); + mError("failed to write sdb file:%s since %s", curfile, tstrerror(code)); } else { pSdb->lastCommitVer = pSdb->curVer; - mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm); + pSdb->lastCommitTerm = pSdb->curTerm; + mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer, + pSdb->lastCommitTerm, curfile); } terrno = code; @@ -427,7 +433,7 @@ int32_t sdbWriteFile(SSdb *pSdb) { taosThreadMutexLock(&pSdb->filelock); int32_t code = sdbWriteFileImp(pSdb); if (code != 0) { - mError("failed to write sdb since %s", terrstr()); + mError("failed to write sdb file since %s", terrstr()); } taosThreadMutexUnlock(&pSdb->filelock); return code; @@ -493,7 +499,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { if (taosCopyFile(datafile, pIter->name) < 0) { taosThreadMutexUnlock(&pSdb->filelock); terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to copy file %s to %s since %s", datafile, pIter->name, terrstr()); + mError("failed to copy sdb file %s to %s since %s", datafile, pIter->name, terrstr()); sdbCloseIter(pIter); return -1; } @@ -502,7 +508,7 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) { pIter->file = taosOpenFile(pIter->name, TD_FILE_READ); if (pIter->file == NULL) { terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to open file:%s since %s", pIter->name, terrstr()); + mError("failed to open sdb file:%s since %s", pIter->name, terrstr()); sdbCloseIter(pIter); return -1; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7201820854..655dcbc853 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -79,7 +79,11 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { if (taskHandle) { code = qExecTask(taskHandle, &pRes, &useconds); if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); + if (code != TSDB_CODE_OPS_NOT_SUPPORT) { + QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); + } else { + QW_TASK_DLOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); + } QW_ERR_RET(code); } } From 18ca6d93e7dc0af7dd530b8db8ce6f2e50eaadbb Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 31 May 2022 16:55:16 +0800 Subject: [PATCH 3/3] refactor: make trans support multi steps --- source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/dnode/mnode/impl/inc/mndTrans.h | 2 +- source/dnode/mnode/impl/src/mndMnode.c | 4 +- source/dnode/mnode/impl/src/mndSma.c | 2 +- source/dnode/mnode/impl/src/mndTrans.c | 114 ++++++++++++++----------- 5 files changed, 70 insertions(+), 54 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index fd0f54c66b..9a60ad860f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -129,7 +129,7 @@ typedef enum { typedef enum { TRN_EXEC_PARALLEL = 0, - TRN_EXEC_ONE_BY_ONE = 1, + TRN_EXEC_NO_PARALLEL = 1, } ETrnExecType; typedef enum { diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index a7e1f7cd02..ba6f5faf1e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -62,7 +62,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen); void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb); -void mndTransSetExecOneByOne(STrans *pTrans); +void mndTransSetNoParallel(STrans *pTrans); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransProcessRsp(SRpcMsg *pRsp); diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 8c5ea840af..5b8ba6deaa 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -367,7 +367,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); - mndTransSetExecOneByOne(pTrans); + mndTransSetNoParallel(pTrans); if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER; @@ -539,7 +539,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SRpcMsg *pReq, SMnodeObj *pObj) { if (pTrans == NULL) goto _OVER; mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); - mndTransSetExecOneByOne(pTrans); + mndTransSetNoParallel(pTrans); if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 7b5d1b6c32..0493b00d33 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -507,7 +507,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name); mndTransSetDbInfo(pTrans, pDb); - mndTransSetExecOneByOne(pTrans); + mndTransSetNoParallel(pTrans); if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c5a1e0ba5a..ad6388c585 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -120,8 +120,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER) SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) - SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions); @@ -261,8 +261,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans->type = type; pTrans->parallel = parallel; SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) - SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER) @@ -567,6 +567,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions); pOld->stage = pNew->stage; + pOld->redoActionPos = pNew->redoActionPos; if (pOld->stage == TRN_STAGE_COMMIT) { pOld->stage = TRN_STAGE_COMMIT_ACTION; @@ -694,11 +695,10 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void * } void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) { - pTrans->dbUid = pDb->uid; memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN); } -void mndTransSetExecOneByOne(STrans *pTrans) { pTrans->parallel = TRN_EXEC_ONE_BY_ONE; } +void mndTransSetNoParallel(STrans *pTrans) { pTrans->parallel = TRN_EXEC_NO_PARALLEL; } static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { SSdbRaw *pRaw = mndTransActionEncode(pTrans); @@ -708,7 +708,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("trans:%d, sync to other nodes", pTrans->id); + mDebug("trans:%d, sync to other mnodes", pTrans->id); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); @@ -761,7 +761,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); conflict = true; } else if (mndIsDbTrans(pTrans) || mndIsStbTrans(pTrans)) { - if (pNewTrans->dbUid == pTrans->dbUid) { + if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); conflict = true; } @@ -774,7 +774,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNewTrans) { mError("trans:%d, can't execute since trans:%d in progress", pNewTrans->id, pTrans->id); conflict = true; } else if (mndIsDbTrans(pTrans)) { - if (pNewTrans->dbUid == pTrans->dbUid) { + if (strcmp(pNewTrans->dbname, pTrans->dbname) == 0) { mError("trans:%d, can't execute since trans:%d in progress db:%s", pNewTrans->id, pTrans->id, pTrans->dbname); conflict = true; } @@ -856,7 +856,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { } if (pTrans->policy == TRN_POLICY_ROLLBACK) { - if (pTrans->stage == pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { + if (pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) { if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR; sendRsp = true; } @@ -876,12 +876,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage), pTrans->rpcInfo.ahandle); - SRpcMsg rspMsg = { - .code = code, - .pCont = rpcCont, - .contLen = pTrans->rpcRspLen, - .info = pTrans->rpcInfo, - }; + SRpcMsg rspMsg = {.code = code, .pCont = rpcCont, .contLen = pTrans->rpcRspLen, .info = pTrans->rpcInfo}; tmsgSendRsp(&rspMsg); pTrans->rpcInfo.handle = NULL; pTrans->rpcRsp = NULL; @@ -944,7 +939,6 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - if (pAction == NULL) continue; if (pAction->msgSent && pAction->msgReceived && (pAction->errCode == 0 || pAction->errCode == pAction->acceptableCode)) continue; @@ -1017,16 +1011,14 @@ static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAc } } -static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) { +static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { int32_t numOfActions = taosArrayGetSize(pArray); int32_t code = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); code = mndTransExecSingleAction(pMnode, pTrans, pAction); - if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { - break; - } + if (code != 0) break; } return code; @@ -1036,7 +1028,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA int32_t numOfActions = taosArrayGetSize(pArray); if (numOfActions == 0) return 0; - if (mndTransSendActionMsg(pMnode, pTrans, pArray) != 0) { + if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) { return -1; } @@ -1044,8 +1036,7 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA int32_t errCode = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); - if (pAction == NULL) continue; - if ((pAction->msgSent && pAction->msgReceived) || pAction->rawWritten) { + if (pAction->msgReceived || pAction->rawWritten) { numOfExecuted++; if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { errCode = pAction->errCode; @@ -1087,12 +1078,61 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); - if (code != 0) { + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to execute commitActions since %s", terrstr()); } return code; } +static int32_t mndTransExecuteRedoActionsNoParallel(SMnode *pMnode, STrans *pTrans) { + int32_t code = 0; + int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); + if (numOfActions == 0) return code; + if (pTrans->redoActionPos >= numOfActions) return code; + + for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); + + code = mndTransExecSingleAction(pMnode, pTrans, pAction); + if (code == 0) { + if (pAction->msgSent) { + if (pAction->msgReceived) { + if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { + code = pAction->errCode; + } + } else { + code = TSDB_CODE_ACTION_IN_PROGRESS; + } + } + if (pAction->rawWritten) { + if (pAction->errCode != 0 && pAction->errCode != pAction->acceptableCode) { + code = pAction->errCode; + } + } + } + + if (code == 0) { + pTrans->redoActionPos++; + mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), + pAction->id); + code = mndTransSync(pMnode, pTrans); + if (code != 0) { + mError("trans:%d, failed to sync redoActionPos since %s", pTrans->id, terrstr()); + break; + } + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); + break; + } else { + mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, + terrstr()); + break; + } + } + + return code; +} + static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; pTrans->stage = TRN_STAGE_REDO_ACTION; @@ -1100,36 +1140,12 @@ static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { return continueExec; } -static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) { - bool continueExec = true; - if (pTrans->redoActionPos >= taosArrayGetSize(pTrans->redoActions)) return continueExec; - - STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); - int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction); - if (code == 0) { - pTrans->redoActionPos++; - mDebug("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), - pAction->id); - - // todo sync these infos - } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { - mDebug("trans:%d, %s:%d is in progress and wait it finish", pTrans->id, mndTransStr(pAction->stage), pAction->id); - continueExec = false; - } else { - mError("trans:%d, %s:%d failed to execute since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, - terrstr()); - continueExec = false; - } - - return continueExec; -} - static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; int32_t code = 0; - if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) { - code = mndTransExecuteRedoActionsOneByOne(pMnode, pTrans); + if (pTrans->parallel == TRN_EXEC_NO_PARALLEL) { + code = mndTransExecuteRedoActionsNoParallel(pMnode, pTrans); } else { code = mndTransExecuteRedoActions(pMnode, pTrans); }