From 4b92876f8c4ff9b4a5747ab7dadc5f521b3bfa44 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 8 Oct 2022 18:08:26 +0800 Subject: [PATCH] fix: continue execute transactions after taosd restart --- source/dnode/mnode/impl/src/mndTrans.c | 61 +++++++++++++------------- 1 file changed, 31 insertions(+), 30 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 455b71ace9..995fe5171a 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -17,9 +17,9 @@ #include "mndTrans.h" #include "mndConsumer.h" #include "mndDb.h" -#include "mndStb.h" #include "mndPrivilege.h" #include "mndShow.h" +#include "mndStb.h" #include "mndSync.h" #include "mndUser.h" @@ -138,6 +138,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT32(pRaw, dataPos, undoActionNum, _OVER) SDB_SET_INT32(pRaw, dataPos, commitActionNum, _OVER) + int8_t unused = 0; for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); SDB_SET_INT32(pRaw, dataPos, pAction->id, _OVER) @@ -149,14 +150,14 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER) if (pAction->actionType == TRANS_ACTION_RAW) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); - SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else if (pAction->actionType == TRANS_ACTION_MSG) { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } else { @@ -175,14 +176,14 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER) if (pAction->actionType == TRANS_ACTION_RAW) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); - SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else if (pAction->actionType == TRANS_ACTION_MSG) { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } else { @@ -201,14 +202,14 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_INT8(pRaw, dataPos, pAction->reserved, _OVER) if (pAction->actionType == TRANS_ACTION_RAW) { int32_t len = sdbGetRawTotalSize(pAction->pRaw); - SDB_SET_INT8(pRaw, dataPos, pAction->rawWritten, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->rawWritten*/, _OVER) SDB_SET_INT32(pRaw, dataPos, len, _OVER) SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pRaw, len, _OVER) } else if (pAction->actionType == TRANS_ACTION_MSG) { SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet), _OVER) SDB_SET_INT16(pRaw, dataPos, pAction->msgType, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->msgSent, _OVER) - SDB_SET_INT8(pRaw, dataPos, pAction->msgReceived, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgSent*/, _OVER) + SDB_SET_INT8(pRaw, dataPos, unused /*pAction->msgReceived*/, _OVER) SDB_SET_INT32(pRaw, dataPos, pAction->contLen, _OVER) SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen, _OVER) } else { @@ -305,6 +306,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { if (pTrans->undoActions == NULL) goto _OVER; if (pTrans->commitActions == NULL) goto _OVER; + int8_t unused = 0; for (int32_t i = 0; i < redoActionNum; ++i) { memset(&action, 0, sizeof(action)); SDB_GET_INT32(pRaw, dataPos, &action.id, _OVER) @@ -317,7 +319,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { action.stage = stage; SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER) if (action.actionType == TRANS_ACTION_RAW) { - SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); if (action.pRaw == NULL) goto _OVER; @@ -328,8 +330,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -353,7 +355,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { action.stage = stage; SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER) if (action.actionType == TRANS_ACTION_RAW) { - SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); if (action.pRaw == NULL) goto _OVER; @@ -364,8 +366,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -389,7 +391,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { action.stage = stage; SDB_GET_INT8(pRaw, dataPos, &action.reserved, _OVER) if (action.actionType) { - SDB_GET_INT8(pRaw, dataPos, &action.rawWritten, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.rawWritten*/, _OVER) SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER) action.pRaw = taosMemoryMalloc(dataLen); if (action.pRaw == NULL) goto _OVER; @@ -400,8 +402,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } else if (action.actionType == TRANS_ACTION_MSG) { SDB_GET_BINARY(pRaw, dataPos, (void *)&action.epSet, sizeof(SEpSet), _OVER); SDB_GET_INT16(pRaw, dataPos, &action.msgType, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.msgSent, _OVER) - SDB_GET_INT8(pRaw, dataPos, &action.msgReceived, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgSent*/, _OVER) + SDB_GET_INT8(pRaw, dataPos, &unused /*&action.msgReceived*/, _OVER) SDB_GET_INT32(pRaw, dataPos, &action.contLen, _OVER) action.pCont = taosMemoryMalloc(action.contLen); if (action.pCont == NULL) goto _OVER; @@ -816,7 +818,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; } if (pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { - if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb + if (mndCheckDbConflict(pNew->stbname, pTrans)) conflict = true; // for stb } } @@ -825,9 +827,8 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { pNew->id, pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, pTrans->conflict); } else { - mInfo("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id, - pNew->dbname, pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, - pTrans->conflict); + mInfo("trans:%d, db:%s stb:%s type:%d, not conflict with trans:%d db:%s stb:%s type:%d", pNew->id, pNew->dbname, + pNew->stbname, pNew->conflict, pTrans->id, pTrans->dbname, pTrans->stbname, pTrans->conflict); } sdbRelease(pMnode->pSdb, pTrans); } @@ -930,7 +931,7 @@ static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans) { SRpcHandleInfo *pInfo = taosArrayGet(pTrans->pRpcArray, i); if (pInfo->handle != NULL) { mInfo("trans:%d, send rsp, code:0x%x stage:%s app:%p", pTrans->id, code, mndTransStr(pTrans->stage), - pInfo->ahandle); + pInfo->ahandle); if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { code = TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL; } @@ -1013,8 +1014,8 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) { pAction->errCode = pRsp->code; } - mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, - mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode); + mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage), + action, pRsp->code, pAction->acceptableCode, pAction->retryCode); mndTransExecute(pMnode, pTrans); _OVER: @@ -1030,7 +1031,7 @@ static void mndTransResetAction(SMnode *pMnode, STrans *pTrans, STransAction *pA pAction->errCode == TSDB_CODE_SYN_INTERNAL_ERROR || pAction->errCode == TSDB_CODE_SYN_NOT_LEADER) { pAction->epSet.inUse = (pAction->epSet.inUse + 1) % pAction->epSet.numOfEps; mInfo("trans:%d, %s:%d execute status is reset and set epset inuse:%d", pTrans->id, mndTransStr(pAction->stage), - pAction->id, pAction->epSet.inUse); + pAction->id, pAction->epSet.inUse); } else { mInfo("trans:%d, %s:%d execute status is reset", pTrans->id, mndTransStr(pAction->stage), pAction->id); } @@ -1060,7 +1061,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi pAction->errCode = 0; code = 0; mInfo("trans:%d, %s:%d write to sdb, type:%s status:%s", pTrans->id, mndTransStr(pAction->stage), pAction->id, - sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); + sdbTableName(pAction->pRaw->type), sdbStatusName(pAction->pRaw->status)); mndSetTransLastAction(pTrans, pAction); } else { @@ -1261,7 +1262,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) pTrans->code = 0; pTrans->redoActionPos++; mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), - pAction->id); + pAction->id); code = mndTransSync(pMnode, pTrans); if (code != 0) { pTrans->code = terrno; @@ -1280,7 +1281,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) terrno = code; pTrans->code = code; mInfo("trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d", pTrans->id, - mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes); + mndTransStr(pAction->stage), pAction->id, code, pTrans->failedTimes); break; } }