fix: only write three sages prepare, rollback, and commit of trans to sdb
This commit is contained in:
parent
8fda7b64dc
commit
1dd08f30e6
|
@ -57,11 +57,11 @@ typedef enum {
|
|||
TRN_STAGE_PREPARE = 0,
|
||||
TRN_STAGE_REDO_LOG = 1,
|
||||
TRN_STAGE_REDO_ACTION = 2,
|
||||
TRN_STAGE_COMMIT = 3,
|
||||
TRN_STAGE_COMMIT_LOG = 4,
|
||||
TRN_STAGE_UNDO_ACTION = 5,
|
||||
TRN_STAGE_UNDO_LOG = 6,
|
||||
TRN_STAGE_ROLLBACK = 7,
|
||||
TRN_STAGE_ROLLBACK = 3,
|
||||
TRN_STAGE_UNDO_ACTION = 4,
|
||||
TRN_STAGE_UNDO_LOG = 5,
|
||||
TRN_STAGE_COMMIT = 6,
|
||||
TRN_STAGE_COMMIT_LOG = 7,
|
||||
TRN_STAGE_FINISHED = 8
|
||||
} ETrnStage;
|
||||
|
||||
|
|
|
@ -123,71 +123,82 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
|
|||
}
|
||||
|
||||
int32_t dataPos = 0;
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->id, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->stage, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->type, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, redoLogNum, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, undoLogNum, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, commitLogNum, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, redoActionNum, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, undoActionNum, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->id, _OVER)
|
||||
|
||||
ETrnStage stage = pTrans->stage;
|
||||
if (stage == TRN_STAGE_REDO_LOG || stage == TRN_STAGE_REDO_ACTION) {
|
||||
stage = TRN_STAGE_PREPARE;
|
||||
} else if (stage == TRN_STAGE_UNDO_ACTION || stage == TRN_STAGE_UNDO_LOG) {
|
||||
stage = TRN_STAGE_ROLLBACK;
|
||||
} else if (stage == TRN_STAGE_COMMIT_LOG || stage == TRN_STAGE_FINISHED) {
|
||||
stage = TRN_STAGE_COMMIT;
|
||||
} else {
|
||||
}
|
||||
|
||||
SDB_SET_INT16(pRaw, dataPos, stage, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->policy, _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pTrans->type, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
|
||||
SDB_SET_INT64(pRaw, dataPos, pTrans->dbUid, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_DB_FNAME_LEN, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, redoLogNum, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, undoLogNum, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, commitLogNum, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, redoActionNum, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER)
|
||||
|
||||
for (int32_t i = 0; i < redoLogNum; ++i) {
|
||||
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
|
||||
int32_t len = sdbGetRawTotalSize(pTmp);
|
||||
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < undoLogNum; ++i) {
|
||||
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
|
||||
int32_t len = sdbGetRawTotalSize(pTmp);
|
||||
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < commitLogNum; ++i) {
|
||||
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
|
||||
int32_t len = sdbGetRawTotalSize(pTmp);
|
||||
SDB_SET_INT32(pRaw, dataPos, len, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, len, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len, _OVER)
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < redoActionNum; ++i) {
|
||||
STransAction *pAction = taosArrayGet(pTrans->redoActions, i);
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER)
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < undoActionNum; ++i) {
|
||||
STransAction *pAction = taosArrayGet(pTrans->undoActions, i);
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER)
|
||||
SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->acceptableCode, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen, _OVER)
|
||||
}
|
||||
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->startFunc, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->stopFunc, _OVER)
|
||||
SDB_SET_INT32(pRaw, dataPos, pTrans->paramLen, _OVER)
|
||||
if (pTrans->param != NULL) {
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, TRANS_ENCODE_OVER)
|
||||
SDB_SET_BINARY(pRaw, dataPos, pTrans->param, pTrans->paramLen, _OVER)
|
||||
}
|
||||
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, TRANS_ENCODE_OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, TRANS_ENCODE_OVER)
|
||||
SDB_SET_RESERVE(pRaw, dataPos, TRANS_RESERVE_SIZE, _OVER)
|
||||
SDB_SET_DATALEN(pRaw, dataPos, _OVER)
|
||||
|
||||
terrno = 0;
|
||||
|
||||
TRANS_ENCODE_OVER:
|
||||
_OVER:
|
||||
if (terrno != 0) {
|
||||
mError("trans:%d, failed to encode to raw:%p len:%d since %s", pTrans->id, pRaw, dataPos, terrstr());
|
||||
sdbFreeRaw(pRaw);
|
||||
|
@ -229,14 +240,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
|||
|
||||
SDB_GET_INT32(pRaw, dataPos, &pTrans->id, _OVER)
|
||||
|
||||
int16_t type = 0;
|
||||
int16_t policy = 0;
|
||||
int16_t stage = 0;
|
||||
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
|
||||
int16_t policy = 0;
|
||||
int16_t type = 0;
|
||||
SDB_GET_INT16(pRaw, dataPos, &stage, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &policy, _OVER)
|
||||
SDB_GET_INT16(pRaw, dataPos, &type, _OVER)
|
||||
pTrans->policy = policy;
|
||||
pTrans->stage = stage;
|
||||
pTrans->policy = policy;
|
||||
pTrans->type = type;
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
|
||||
SDB_GET_INT64(pRaw, dataPos, &pTrans->dbUid, _OVER)
|
||||
|
@ -453,7 +464,6 @@ static TransCbFp mndTransGetCbFp(ETrnFuncType ftype) {
|
|||
}
|
||||
|
||||
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
|
||||
// pTrans->stage = TRN_STAGE_PREPARE;
|
||||
mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
|
||||
|
||||
if (pTrans->startFunc > 0) {
|
||||
|
@ -516,8 +526,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
|
|||
}
|
||||
|
||||
static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
|
||||
STrans *pTrans = sdbAcquire(pMnode->pSdb, SDB_TRANS, &transId);
|
||||
if (pTrans == NULL) {
|
||||
terrno = TSDB_CODE_MND_TRANS_NOT_EXIST;
|
||||
}
|
||||
|
@ -558,7 +567,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
|
|||
return NULL;
|
||||
}
|
||||
|
||||
mDebug("trans:%d, local var is created, data:%p", pTrans->id, pTrans);
|
||||
mDebug("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
|
||||
return pTrans;
|
||||
}
|
||||
|
||||
|
@ -585,7 +594,7 @@ static void mndTransDropActions(SArray *pArray) {
|
|||
void mndTransDrop(STrans *pTrans) {
|
||||
if (pTrans != NULL) {
|
||||
mndTransDropData(pTrans);
|
||||
mDebug("trans:%d, local var is freed, data:%p", pTrans->id, pTrans);
|
||||
mDebug("trans:%d, local object is freed, data:%p", pTrans->id, pTrans);
|
||||
taosMemoryFreeClear(pTrans);
|
||||
}
|
||||
}
|
||||
|
@ -674,19 +683,19 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
|
|||
}
|
||||
|
||||
static bool mndIsBasicTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_BASIC_SCOPE && pTrans->stage < TRN_TYPE_BASIC_SCOPE_END;
|
||||
return pTrans->type > TRN_TYPE_BASIC_SCOPE && pTrans->type < TRN_TYPE_BASIC_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsGlobalTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_GLOBAL_SCOPE && pTrans->stage < TRN_TYPE_GLOBAL_SCOPE_END;
|
||||
return pTrans->type > TRN_TYPE_GLOBAL_SCOPE && pTrans->type < TRN_TYPE_GLOBAL_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsDbTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_DB_SCOPE && pTrans->stage < TRN_TYPE_DB_SCOPE_END;
|
||||
return pTrans->type > TRN_TYPE_DB_SCOPE && pTrans->type < TRN_TYPE_DB_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndIsStbTrans(STrans *pTrans) {
|
||||
return pTrans->stage > TRN_TYPE_STB_SCOPE && pTrans->stage < TRN_TYPE_STB_SCOPE_END;
|
||||
return pTrans->type > TRN_TYPE_STB_SCOPE && pTrans->type < TRN_TYPE_STB_SCOPE_END;
|
||||
}
|
||||
|
||||
static bool mndCheckTransCanParallel(SMnode *pMnode, STrans *pNewTrans) {
|
||||
|
|
Loading…
Reference in New Issue