refactor: make trans support multi steps

This commit is contained in:
Shengliang Guan 2022-05-30 20:38:46 +08:00
parent 536ea7b9da
commit 53def5b77b
6 changed files with 296 additions and 349 deletions

View File

@ -254,6 +254,7 @@ typedef enum ELogicConditionType {
#define TSDB_TRANS_STAGE_LEN 12
#define TSDB_TRANS_TYPE_LEN 16
#define TSDB_TRANS_ERROR_LEN 64
#define TSDB_TRANS_DESC_LEN 128
#define TSDB_STEP_NAME_LEN 32
#define TSDB_STEP_DESC_LEN 128

View File

@ -60,14 +60,12 @@ typedef enum {
typedef enum {
TRN_STAGE_PREPARE = 0,
TRN_STAGE_REDO_LOG = 1,
TRN_STAGE_REDO_ACTION = 2,
TRN_STAGE_ROLLBACK = 3,
TRN_STAGE_UNDO_ACTION = 4,
TRN_STAGE_UNDO_LOG = 5,
TRN_STAGE_COMMIT = 6,
TRN_STAGE_COMMIT_LOG = 7,
TRN_STAGE_FINISHED = 8
TRN_STAGE_REDO_ACTION = 1,
TRN_STAGE_ROLLBACK = 2,
TRN_STAGE_UNDO_ACTION = 3,
TRN_STAGE_COMMIT = 4,
TRN_STAGE_COMMIT_ACTION = 5,
TRN_STAGE_FINISHED = 6
} ETrnStage;
typedef enum {
@ -168,16 +166,16 @@ typedef struct {
SRpcHandleInfo rpcInfo;
void* rpcRsp;
int32_t rpcRspLen;
SArray* redoLogs;
SArray* undoLogs;
SArray* commitLogs;
int32_t redoActionPos;
SArray* redoActions;
SArray* undoActions;
SArray* commitActions;
int64_t createdTime;
int64_t lastExecTime;
int64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];
char desc[TSDB_TRANS_DESC_LEN];
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;

View File

@ -26,31 +26,23 @@ typedef enum {
TRANS_START_FUNC_TEST = 1,
TRANS_STOP_FUNC_TEST = 2,
TRANS_START_FUNC_MQ_REB = 3,
TRANS_STOP_FUNC_TEST_MQ_REB = 4,
TRANS_STOP_FUNC_MQ_REB = 4,
} ETrnFunc;
typedef struct {
SEpSet epSet;
tmsg_t msgType;
int8_t msgSent;
int8_t msgReceived;
int32_t errCode;
int32_t acceptableCode;
int32_t contLen;
void *pCont;
} STransAction;
typedef struct {
int32_t id;
tmsg_t msgType;
int8_t msgSent;
int8_t msgReceived;
int8_t isRaw;
int8_t rawWritten;
SSdbRaw *pRaw;
} STransLog;
typedef struct {
ETrnStep stepType;
STransAction redoAction;
STransAction undoAction;
STransLog redoLog;
STransLog undoLog;
} STransStep;
SEpSet epSet;
int32_t errCode;
int32_t acceptableCode;
int32_t contLen;
void *pCont;
} STransAction;
typedef void (*TransCbFp)(SMnode *pMnode, void *param, int32_t paramLen);

View File

@ -493,7 +493,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
// 4. TODO commit log: modification log
// 5. set cb
mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_TEST_MQ_REB, NULL, 0);
mndTransSetCb(pTrans, TRANS_START_FUNC_MQ_REB, TRANS_STOP_FUNC_MQ_REB, NULL, 0);
// 6. execution
if (mndTransPrepare(pMnode, pTrans) != 0) goto REB_FAIL;

View File

@ -43,13 +43,13 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans);
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans);
static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
@ -83,40 +83,30 @@ int32_t mndInitTrans(SMnode *pMnode) {
void mndCleanupTrans(SMnode *pMnode) {}
static int32_t mndTransGetActionsSize(SArray *pArray) {
int32_t actionNum = taosArrayGetSize(pArray);
int32_t rawDataLen = 0;
for (int32_t i = 0; i < actionNum; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
if (pAction->isRaw) {
rawDataLen += (sdbGetRawTotalSize(pAction->pRaw) + sizeof(int32_t));
} else {
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
rawDataLen += sizeof(pAction->isRaw);
}
return rawDataLen;
}
static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
int32_t rawDataLen = sizeof(STrans) + TRANS_RESERVE_SIZE;
int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs);
int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs);
int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs);
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
}
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
rawDataLen += (sizeof(STransAction) + pAction->contLen);
}
rawDataLen += mndTransGetActionsSize(pTrans->redoActions);
rawDataLen += mndTransGetActionsSize(pTrans->undoActions);
rawDataLen += mndTransGetActionsSize(pTrans->commitActions);
SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TRANS_VER_NUMBER, rawDataLen);
if (pRaw == NULL) {
@ -126,67 +116,67 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
ETrnStage stage = pTrans->stage;
if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) {
stage = TRN_STAGE_PREPARE;
} else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) {
stage = TRN_STAGE_ROLLBACK;
} else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) {
stage = TRN_STAGE_COMMIT;
} else {
}
SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->parallel, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER)
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
int32_t undoActionNum = taosArrayGetSize(pTrans->undoActions);
int32_t commitActionNum = taosArrayGetSize(pTrans->commitActions);
SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
int32_t len = sdbGetRawTotalSize(pTmp);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
}
SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER)
for (int32_t i = 0; i < redoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER)
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
STransAction *pAction = taosArrayGet(pTrans->commitActions, i);
SDB_SET_INT8(pRaw, dataPos, pAction->isRaw, _OVER)
if (pAction->isRaw) {
int32_t len = sdbGetRawTotalSize(pAction->pRaw);
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER)
} else {
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
}
}
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
@ -220,11 +210,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
char *pData = NULL;
int32_t dataLen = 0;
int8_t sver = 0;
int32_t redoLogNum = 0;
int32_t undoLogNum = 0;
int32_t commitLogNum = 0;
int32_t redoActionNum = 0;
int32_t undoActionNum = 0;
int32_t commitActionNum = 0;
int32_t dataPos = 0;
STransAction action = {0};
@ -258,76 +246,85 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitLogNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, _OVER)
SDB_GET_INT32(pRaw, dataPos, &commitActionNum, _OVER)
pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *));
pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *));
pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *));
pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(commitActionNum, sizeof(STransAction));
if (pTrans->redoLogs == NULL) goto _OVER;
if (pTrans->undoLogs == NULL) goto _OVER;
if (pTrans->commitLogs == NULL) goto _OVER;
if (pTrans->redoActions == NULL) goto _OVER;
if (pTrans->undoActions == NULL) goto _OVER;
for (int32_t i = 0; i < redoLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->redoLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
for (int32_t i = 0; i < undoLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->undoLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
for (int32_t i = 0; i < commitLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->commitLogs, &pData) == NULL) goto _OVER;
pData = NULL;
}
if (pTrans->commitActions == NULL) goto _OVER;
for (int32_t i = 0; i < redoActionNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
if (action.isRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &pData) == NULL) goto _OVER;
pData = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
}
for (int32_t i = 0; i < undoActionNum; ++i) {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
if (action.isRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &pData) == NULL) goto _OVER;
pData = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
}
for (int32_t i = 0; i < commitActionNum; ++i) {
SDB_GET_INT8(pRaw, dataPos, &action.isRaw, _OVER)
if (action.isRaw) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
pData = taosMemoryMalloc(dataLen);
if (pData == NULL) goto _OVER;
mTrace("raw:%p, is created", pData);
SDB_GET_BINARY(pRaw, dataPos, pData, dataLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &pData) == NULL) goto _OVER;
pData = NULL;
} else {
SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER);
SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.acceptableCode, _OVER)
SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER)
action.pCont = taosMemoryMalloc(action.contLen);
if (action.pCont == NULL) goto _OVER;
SDB_GET_BINARY(pRaw, dataPos, action.pCont, action.contLen, _OVER);
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
action.pCont = NULL;
}
}
SDB_GET_INT32(pRaw, dataPos, &pTrans->startFunc, _OVER)
@ -360,20 +357,16 @@ static const char *mndTransStr(ETrnStage stage) {
switch (stage) {
case TRN_STAGE_PREPARE:
return "prepare";
case TRN_STAGE_REDO_LOG:
return "redoLog";
case TRN_STAGE_REDO_ACTION:
return "redoAction";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_COMMIT_LOG:
return "commitLog";
case TRN_STAGE_UNDO_ACTION:
return "undoAction";
case TRN_STAGE_UNDO_LOG:
return "undoLog";
case TRN_STAGE_ROLLBACK:
return "rollback";
case TRN_STAGE_UNDO_ACTION:
return "undoAction";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_COMMIT_ACTION:
return "commitAction";
case TRN_STAGE_FINISHED:
return "finished";
default:
@ -472,7 +465,7 @@ static TransCbFp mndTransGetCbFp(ETrnFunc ftype) {
return mndTransTestStopFunc;
case TRANS_START_FUNC_MQ_REB:
return mndRebCntInc;
case TRANS_STOP_FUNC_TEST_MQ_REB:
case TRANS_STOP_FUNC_MQ_REB:
return mndRebCntDec;
default:
return NULL;
@ -493,11 +486,9 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
}
static void mndTransDropData(STrans *pTrans) {
mndTransDropLogs(pTrans->redoLogs);
mndTransDropLogs(pTrans->undoLogs);
mndTransDropLogs(pTrans->commitLogs);
mndTransDropActions(pTrans->redoActions);
mndTransDropActions(pTrans->undoActions);
mndTransDropActions(pTrans->commitActions);
if (pTrans->rpcRsp != NULL) {
taosMemoryFree(pTrans->rpcRsp);
pTrans->rpcRsp = NULL;
@ -526,8 +517,8 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) {
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
if (pNew->stage == TRN_STAGE_COMMIT) {
pNew->stage = TRN_STAGE_COMMIT_LOG;
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
pNew->stage = TRN_STAGE_COMMIT_ACTION;
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_ACTION));
}
if (pNew->stage == TRN_STAGE_ROLLBACK) {
@ -568,14 +559,11 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
pTrans->type = type;
pTrans->createdTime = taosGetTimestampMs();
if (pReq != NULL) pTrans->rpcInfo = pReq->info;
pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->commitActions = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(STransAction));
if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL ||
pTrans->redoActions == NULL || pTrans->undoActions == NULL) {
if (pTrans->redoActions == NULL || pTrans->undoActions == NULL || pTrans->commitActions == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to create transaction since %s", terrstr());
return NULL;
@ -585,21 +573,15 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
return pTrans;
}
static void mndTransDropLogs(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i);
sdbFreeRaw(pRaw);
}
taosArrayDestroy(pArray);
}
static void mndTransDropActions(SArray *pArray) {
int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i);
taosMemoryFreeClear(pAction->pCont);
if (pAction->isRaw) {
sdbFreeRaw(pAction->pRaw);
} else {
taosMemoryFreeClear(pAction->pCont);
}
}
taosArrayDestroy(pArray);
@ -613,27 +595,6 @@ void mndTransDrop(STrans *pTrans) {
}
}
static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) {
if (pArray == NULL || pRaw == NULL) {
terrno = TSDB_CODE_INVALID_PARA;
return -1;
}
void *ptr = taosArrayPush(pArray, &pRaw);
if (ptr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
return 0;
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->redoLogs, pRaw); }
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->undoLogs, pRaw); }
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendLog(pTrans->commitLogs, pRaw); }
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
void *ptr = taosArrayPush(pArray, pAction);
if (ptr == NULL) {
@ -644,6 +605,21 @@ static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction) {
return 0;
}
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.isRaw = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->redoActions, &action);
}
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.isRaw = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->undoActions, &action);
}
int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) {
STransAction action = {.isRaw = true, .pRaw = pRaw};
return mndTransAppendAction(pTrans->commitActions, &action);
}
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) {
return mndTransAppendAction(pTrans->redoActions, pAction);
}
@ -768,7 +744,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
if (taosArrayGetSize(pTrans->commitLogs) <= 0) {
if (taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;
@ -799,8 +775,6 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
}
static int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) {
if (taosArrayGetSize(pTrans->commitLogs) == 0 && taosArrayGetSize(pTrans->redoActions) == 0) return 0;
mDebug("trans:%d, commit transaction", pTrans->id);
if (mndTransSync(pMnode, pTrans) != 0) {
mError("trans:%d, failed to commit since %s", pTrans->id, terrstr());
@ -829,8 +803,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) {
}
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->stage == TRN_STAGE_UNDO_LOG || pTrans->stage == TRN_STAGE_UNDO_ACTION ||
pTrans->stage == TRN_STAGE_ROLLBACK) {
if (pTrans->stage == pTrans->stage == TRN_STAGE_UNDO_ACTION || pTrans->stage == TRN_STAGE_ROLLBACK) {
if (code == 0) code = TSDB_CODE_MND_TRANS_UNKNOW_ERROR;
sendRsp = true;
}
@ -930,30 +903,6 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) {
return code;
}
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->redoLogs);
if (code != 0) {
mError("failed to execute redoLogs since %s", terrstr());
}
return code;
}
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->undoLogs);
if (code != 0) {
mError("failed to execute undoLogs since %s, return success", terrstr());
}
return 0; // return success in any case
}
static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitLogs);
if (code != 0) {
mError("failed to execute commitLogs since %s", terrstr());
}
return code;
}
static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
@ -962,6 +911,7 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent && pAction->msgReceived && pAction->errCode == 0) continue;
if (pAction->rawWritten && pAction->errCode == 0) continue;
pAction->msgSent = 0;
pAction->msgReceived = 0;
@ -970,56 +920,71 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
}
}
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
if (code == 0) {
mDebug("trans:%d, action:%d write to sdb", pTrans->id, pAction->id);
} else {
mError("trans:%d, action:%d failed to write sdb since %s", pTrans->id, pAction->id, terrstr());
}
return code;
}
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->msgSent) return 0;
if (!pMnode->deploy && !mndIsMaster(pMnode)) return -1;
int64_t signature = pTrans->id;
signature = (signature << 32);
signature += pAction->id;
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
int32_t code = tmsgSendReq(&pAction->epSet, &rpcMsg);
if (code == 0) {
pAction->msgSent = 1;
pAction->msgReceived = 0;
pAction->errCode = 0;
mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, pAction->id,
pAction->epSet.eps[pAction->epSet.inUse].fqdn, pAction->epSet.eps[pAction->epSet.inUse].port);
} else {
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = (terrno != 0) ? terrno : code;
mError("trans:%d, action:%d not send since %s", pTrans->id, pAction->id, terrstr());
}
return code;
}
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) {
if (pAction->isRaw) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction);
} else {
return mndTransSendSingleMsg(pMnode, pTrans, pAction);
}
}
static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
int32_t numOfActions = taosArrayGetSize(pArray);
int32_t code = 0;
for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action);
if (pAction == NULL) continue;
if (pAction->msgSent) {
if (pAction->msgReceived) {
continue;
} else {
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
} else {
continue;
}
}
}
int64_t signature = pTrans->id;
signature = (signature << 32);
signature += action;
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
rpcMsg.pCont = rpcMallocCont(pAction->contLen);
if (rpcMsg.pCont == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) {
mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, action, pAction->epSet.eps[pAction->epSet.inUse].fqdn,
pAction->epSet.eps[pAction->epSet.inUse].port);
pAction->msgSent = 1;
pAction->msgReceived = 0;
pAction->errCode = 0;
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
break;
}
} else {
pAction->msgSent = 0;
pAction->msgReceived = 0;
pAction->errCode = terrno;
mError("trans:%d, action:%d not send since %s", pTrans->id, action, terrstr());
return -1;
code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
break;
}
}
return 0;
return code;
}
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) {
@ -1075,35 +1040,52 @@ static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) {
return code;
}
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) {
int32_t code = mndTransExecuteLogs(pMnode, pTrans->commitActions);
if (code != 0) {
mError("failed to execute commitActions since %s", terrstr());
}
return code;
}
static bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
pTrans->stage = TRN_STAGE_REDO_LOG;
mDebug("trans:%d, stage from prepare to redoLog", pTrans->id);
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from prepare to redoAction", pTrans->id);
return continueExec;
}
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans);
static bool mndTransExecuteRedoActionsOneByOne(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
if (pTrans->redoActionPos >= taosArrayGetSize(pTrans->redoActions)) return continueExec;
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
int32_t code = mndTransExecSingleAction(pMnode, pTrans, pAction);
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_REDO_ACTION;
mDebug("trans:%d, stage from redoLog to redoAction", pTrans->id);
pTrans->redoActionPos++;
mDebug("trans:%d, redo action:%d is executed and need sync to other mnodes", pTrans->id, pAction->id);
// todo sync these infos
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, redo action:%d is in progress and wait it finish", pTrans->id, pAction->id);
continueExec = false;
} else {
pTrans->code = terrno;
pTrans->stage = TRN_STAGE_UNDO_LOG;
mError("trans:%d, stage from redoLog to undoLog since %s", pTrans->id, terrstr());
mError("trans:%d, redo action:%d failed to execute since %s", pTrans->id, pAction->id, terrstr());
continueExec = false;
}
return continueExec;
}
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
bool continueExec = true;
int32_t code = mndTransExecuteRedoActions(pMnode, pTrans);
int32_t code = 0;
if (pTrans->parallel == TRN_EXEC_ONE_BY_ONE) {
code = mndTransExecuteRedoActionsOneByOne(pMnode, pTrans);
} else {
code = mndTransExecuteRedoActions(pMnode, pTrans);
}
if (code == 0) {
pTrans->code = 0;
@ -1135,8 +1117,8 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_COMMIT_LOG;
mDebug("trans:%d, stage from commit to commitLog", pTrans->id);
pTrans->stage = TRN_STAGE_COMMIT_ACTION;
mDebug("trans:%d, stage from commit to commitAction", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
@ -1155,35 +1137,19 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
return continueExec;
}
static bool mndTransPerformCommitLogStage(SMnode *pMnode, STrans *pTrans) {
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans);
int32_t code = mndTransExecuteCommitActions(pMnode, pTrans);
if (code == 0) {
pTrans->code = 0;
pTrans->stage = TRN_STAGE_FINISHED;
mDebug("trans:%d, stage from commitLog to finished", pTrans->id);
mDebug("trans:%d, stage from commitAction to finished", pTrans->id);
continueExec = true;
} else {
pTrans->code = terrno;
pTrans->failedTimes++;
mError("trans:%d, stage keep on commitLog since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
return continueExec;
}
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
bool continueExec = true;
int32_t code = mndTransExecuteUndoLogs(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoLog to rollback", pTrans->id);
continueExec = true;
} else {
mError("trans:%d, stage keep on undoLog since %s", pTrans->id, terrstr());
mError("trans:%d, stage keep on commitAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
@ -1191,14 +1157,12 @@ static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans) {
}
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
if (!pMnode->deploy && !mndIsMaster(pMnode)) return false;
bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans);
if (code == 0) {
pTrans->stage = TRN_STAGE_UNDO_LOG;
mDebug("trans:%d, stage from undoAction to undoLog", pTrans->id);
pTrans->stage = TRN_STAGE_ROLLBACK;
mDebug("trans:%d, stage from undoAction to rollback", pTrans->id);
continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) {
mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
@ -1257,24 +1221,18 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
case TRN_STAGE_PREPARE:
continueExec = mndTransPerformPrepareStage(pMnode, pTrans);
break;
case TRN_STAGE_REDO_LOG:
continueExec = mndTransPerformRedoLogStage(pMnode, pTrans);
break;
case TRN_STAGE_REDO_ACTION:
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_LOG:
continueExec = mndTransPerformUndoLogStage(pMnode, pTrans);
case TRN_STAGE_COMMIT:
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT_ACTION:
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans);
break;
case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT_LOG:
continueExec = mndTransPerformCommitLogStage(pMnode, pTrans);
break;
case TRN_STAGE_COMMIT:
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
break;
case TRN_STAGE_ROLLBACK:
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
break;

View File

@ -283,9 +283,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
tjsonAddStringToObject(item, "createdTime", i642str(pObj->createdTime));
tjsonAddStringToObject(item, "dbUid", i642str(pObj->dbUid));
tjsonAddStringToObject(item, "dbname", pObj->dbname);
tjsonAddIntegerToObject(item, "redoLogNum", taosArrayGetSize(pObj->redoLogs));
tjsonAddIntegerToObject(item, "undoLogNum", taosArrayGetSize(pObj->undoLogs));
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitLogs));
tjsonAddIntegerToObject(item, "commitLogNum", taosArrayGetSize(pObj->commitActions));
tjsonAddIntegerToObject(item, "redoActionNum", taosArrayGetSize(pObj->redoActions));
tjsonAddIntegerToObject(item, "undoActionNum", taosArrayGetSize(pObj->undoActions));