Merge pull request #10295 from taosdata/feature/privilege

add trans stage type
This commit is contained in:
Shengliang Guan 2022-02-17 13:45:23 +08:00 committed by GitHub
commit c52276dc80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 48 additions and 20 deletions

View File

@ -51,7 +51,7 @@ static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans);
static void mndTransExecute(SMnode *pMnode, STrans *pTrans); static void mndTransExecute(SMnode *pMnode, STrans *pTrans);
static void mndTransSendRpcRsp(STrans *pTrans); static void mndTransSendRpcRsp(STrans *pTrans);
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg); static int32_t mndProcessTransReq(SMnodeMsg *pMsg);
int32_t mndInitTrans(SMnode *pMnode) { int32_t mndInitTrans(SMnode *pMnode) {
SSdbTable table = {.sdbType = SDB_TRANS, SSdbTable table = {.sdbType = SDB_TRANS,
@ -62,7 +62,7 @@ int32_t mndInitTrans(SMnode *pMnode) {
.updateFp = (SdbUpdateFp)mndTransActionUpdate, .updateFp = (SdbUpdateFp)mndTransActionUpdate,
.deleteFp = (SdbDeleteFp)mndTransActionDelete}; .deleteFp = (SdbDeleteFp)mndTransActionDelete};
mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransMsg); mndSetMsgHandle(pMnode, TDMT_MND_TRANS, mndProcessTransReq);
return sdbSetTable(pMnode->pSdb, table); return sdbSetTable(pMnode->pSdb, table);
} }
@ -80,17 +80,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) {
for (int32_t i = 0; i < redoLogNum; ++i) { for (int32_t i = 0; i < redoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->redoLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
} }
for (int32_t i = 0; i < undoLogNum; ++i) { for (int32_t i = 0; i < undoLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->undoLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
} }
for (int32_t i = 0; i < commitLogNum; ++i) { for (int32_t i = 0; i < commitLogNum; ++i) {
SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i); SSdbRaw *pTmp = taosArrayGetP(pTrans->commitLogs, i);
rawDataLen += (sdbGetRawTotalSize(pTmp) + 4); rawDataLen += (sdbGetRawTotalSize(pTmp) + sizeof(int32_t));
} }
for (int32_t i = 0; i < redoActionNum; ++i) { for (int32_t i = 0; i < redoActionNum; ++i) {
@ -296,9 +296,34 @@ TRANS_DECODE_OVER:
return pRow; return pRow;
} }
static const char *mndTransStr(ETrnStage stage) {
switch (stage) {
case TRN_STAGE_PREPARE:
return "prepare";
case TRN_STAGE_REDO_LOG:
return "redoLog";
case TRN_STAGE_REDO_ACTION:
return "redoAction";
case TRN_STAGE_COMMIT:
return "commit";
case TRN_STAGE_COMMIT_LOG:
return "commitLog";
case TRN_STAGE_UNDO_ACTION:
return "undoAction";
case TRN_STAGE_UNDO_LOG:
return "undoLog";
case TRN_STAGE_ROLLBACK:
return "rollback";
case TRN_STAGE_FINISHED:
return "finished";
default:
return "invalid";
}
}
static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) {
pTrans->stage = TRN_STAGE_PREPARE; // pTrans->stage = TRN_STAGE_PREPARE;
mTrace("trans:%d, perform insert action, row:%p", pTrans->id, pTrans); mTrace("trans:%d, perform insert action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
return 0; return 0;
} }
@ -316,21 +341,24 @@ static void mndTransDropData(STrans *pTrans) {
} }
static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) {
mTrace("trans:%d, perform delete action, row:%p", pTrans->id, pTrans); mTrace("trans:%d, perform delete action, row:%p stage:%s", pTrans->id, pTrans, mndTransStr(pTrans->stage));
mndTransDropData(pTrans); mndTransDropData(pTrans);
return 0; return 0;
} }
static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
if (pNew->stage == TRN_STAGE_COMMIT) pNew->stage = TRN_STAGE_COMMIT_LOG; if (pNew->stage == TRN_STAGE_COMMIT) {
pNew->stage = TRN_STAGE_COMMIT_LOG;
mTrace("trans:%d, stage from %s to %s", pNew->id, mndTransStr(TRN_STAGE_COMMIT), mndTransStr(TRN_STAGE_COMMIT_LOG));
}
mTrace("trans:%d, perform update action, old row:%p stage:%d, new row:%p stage:%d", pOld->id, pOld, pOld->stage, pNew, mTrace("trans:%d, perform update action, old row:%p stage:%s, new row:%p stage:%s", pOld->id, pOld,
pNew->stage); mndTransStr(pOld->stage), pNew, mndTransStr(pNew->stage));
pOld->stage = pNew->stage; pOld->stage = pNew->stage;
return 0; return 0;
} }
STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { static STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId); STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &transId);
if (pTrans == NULL) { if (pTrans == NULL) {
@ -339,7 +367,7 @@ STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) {
return pTrans; return pTrans;
} }
void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { static void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pTrans); sdbRelease(pSdb, pTrans);
} }
@ -375,8 +403,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, const SRpcMsg *pReq) {
} }
static void mndTransDropLogs(SArray *pArray) { static void mndTransDropLogs(SArray *pArray) {
if (pArray == NULL) return; int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < pArray->size; ++i) { for (int32_t i = 0; i < size; ++i) {
SSdbRaw *pRaw = taosArrayGetP(pArray, i); SSdbRaw *pRaw = taosArrayGetP(pArray, i);
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
} }
@ -385,10 +413,10 @@ static void mndTransDropLogs(SArray *pArray) {
} }
static void mndTransDropActions(SArray *pArray) { static void mndTransDropActions(SArray *pArray) {
if (pArray == NULL) return; int32_t size = taosArrayGetSize(pArray);
for (int32_t i = 0; i < pArray->size; ++i) { for (int32_t i = 0; i < size; ++i) {
STransAction *pAction = taosArrayGet(pArray, i); STransAction *pAction = taosArrayGet(pArray, i);
free(pAction->pCont); tfree(pAction->pCont);
} }
taosArrayDestroy(pArray); taosArrayDestroy(pArray);
@ -941,8 +969,8 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) {
mndTransSendRpcRsp(pTrans); mndTransSendRpcRsp(pTrans);
} }
static int32_t mndProcessTransMsg(SMnodeMsg *pMsg) { static int32_t mndProcessTransReq(SMnodeMsg *pReq) {
mndTransPullup(pMsg->pMnode); mndTransPullup(pReq->pMnode);
return 0; return 0;
} }