diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 5d12780a3b..98798a6235 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -750,31 +750,36 @@ typedef struct { } SReplica; typedef struct { - char db[TSDB_FULL_DB_NAME_LEN]; int32_t vgId; + int32_t dnodeId; + char db[TSDB_FULL_DB_NAME_LEN]; + uint64_t dbUid; int32_t cacheBlockSize; int32_t totalBlocks; int32_t daysPerFile; int32_t daysToKeep0; int32_t daysToKeep1; int32_t daysToKeep2; - int32_t minRowsPerFileBlock; - int32_t maxRowsPerFileBlock; + int32_t minRows; + int32_t maxRows; + int32_t commitTime; int32_t fsyncPeriod; - int8_t reserved[16]; + int8_t walLevel; int8_t precision; int8_t compression; - int8_t cacheLastRow; - int8_t update; - int8_t walLevel; int8_t quorum; + int8_t update; + int8_t cacheLastRow; int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; } SCreateVnodeMsg, SAlterVnodeMsg; typedef struct { - int32_t vgId; + int32_t vgId; + int32_t dnodeId; + char db[TSDB_FULL_DB_NAME_LEN]; + uint64_t dbUid; } SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; typedef struct { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 74263667ea..ad4b383d03 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -120,17 +120,18 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_INVALID_INPUT TAOS_DEF_ERROR_CODE(0, 0X0224) //"Invalid tsc input") // mnode-common -#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0300) -#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0301) -#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0302) -#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0303) -#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0304) -#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0305) -#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0306) -#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0307) -#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0308) -#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x0309) -#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030A) +#define TSDB_CODE_MND_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x0300) +#define TSDB_CODE_MND_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0301) +#define TSDB_CODE_MND_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0302) +#define TSDB_CODE_MND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0303) +#define TSDB_CODE_MND_ACTION_NEED_REPROCESSED TAOS_DEF_ERROR_CODE(0, 0x0304) +#define TSDB_CODE_MND_NO_RIGHTS TAOS_DEF_ERROR_CODE(0, 0x0305) +#define TSDB_CODE_MND_INVALID_OPTIONS TAOS_DEF_ERROR_CODE(0, 0x0306) +#define TSDB_CODE_MND_INVALID_CONNECTION TAOS_DEF_ERROR_CODE(0, 0x0307) +#define TSDB_CODE_MND_INVALID_MSG_VERSION TAOS_DEF_ERROR_CODE(0, 0x0308) +#define TSDB_CODE_MND_INVALID_MSG_LEN TAOS_DEF_ERROR_CODE(0, 0x0309) +#define TSDB_CODE_MND_INVALID_MSG_TYPE TAOS_DEF_ERROR_CODE(0, 0x030A) +#define TSDB_CODE_MND_TOO_MANY_SHELL_CONNS TAOS_DEF_ERROR_CODE(0, 0x030B) // mnode-show #define TSDB_CODE_MND_INVALID_SHOWOBJ TAOS_DEF_ERROR_CODE(0, 0x0310) diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 5da1d1ca2b..92e86efa9e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *, void *pMsg); -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *, void *pMsg); +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); char *mndTransStageStr(ETrnStage stage); diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 696f798c9a..e9cdedd332 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -28,7 +28,9 @@ SVgObj *mndAcquireVgroup(SMnode *pMnode, int32_t vgId); void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SSdbRaw *mndVgroupActionEncode(SVgObj *pVgroup); -SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); + +SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index fbbae13b63..6decc82743 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -285,10 +285,62 @@ static int32_t mndSetCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg } static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + SVgObj *pVgroup = pVgroups + v; + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) { + return -1; + } + + SEpSet epset = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) { + return -1; + } + + SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SCreateVnodeMsg)}; + if (mndTransAppendRedoAction(pTrans, &epset, &rpcMsg) != 0) { + rpcFreeCont(pMsg); + return -1; + } + } + } + return 0; } static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { + for (int v = 0; v < pDb->cfg.numOfVgroups; ++v) { + SVgObj *pVgroup = pVgroups + v; + + for (int32_t vn = 0; vn < pVgroup->replica; ++vn) { + SVnodeGid *pVgid = pVgroup->vnodeGid + vn; + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pDnode == NULL) { + return -1; + } + + SEpSet epset = mndGetDnodeEpset(pDnode); + mndReleaseDnode(pMnode, pDnode); + + SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + if (pMsg == NULL) { + return -1; + } + + SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SDropVnodeMsg)}; + if (mndTransAppendUndoAction(pTrans, &epset, &rpcMsg) != 0) { + rpcFreeCont(pMsg); + return -1; + } + } + } + return 0; } @@ -644,7 +696,7 @@ static int32_t mndProcessUseDbMsg(SMnodeMsg *pMsg) { return -1; } - int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); + int32_t contLen = sizeof(SUseDbRsp) + pDb->cfg.numOfVgroups * sizeof(SVgroupInfo); SUseDbRsp *pRsp = rpcMallocCont(contLen); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 55e8b3a721..493f20bc9a 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -180,8 +180,12 @@ static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pOldDnode, SDnodeObj } SDnodeObj *mndAcquireDnode(SMnode *pMnode, int32_t dnodeId) { - SSdb *pSdb = pMnode->pSdb; - return sdbAcquire(pSdb, SDB_DNODE, &dnodeId); + SSdb *pSdb = pMnode->pSdb; + SDnodeObj *pDnode = sdbAcquire(pSdb, SDB_DNODE, &dnodeId); + if (pDnode == NULL) { + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + } + return pDnode; } void mndReleaseDnode(SMnode *pMnode, SDnodeObj *pDnode) { diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 32ac795301..8399b79921 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -21,6 +21,11 @@ #define TSDB_TRN_ARRAY_SIZE 8 #define TSDB_TRN_RESERVE_SIZE 64 +typedef struct { + SEpSet epSet; + SRpcMsg msg; +} STransAction; + static SSdbRaw *mndTransActionEncode(STrans *pTrans); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); @@ -29,8 +34,10 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); -static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw); -static void mndTransDropArray(SArray *pArray); +static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg); +static void mndTransDropLog(SArray *pArray); +static void mndTransDropAction(SArray *pArray); static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); @@ -58,7 +65,7 @@ int32_t mndInitTrans(SMnode *pMnode) { void mndCleanupTrans(SMnode *pMnode) {} static SSdbRaw *mndTransActionEncode(STrans *pTrans) { - int32_t rawDataLen = 16 * sizeof(int32_t) + TSDB_TRN_RESERVE_SIZE; + int32_t rawDataLen = sizeof(STrans) + TSDB_TRN_RESERVE_SIZE; int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); @@ -80,6 +87,16 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { rawDataLen += sdbGetRawTotalSize(pTmp); } + for (int32_t i = 0; i < redoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + } + + for (int32_t i = 0; i < undoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + } + SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen); if (pRaw == NULL) { mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr()); @@ -116,6 +133,22 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_BINARY(pRaw, dataPos, (void *)pTmp, len) } + for (int32_t i = 0; i < redoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->redoActions, i); + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); + SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + } + + for (int32_t i = 0; i < undoActionNum; ++i) { + STransAction *pAction = taosArrayGet(pTrans->undoActions, i); + SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); + SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + } + SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE) SDB_SET_DATALEN(pRaw, dataPos); mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos); @@ -147,8 +180,8 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); + pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -175,42 +208,77 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < redoLogNum; ++i) { int32_t dataLen = 0; SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->redoLogs, &pData); if (ret == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; - break; } } for (int32_t i = 0; i < undoLogNum; ++i) { int32_t dataLen = 0; SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->undoLogs, &pData); if (ret == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; - break; } } for (int32_t i = 0; i < commitLogNum; ++i) { int32_t dataLen = 0; SDB_GET_INT32(pRaw, pRow, dataPos, &dataLen) - char *pData = malloc(dataLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pData, dataLen); + void *ret = taosArrayPush(pTrans->commitLogs, &pData); if (ret == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; - break; + } + } + + for (int32_t i = 0; i < redoActionNum; ++i) { + STransAction action = {0}; + SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) + action.msg.pCont = rpcMallocCont(action.msg.contLen); + if (action.msg.pCont == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + + void *ret = taosArrayPush(pTrans->redoActions, &action); + if (ret == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + } + } + + for (int32_t i = 0; i < undoActionNum; ++i) { + STransAction action = {0}; + SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) + action.msg.pCont = rpcMallocCont(action.msg.contLen); + if (action.msg.pCont == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; + } + SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + + void *ret = taosArrayPush(pTrans->undoActions, &action); + if (ret == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto TRANS_DECODE_OVER; } } @@ -237,11 +305,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); - mndTransDropArray(pTrans->redoLogs); - mndTransDropArray(pTrans->undoLogs); - mndTransDropArray(pTrans->commitLogs); - mndTransDropArray(pTrans->redoActions); - mndTransDropArray(pTrans->undoActions); + mndTransDropLog(pTrans->redoLogs); + mndTransDropLog(pTrans->undoLogs); + mndTransDropLog(pTrans->commitLogs); + mndTransDropAction(pTrans->redoActions); + mndTransDropAction(pTrans->undoActions); return 0; } @@ -274,6 +342,8 @@ char *mndTransStageStr(ETrnStage stage) { return "rollback"; case TRN_STAGE_RETRY: return "retry"; + case TRN_STAGE_OVER: + return "stop"; default: return "undefined"; } @@ -305,8 +375,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); + pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -319,7 +389,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return pTrans; } -static void mndTransDropArray(SArray *pArray) { +static void mndTransDropLog(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); tfree(pRaw); @@ -328,12 +398,21 @@ static void mndTransDropArray(SArray *pArray) { taosArrayDestroy(pArray); } +static void mndTransDropAction(SArray *pArray) { + for (int32_t i = 0; i < pArray->size; ++i) { + STransAction *pAction = taosArrayGet(pArray, i); + rpcFreeCont(pAction->msg.pCont); + } + + taosArrayDestroy(pArray); +} + void mndTransDrop(STrans *pTrans) { - mndTransDropArray(pTrans->redoLogs); - mndTransDropArray(pTrans->undoLogs); - mndTransDropArray(pTrans->commitLogs); - mndTransDropArray(pTrans->redoActions); - mndTransDropArray(pTrans->undoActions); + mndTransDropLog(pTrans->redoLogs); + mndTransDropLog(pTrans->undoLogs); + mndTransDropLog(pTrans->commitLogs); + mndTransDropAction(pTrans->redoActions); + mndTransDropAction(pTrans->undoActions); mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); tfree(pTrans); @@ -344,7 +423,7 @@ static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle); } -static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { +static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { if (pArray == NULL || pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -360,31 +439,43 @@ static int32_t mndTransAppendArray(SArray *pArray, SSdbRaw *pRaw) { } int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendArray(pTrans->redoLogs, pRaw); + int32_t code = mndTransAppendLog(pTrans->redoLogs, pRaw); mTrace("trans:%d, raw:%p append to redo logs, code:0x%x", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendArray(pTrans->undoLogs, pRaw); + int32_t code = mndTransAppendLog(pTrans->undoLogs, pRaw); mTrace("trans:%d, raw:%p append to undo logs, code:0x%x", pTrans->id, pRaw, code); return code; } int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { - int32_t code = mndTransAppendArray(pTrans->commitLogs, pRaw); + int32_t code = mndTransAppendLog(pTrans->commitLogs, pRaw); mTrace("trans:%d, raw:%p append to commit logs, code:0x%x", pTrans->id, pRaw, code); return code; } -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { - int32_t code = mndTransAppendArray(pTrans->redoActions, pMsg); +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg) { + STransAction action = {.epSet = *pEpSet, .msg = *pMsg}; + + void *ptr = taosArrayPush(pArray, &action); + if (ptr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, pMsg); mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg); return code; } -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, void *pMsg) { - int32_t code = mndTransAppendArray(pTrans->undoActions, pMsg); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { + int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, pMsg); mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg); return code; } @@ -559,18 +650,37 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { return code; } -static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - if (taosArrayGetSize(pTrans->redoActions) != 0) { - mTrace("trans:%d, execute redo actions finished", pTrans->id); +static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) { + SSdb *pSdb = pMnode->pSdb; + int32_t arraySize = taosArrayGetSize(pArray); + + for (int32_t i = 0; i < arraySize; ++i) { + SSdbRaw *pRaw = taosArrayGetP(pArray, i); + int32_t code = sdbWriteNotFree(pSdb, pRaw); + if (code != 0) { + return code; + } } + return 0; } +static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { + int32_t code = 0; + if (taosArrayGetSize(pTrans->redoActions) != 0) { + mTrace("trans:%d, execute redo actions finished", pTrans->id); + } + + return code; +} + static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { + int32_t code = 0; if (taosArrayGetSize(pTrans->undoActions) != 0) { mTrace("trans:%d, execute undo actions finished", pTrans->id); } - return 0; + + return code; } static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index c2be7fa39a..da37bb06b0 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -24,9 +24,10 @@ #define TSDB_VGROUP_VER_NUM 1 #define TSDB_VGROUP_RESERVE_SIZE 64 -static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); -static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); -static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); +static SSdbRow *mndVgroupActionDecode(SSdbRaw *pRaw); +static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); +static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); +static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOldVgroup, SVgObj *pNewVgroup); static int32_t mndProcessCreateVnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessAlterVnodeRsp(SMnodeMsg *pMsg); @@ -156,6 +157,80 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } +SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SCreateVnodeMsg *pCreate = rpcMallocCont(sizeof(SCreateVnodeMsg)); + if (pCreate == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pCreate->dnodeId = htonl(pDnode->id); + pCreate->vgId = htonl(pVgroup->vgId); + memcpy(pCreate->db, pDb->name, TSDB_FULL_DB_NAME_LEN); + pCreate->dbUid = htobe64(pDb->uid); + pCreate->cacheBlockSize = htonl(pDb->cfg.cacheBlockSize); + pCreate->totalBlocks = htonl(pDb->cfg.totalBlocks); + pCreate->daysPerFile = htonl(pDb->cfg.daysPerFile); + pCreate->daysToKeep0 = htonl(pDb->cfg.daysToKeep0); + pCreate->daysToKeep1 = htonl(pDb->cfg.daysToKeep1); + pCreate->daysToKeep2 = htonl(pDb->cfg.daysToKeep2); + pCreate->minRows = htonl(pDb->cfg.minRows); + pCreate->maxRows = htonl(pDb->cfg.maxRows); + pCreate->commitTime = htonl(pDb->cfg.commitTime); + pCreate->fsyncPeriod = htonl(pDb->cfg.fsyncPeriod); + pCreate->walLevel = pDb->cfg.walLevel; + pCreate->precision = pDb->cfg.precision; + pCreate->compression = pDb->cfg.compression; + pCreate->quorum = pDb->cfg.quorum; + pCreate->update = pDb->cfg.update; + pCreate->cacheLastRow = pDb->cfg.cacheLastRow; + pCreate->replica = pVgroup->replica; + pCreate->selfIndex = -1; + + for (int32_t v = 0; v < pVgroup->replica; ++v) { + SReplica *pReplica = &pCreate->replicas[v]; + SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; + SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); + if (pVgidDnode == NULL) { + rpcFreeCont(pCreate); + terrno = TSDB_CODE_MND_APP_ERROR; + return NULL; + } + + pReplica->id = htonl(pVgidDnode->id); + pReplica->port = htons(pVgidDnode->port); + memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN); + mndReleaseDnode(pMnode, pVgidDnode); + + if (pDnode->id == pVgid->dnodeId) { + pCreate->selfIndex = v; + } + } + + if (pCreate->selfIndex == -1) { + rpcFreeCont(pCreate); + terrno = TSDB_CODE_MND_APP_ERROR; + return NULL; + } + + return pCreate; +} + +SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SDropVnodeMsg)); + if (pDrop == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + pDrop->dnodeId = htonl(pDnode->id); + pDrop->vgId = htonl(pVgroup->vgId); + memcpy(pDrop->db, pDb->name, TSDB_FULL_DB_NAME_LEN); + pDrop->dbUid = htobe64(pDb->uid); + + return pDrop; +} + static int32_t mndGetAvailableDnode(SMnode *pMnode, SVgObj *pVgroup) { SSdb *pSdb = pMnode->pSdb; int32_t allocedVnodes = 0; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 0450513fc5..3e374b344b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -130,6 +130,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_VALUE_OUT_OF_RANGE, "Value out of range") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_INPUT, "Invalid tsc input") // mnode-common +TAOS_DEFINE_ERROR(TSDB_CODE_MND_APP_ERROR, "Mnode internal error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_NOT_READY, "Cluster not ready") TAOS_DEFINE_ERROR(TSDB_CODE_MND_MSG_NOT_PROCESSED, "Message not processed") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACTION_IN_PROGRESS, "Message is progressing")