diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 7e7afc9774..9620c4ea62 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -144,9 +144,10 @@ typedef struct SSdbRow SSdbRow; typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; typedef enum { SDB_STATUS_CREATING = 1, - SDB_STATUS_READY = 2, + SDB_STATUS_UPDATING = 2, SDB_STATUS_DROPPING = 3, - SDB_STATUS_DROPPED = 4 + SDB_STATUS_READY = 4, + SDB_STATUS_DROPPED = 5 } ESdbStatus; typedef enum { @@ -174,67 +175,19 @@ typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef struct { - /** - * @brief The sdb type of the table. - * - */ - ESdbType sdbType; - - /** - * @brief The key type of the table. - * - */ - EKeyType keyType; - - /** - * @brief The callback function when the table is first deployed. - * - */ + ESdbType sdbType; + EKeyType keyType; SdbDeployFp deployFp; - - /** - * @brief Encode one row of the table into rawdata. - * - */ SdbEncodeFp encodeFp; - - /** - * @brief Decode one row of the table from rawdata. - * - */ SdbDecodeFp decodeFp; - - /** - * @brief The callback function when insert a row to sdb. - * - */ SdbInsertFp insertFp; - - /** - * @brief The callback function when undate a row in sdb. - * - */ SdbUpdateFp updateFp; - - /** - * @brief The callback function when delete a row from sdb. - * - */ SdbDeleteFp deleteFp; } SSdbTable; typedef struct SSdbOpt { - /** - * @brief The path of the sdb file. - * - */ const char *path; - - /** - * @brief The mnode object. - * - */ - SMnode *pMnode; + SMnode *pMnode; } SSdbOpt; /** diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index aace3f4ee3..5c672c74be 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -103,7 +103,6 @@ typedef struct STrans { int32_t id; ETrnStage stage; ETrnPolicy policy; - SMnode *pMnode; void *rpcHandle; SArray *redoLogs; SArray *undoLogs; @@ -306,6 +305,7 @@ typedef struct SMnodeMsg { typedef struct { int32_t id; + int32_t code; void *rpcHandle; } STransMsg; diff --git a/source/dnode/mnode/impl/inc/mndSync.h b/source/dnode/mnode/impl/inc/mndSync.h index 02ba725be1..fe557cdeac 100644 --- a/source/dnode/mnode/impl/inc/mndSync.h +++ b/source/dnode/mnode/impl/inc/mndSync.h @@ -25,7 +25,7 @@ extern "C" { int32_t mndInitSync(SMnode *pMnode); void mndCleanupSync(SMnode *pMnode); bool mndIsMaster(SMnode *pMnode); -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg); +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 878337e4be..5da1d1ca2b 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -32,10 +32,10 @@ int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg); int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg); - -int32_t mndTransPrepare(STrans *pTrans); +int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); -int32_t mndTransExecute(SSdb *pSdb, int32_t tranId); +char *mndTransStageStr(ETrnStage stage); +char *mndTransPolicyStr(ETrnPolicy policy); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index fd02c6e251..b638728647 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -357,7 +357,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -491,7 +491,7 @@ static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbO } sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -571,7 +571,7 @@ static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { } sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index cf46d044ca..fbcf623d3c 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -431,7 +431,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -503,7 +503,7 @@ static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode) } sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index e407b271fd..3fd7dcfba1 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -183,7 +183,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -226,7 +226,7 @@ static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) { } sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 6278e3ffef..e91c51d301 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -238,7 +238,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg * } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -313,7 +313,7 @@ static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeOb } sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index c3afbf37c8..63bf186be5 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -285,7 +285,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -433,7 +433,7 @@ static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { } sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 6e7ee662f8..59161b32f2 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -21,16 +21,16 @@ int32_t mndInitSync(SMnode *pMnode) { return 0; } void mndCleanupSync(SMnode *pMnode) {} -int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg) { +int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { int32_t code = 0; - int32_t len = sdbGetRawTotalSize(pRaw); - SSdbRaw *pReceived = calloc(1, len); - memcpy(pReceived, pRaw, len); - mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg); + // int32_t len = sdbGetRawTotalSize(pRaw); + // SSdbRaw *pReceived = calloc(1, len); + // memcpy(pReceived, pRaw, len); + // mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg); - mndTransApply(pMnode, pReceived, pMsg, code); - return 0; + // mndTransApply(pMnode, pReceived, code); + return code; } bool mndIsMaster(SMnode *pMnode) { return true; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 3a53472d45..65957e44bc 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -17,8 +17,9 @@ #include "mndTrans.h" #include "mndSync.h" -#define SDB_TRANS_VER 1 -#define TRN_DEFAULT_ARRAY_SIZE 8 +#define TSDB_TRANS_VER 1 +#define TSDB_TRN_ARRAY_SIZE 8 +#define TSDB_TRN_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); @@ -26,6 +27,22 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans); static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); +static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); +static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); +static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw); +static void mndTransDropArray(SArray *pArray); +static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray); +static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); +static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); +static void mndTransExecute(SMnode *pMnode, STrans *pTrans); + int32_t mndInitTrans(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_TRANS, .keyType = SDB_KEY_INT32, @@ -63,7 +80,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { rawDataLen += sdbGetRawTotalSize(pTmp); } - SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, SDB_TRANS_VER, rawDataLen); + SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen); if (pRaw == NULL) { mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr()); return NULL; @@ -100,6 +117,8 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) } + SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE) + SDB_SET_DATALEN(pRaw, dataPos); mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos); return pRaw; } @@ -113,7 +132,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { return NULL; } - if (sver != SDB_TRANS_VER) { + if (sver != TSDB_TRANS_VER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr()); return NULL; @@ -126,11 +145,11 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { return NULL; } - pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); + pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -197,6 +216,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } } + SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TRN_RESERVE_SIZE) + TRANS_DECODE_OVER: if (code != 0) { mError("trans:%d, failed to parse from raw:%p since %s", pTrans->id, pRaw, tstrerror(errno)); @@ -210,64 +231,71 @@ TRANS_DECODE_OVER: } static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform insert action, stage:%d", pTrans->id, pTrans->stage); - - SArray *pArray = pTrans->redoLogs; - int32_t arraySize = taosArrayGetSize(pArray); - - for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - int32_t code = sdbWrite(pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); - return code; - } - } + mTrace("trans:%d, perform insert action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); return 0; } static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { - mTrace("trans:%d, perform delete action, stage:%d", pTrans->id, pTrans->stage); + mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); - SArray *pArray = pTrans->undoLogs; - int32_t arraySize = taosArrayGetSize(pArray); - - for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - int32_t code = sdbWrite(pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write raw:%p to sdb since %s", pTrans->id, pRaw, terrstr()); - return code; - } - } + mndTransDropArray(pTrans->redoLogs); + mndTransDropArray(pTrans->undoLogs); + mndTransDropArray(pTrans->commitLogs); + mndTransDropArray(pTrans->redoActions); + mndTransDropArray(pTrans->undoActions); return 0; } static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOldTrans, STrans *pNewTrans) { - mTrace("trans:%d, perform update action, stage:%d", pOldTrans->id, pNewTrans->stage); - - SArray *pArray = pOldTrans->commitLogs; - int32_t arraySize = taosArrayGetSize(pArray); - - for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - int32_t code = sdbWrite(pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write raw:%p to sdb since %s", pOldTrans->id, pRaw, terrstr()); - return code; - } - } - + mTrace("trans:%d, perform update action, stage:%s", pOldTrans->id, mndTransStageStr(pNewTrans->stage)); pOldTrans->stage = pNewTrans->stage; return 0; } +STrans *mndAcquireTrans(SMnode *pMnode, int32_t transId) { + SSdb *pSdb = pMnode->pSdb; + return sdbAcquire(pSdb, SDB_TRANS, &transId); +} + +void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pTrans); +} + static int32_t trnGenerateTransId() { static int32_t tmp = 0; return ++tmp; } +char *mndTransStageStr(ETrnStage stage) { + switch (stage) { + case TRN_STAGE_PREPARE: + return "prepare"; + case TRN_STAGE_EXECUTE: + return "execute"; + case TRN_STAGE_COMMIT: + return "commit"; + case TRN_STAGE_ROLLBACK: + return "rollback"; + case TRN_STAGE_RETRY: + return "retry"; + default: + return "undefined"; + } +} + +char *mndTransPolicyStr(ETrnPolicy policy) { + switch (policy) { + case TRN_POLICY_ROLLBACK: + return "prepare"; + case TRN_POLICY_RETRY: + return "retry"; + default: + return "undefined"; + } +} + STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { @@ -279,13 +307,12 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { pTrans->id = trnGenerateTransId(); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; - pTrans->pMnode = pMnode; pTrans->rpcHandle = rpcHandle; - pTrans->redoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); - pTrans->undoActions = taosArrayInit(TRN_DEFAULT_ARRAY_SIZE, sizeof(void *)); + pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -298,7 +325,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return pTrans; } -static void trnDropArray(SArray *pArray) { +static void mndTransDropArray(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); tfree(pRaw); @@ -308,17 +335,17 @@ static void trnDropArray(SArray *pArray) { } void mndTransDrop(STrans *pTrans) { - trnDropArray(pTrans->redoLogs); - trnDropArray(pTrans->undoLogs); - trnDropArray(pTrans->commitLogs); - trnDropArray(pTrans->redoActions); - trnDropArray(pTrans->undoActions); + mndTransDropArray(pTrans->redoLogs); + mndTransDropArray(pTrans->undoLogs); + mndTransDropArray(pTrans->commitLogs); + mndTransDropArray(pTrans->redoActions); + mndTransDropArray(pTrans->undoActions); mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); tfree(pTrans); } -void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { +static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { pTrans->rpcHandle = rpcHandle; mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle); } @@ -340,19 +367,19 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw); - mTrace("trans:%d, raw:%p append to redo logs, code:%d", pTrans->id, pRaw, code); + mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw); - mTrace("trans:%d, raw:%p append to undo logs, code:%d", pTrans->id, pRaw, code); + mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw); - mTrace("trans:%d, raw:%p append to commit logs, code:%d", pTrans->id, pRaw, code); + mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code); return code; } @@ -368,7 +395,7 @@ int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { return code; } -int32_t mndTransPrepare(STrans *pTrans) { +int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, prepare transaction", pTrans->id); SSdbRaw *pRaw = mndTransActionEncode(pTrans); @@ -376,180 +403,276 @@ int32_t mndTransPrepare(STrans *pTrans) { mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); return -1; } - sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); + sdbSetRawStatus(pRaw, SDB_STATUS_READY); - if (sdbWriteNotFree(pTrans->pMnode->pSdb, pRaw) != 0) { - mError("trans:%d, failed to write trans since %s", pTrans->id, terrstr()); - return -1; - } - - STransMsg *pMsg = calloc(1, sizeof(STransMsg)); - pMsg->id = pTrans->id; - pMsg->rpcHandle = pTrans->rpcHandle; - - mDebug("trans:%d, start sync, RPC:%p pMsg:%p", pTrans->id, pTrans->rpcHandle, pMsg); - if (mndSyncPropose(pTrans->pMnode, pRaw, pMsg) != 0) { + mTrace("trans:%d, start sync", pTrans->id); + int32_t code = mndSyncPropose(pMnode, pRaw); + if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); - free(pMsg); sdbFreeRaw(pRaw); return -1; } - sdbFreeRaw(pRaw); + mTrace("trans:%d, sync finished", pTrans->id); + + code = sdbWrite(pMnode->pSdb, pRaw); + if (code != 0) { + mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); + return -1; + } + + STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id); + if (pNewTrans == NULL) { + mError("trans:%d, failed to ready from sdb since %s", pTrans->id, terrstr()); + return -1; + } + + mDebug("trans:%d, prepare finished", pNewTrans->id); + mndTransExecute(pMnode, pNewTrans); + mndReleaseTrans(pMnode, pNewTrans); return 0; } -static void trnSendRpcRsp(STransMsg *pMsg, int32_t code) { - mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x pMsg:%p", pMsg->id, pMsg->rpcHandle, code & 0xFFFF, pMsg); - if (pMsg->rpcHandle != NULL) { - SRpcMsg rspMsg = {.handle = pMsg->rpcHandle, .code = code}; - rpcSendResponse(&rspMsg); +int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { + mDebug("trans:%d, commit transaction", pTrans->id); + + SSdbRaw *pRaw = mndTransActionEncode(pTrans); + if (pRaw == NULL) { + mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + return -1; + } + sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); + + mTrace("trans:%d, start sync", pTrans->id); + int32_t code = mndSyncPropose(pMnode, pRaw); + if (code != 0) { + mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); + sdbFreeRaw(pRaw); + return -1; } - free(pMsg); + mTrace("trans:%d, sync finished", pTrans->id); + code = sdbWrite(pMnode->pSdb, pRaw); + if (code != 0) { + mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); + return -1; + } + + mDebug("trans:%d, commit finished", pTrans->id); + return 0; +} + +int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { + mDebug("trans:%d, rollback transaction", pTrans->id); + + SSdbRaw *pRaw = mndTransActionEncode(pTrans); + if (pRaw == NULL) { + mError("trans:%d, failed to decode trans since %s", pTrans->id, terrstr()); + return -1; + } + sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); + + mTrace("trans:%d, start sync", pTrans->id); + int32_t code = mndSyncPropose(pMnode, pRaw); + if (code != 0) { + mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); + sdbFreeRaw(pRaw); + return -1; + } + + mTrace("trans:%d, sync finished", pTrans->id); + code = sdbWrite(pMnode->pSdb, pRaw); + if (code != 0) { + mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); + return -1; + } + + mDebug("trans:%d, rollback finished", pTrans->id); + return 0; +} + +static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) { + if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; + mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF); + + if (pTrans->rpcHandle != NULL) { + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code}; + rpcSendResponse(&rspMsg); + } } void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) { - if (code == 0) { - mDebug("trans:%d, commit transaction", pMsg->id); - sdbSetRawStatus(pRaw, SDB_STATUS_READY); - if (sdbWrite(pMnode->pSdb, pRaw) != 0) { - code = terrno; - mError("trans:%d, failed to write sdb while commit since %s", pMsg->id, terrstr()); - } - trnSendRpcRsp(pMsg, code); - } else { - mDebug("trans:%d, rollback transaction", pMsg->id); - sdbSetRawStatus(pRaw, SDB_STATUS_DROPPED); - if (sdbWrite(pMnode->pSdb, pRaw) != 0) { - mError("trans:%d, failed to write sdb while rollback since %s", pMsg->id, terrstr()); - } - trnSendRpcRsp(pMsg, code); - } + // todo } -static int32_t trnExecuteArray(SMnode *pMnode, SArray *pArray) { - for (int32_t i = 0; i < pArray->size; ++i) { +static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) { + SSdb *pSdb = pMnode->pSdb; + int32_t arraySize = taosArrayGetSize(pArray); + + for (int32_t i = 0; i < arraySize; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); - if (sdbWrite(pMnode->pSdb, pRaw) != 0) { - return -1; + int32_t code = sdbWriteNotFree(pSdb, pRaw); + if (code != 0) { + return code; } } return 0; } -static int32_t trnExecuteRedoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->redoLogs); } - -static int32_t trnExecuteUndoLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->undoLogs); } - -static int32_t trnExecuteCommitLogs(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->commitLogs); } - -static int32_t trnExecuteRedoActions(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->redoActions); } - -static int32_t trnExecuteUndoActions(STrans *pTrans) { return trnExecuteArray(pTrans->pMnode, pTrans->undoActions); } - -static int32_t trnPerformPrepareStage(STrans *pTrans) { - if (trnExecuteRedoLogs(pTrans) == 0) { - pTrans->stage = TRN_STAGE_EXECUTE; - return 0; +static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteArray(pMnode, pTrans->redoLogs); + if (code != 0) { + mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr()) } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - return -1; + mTrace("trans:%d, execute redo logs finished", pTrans->id) } + + return code; } -static int32_t trnPerformExecuteStage(STrans *pTrans) { - int32_t code = trnExecuteRedoActions(pTrans); +static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteArray(pMnode, pTrans->undoLogs); + if (code != 0) { + mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr()) + } else { + mTrace("trans:%d, execute undo logs finished", pTrans->id) + } + + return code; +} + +static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteArray(pMnode, pTrans->commitLogs); + if (code != 0) { + mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr()) + } else { + mTrace("trans:%d, execute commit logs finished", pTrans->id) + } + + return code; +} + +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { + mTrace("trans:%d, execute redo actions finished", pTrans->id); + return 0; +} + +static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { + mTrace("trans:%d, execute undo actions finished", pTrans->id); + return 0; +} + +static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteRedoLogs(pMnode, pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_EXECUTE; + mTrace("trans:%d, stage from prepare to execute", pTrans->id); + } else { + pTrans->stage = TRN_STAGE_ROLLBACK; + mError("trans:%d, stage from prepare to rollback since %s", pTrans->id, terrstr()); + } + + return 0; +} + +static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); if (code == 0) { pTrans->stage = TRN_STAGE_COMMIT; - return 0; + mTrace("trans:%d, stage from execute to commit", pTrans->id); } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { - return -1; + mTrace("trans:%d, stage keep on execute since %s", pTrans->id, terrstr(code)); + return code; } else { - if (pTrans->policy == TRN_POLICY_RETRY) { - pTrans->stage = TRN_STAGE_RETRY; - } else { + if (pTrans->policy == TRN_POLICY_ROLLBACK) { pTrans->stage = TRN_STAGE_ROLLBACK; + mError("trans:%d, stage from execute to rollback since %s", pTrans->id, terrstr()); + } else { + pTrans->stage = TRN_STAGE_RETRY; + mError("trans:%d, stage from execute to retry since %s", pTrans->id, terrstr()); } - return 0; } + + return 0; } -static int32_t trnPerformCommitStage(STrans *pTrans) { - if (trnExecuteCommitLogs(pTrans) == 0) { - pTrans->stage = TRN_STAGE_EXECUTE; - return 0; +static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteCommitLogs(pMnode, pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_COMMIT; + mTrace("trans:%d, commit stage finished", pTrans->id); + } else { + if (pTrans->policy == TRN_POLICY_ROLLBACK) { + pTrans->stage = TRN_STAGE_ROLLBACK; + mError("trans:%d, stage from commit to rollback since %s", pTrans->id, terrstr()); + } else { + pTrans->stage = TRN_STAGE_RETRY; + mError("trans:%d, stage from commit to retry since %s", pTrans->id, terrstr()); + } + } + + return code; +} + +static int32_t mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); + + if (code == 0) { + mTrace("trans:%d, rollbacked", pTrans->id); } else { pTrans->stage = TRN_STAGE_ROLLBACK; - return -1; + mError("trans:%d, stage keep on rollback since %s", pTrans->id, terrstr()); } + + return code; } -static int32_t trnPerformRollbackStage(STrans *pTrans) { - if (trnExecuteCommitLogs(pTrans) == 0) { - pTrans->stage = TRN_STAGE_EXECUTE; - return 0; +static int32_t mndTransPerformRetryStage(SMnode *pMnode, STrans *pTrans) { + int32_t code = mndTransExecuteRedoActions(pMnode, pTrans); + + if (code == 0) { + pTrans->stage = TRN_STAGE_COMMIT; + mTrace("trans:%d, stage from retry to commit", pTrans->id); } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - return -1; + pTrans->stage = TRN_STAGE_RETRY; + mError("trans:%d, stage keep on retry since %s", pTrans->id, terrstr()); } + + return code; } -static int32_t trnPerformRetryStage(STrans *pTrans) { - if (trnExecuteCommitLogs(pTrans) == 0) { - pTrans->stage = TRN_STAGE_EXECUTE; - return 0; - } else { - pTrans->stage = TRN_STAGE_ROLLBACK; - return -1; - } -} - -int32_t mndTransExecute(SSdb *pSdb, int32_t tranId) { +static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; - STrans *pTrans = sdbAcquire(pSdb, SDB_TRANS, &tranId); - if (pTrans == NULL) { - return -1; - } - - if (pTrans->stage == TRN_STAGE_PREPARE) { - if (trnPerformPrepareStage(pTrans) != 0) { - sdbRelease(pSdb, pTrans); - return -1; + while (code == 0) { + switch (pTrans->stage) { + case TRN_STAGE_PREPARE: + code = mndTransPerformPrepareStage(pMnode, pTrans); + break; + case TRN_STAGE_EXECUTE: + code = mndTransPerformExecuteStage(pMnode, pTrans); + break; + case TRN_STAGE_COMMIT: + code = mndTransCommit(pMnode, pTrans); + if (code == 0) { + code = mndTransPerformCommitStage(pMnode, pTrans); + } + break; + case TRN_STAGE_ROLLBACK: + code = mndTransPerformRollbackStage(pMnode, pTrans); + if (code == 0) { + code = mndTransRollback(pMnode, pTrans); + } + break; + case TRN_STAGE_RETRY: + code = mndTransPerformRetryStage(pMnode, pTrans); + break; } } - if (pTrans->stage == TRN_STAGE_EXECUTE) { - if (trnPerformExecuteStage(pTrans) != 0) { - sdbRelease(pSdb, pTrans); - return -1; - } - } - - if (pTrans->stage == TRN_STAGE_COMMIT) { - if (trnPerformCommitStage(pTrans) != 0) { - sdbRelease(pSdb, pTrans); - return -1; - } - } - - if (pTrans->stage == TRN_STAGE_ROLLBACK) { - if (trnPerformRollbackStage(pTrans) != 0) { - sdbRelease(pSdb, pTrans); - return -1; - } - } - - if (pTrans->stage == TRN_STAGE_RETRY) { - if (trnPerformRetryStage(pTrans) != 0) { - sdbRelease(pSdb, pTrans); - return -1; - } - } - - sdbRelease(pSdb, pTrans); - return 0; -} \ No newline at end of file + mndTransSendRpcRsp(pTrans, code); +} diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 2f582a810d..e6bf30f41b 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -235,7 +235,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, } sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -269,7 +269,7 @@ static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewU } sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1; @@ -311,7 +311,7 @@ static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) { } sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); - if (mndTransPrepare(pTrans) != 0) { + if (mndTransPrepare(pMnode, pTrans) != 0) { mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return -1;