minor changes

This commit is contained in:
Shengliang Guan 2022-02-18 11:47:38 +08:00
parent 0f04c426d1
commit 314116ae11
3 changed files with 30 additions and 20 deletions

View File

@ -128,6 +128,7 @@ typedef struct {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
ETrnType transType;
int32_t code;
int32_t failedTimes;
void* rpcHandle;
@ -141,7 +142,6 @@ typedef struct {
SArray* undoActions;
int64_t createdTime;
int64_t lastExecTime;
int32_t transType;
uint64_t dbUid;
char dbname[TSDB_DB_FNAME_LEN];
char lastError[TSDB_TRANS_ERROR_LEN];

View File

@ -36,7 +36,7 @@ typedef struct {
int32_t mndInitTrans(SMnode *pMnode);
void mndCleanupTrans(SMnode *pMnode);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq);
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq);
void mndTransDrop(STrans *pTrans);
int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw);
@ -44,6 +44,7 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw);
int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
void mndTransProcessRsp(SMnodeMsg *pRsp);

View File

@ -124,10 +124,10 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
SDB_SET_INT8(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER)
SDB_SET_INT16(pRaw, dataPos, pTrans->transType, TRANS_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->transType, TRANS_ENCODE_OVER)
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, TRANS_ENCODE_OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_ENCODE_OVER)
SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER)
@ -220,23 +220,19 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
pTrans = sdbGetRowObj(pRow);
if (pTrans == NULL) goto TRANS_DECODE_OVER;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));
pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction));
if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, TRANS_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->policy, TRANS_DECODE_OVER)
SDB_GET_INT8(pRaw, dataPos, (int8_t *)&pTrans->stage, TRANS_DECODE_OVER)
int16_t type = 0;
int16_t policy = 0;
int16_t stage = 0;
SDB_GET_INT16(pRaw, dataPos, &policy, TRANS_DECODE_OVER)
SDB_GET_INT16(pRaw, dataPos, &stage, TRANS_DECODE_OVER)
SDB_GET_INT16(pRaw, dataPos, &type, TRANS_DECODE_OVER)
pTrans->policy = policy;
pTrans->stage = stage;
pTrans->transType = type;
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->transType, TRANS_DECODE_OVER)
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, TRANS_DECODE_OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &redoLogNum, TRANS_DECODE_OVER)
@ -245,6 +241,18 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT32(pRaw, dataPos, &redoActionNum, TRANS_DECODE_OVER)
SDB_GET_INT32(pRaw, dataPos, &undoActionNum, TRANS_DECODE_OVER)
pTrans->redoLogs = taosArrayInit(redoLogNum, sizeof(void *));
pTrans->undoLogs = taosArrayInit(undoLogNum, sizeof(void *));
pTrans->commitLogs = taosArrayInit(commitLogNum, sizeof(void *));
pTrans->redoActions = taosArrayInit(redoActionNum, sizeof(STransAction));
pTrans->undoActions = taosArrayInit(undoActionNum, sizeof(STransAction));
if (pTrans->redoLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->undoLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->commitLogs == NULL) goto TRANS_DECODE_OVER;
if (pTrans->redoActions == NULL) goto TRANS_DECODE_OVER;
if (pTrans->undoActions == NULL) goto TRANS_DECODE_OVER;
for (int32_t i = 0; i < redoLogNum; ++i) {
SDB_GET_INT32(pRaw, dataPos, &dataLen, TRANS_DECODE_OVER)
pData = malloc(dataLen);
@ -402,7 +410,7 @@ static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
sdbRelease(pSdb, pTrans);
}
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const SRpcMsg *pReq) {
STrans *pTrans = calloc(1, sizeof(STrans));
if (pTrans == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -413,6 +421,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
pTrans->stage = TRN_STAGE_PREPARE;
pTrans->policy = policy;
pTrans->transType = type;
pTrans->rpcHandle = pReq->handle;
pTrans->rpcAHandle = pReq->ahandle;
pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *));