diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 92e86efa9e..5c15e2f987 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -30,8 +30,8 @@ void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans); void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code); char *mndTransStageStr(ETrnStage stage); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6decc82743..e85edf66da 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -303,9 +303,8 @@ static int32_t mndSetRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV return -1; } - SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_ALTER_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SCreateVnodeMsg)}; - if (mndTransAppendRedoAction(pTrans, &epset, &rpcMsg) != 0) { - rpcFreeCont(pMsg); + if (mndTransAppendRedoAction(pTrans, &epset, TSDB_MSG_TYPE_ALTER_VNODE_IN, sizeof(SCreateVnodeMsg), pMsg) != 0) { + free(pMsg); return -1; } } @@ -333,9 +332,8 @@ static int32_t mndSetUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SV return -1; } - SRpcMsg rpcMsg = {.msgType = TSDB_MSG_TYPE_DROP_VNODE_IN, .pCont = pMsg, .contLen = sizeof(SDropVnodeMsg)}; - if (mndTransAppendUndoAction(pTrans, &epset, &rpcMsg) != 0) { - rpcFreeCont(pMsg); + if (mndTransAppendUndoAction(pTrans, &epset, TSDB_MSG_TYPE_DROP_VNODE_IN, sizeof(SDropVnodeMsg), pMsg) != 0) { + free(pMsg); return -1; } } diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 8399b79921..5caec6c78d 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -23,7 +23,9 @@ typedef struct { SEpSet epSet; - SRpcMsg msg; + int8_t msgType; + int32_t contLen; + void *pCont; } STransAction; static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -35,10 +37,11 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); -static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg); -static void mndTransDropLog(SArray *pArray); -static void mndTransDropAction(SArray *pArray); -static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray); +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont); +static void mndTransDropLogs(SArray *pArray); +static void mndTransDropActions(SArray *pArray); +static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray); +static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans); @@ -89,12 +92,12 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + rawDataLen += (sizeof(STransAction) + pAction->contLen); } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); - rawDataLen += (sizeof(STransAction) + pAction->msg.contLen); + rawDataLen += (sizeof(STransAction) + pAction->contLen); } SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER, rawDataLen); @@ -136,17 +139,17 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->redoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) - SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + SDB_SET_INT8(pRaw, dataPos, pAction->msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen) + SDB_SET_BINARY(pRaw, dataPos, pAction->pCont, pAction->contLen); } for (int32_t i = 0; i < undoActionNum; ++i) { STransAction *pAction = taosArrayGet(pTrans->undoActions, i); SDB_SET_BINARY(pRaw, dataPos, (void *)&pAction->epSet, sizeof(SEpSet)); - SDB_SET_INT8(pRaw, dataPos, pAction->msg.msgType) - SDB_SET_INT32(pRaw, dataPos, pAction->msg.contLen) - SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->msg.pCont, pAction->msg.contLen); + SDB_SET_INT8(pRaw, dataPos, pAction->msgType) + SDB_SET_INT32(pRaw, dataPos, pAction->contLen) + SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen); } SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE) @@ -247,14 +250,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < redoActionNum; ++i) { STransAction action = {0}; SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) - SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) - action.msg.pCont = rpcMallocCont(action.msg.contLen); - if (action.msg.pCont == NULL) { + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) + action.pCont = malloc(action.contLen); + if (action.pCont == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; } - SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen); void *ret = taosArrayPush(pTrans->redoActions, &action); if (ret == NULL) { @@ -266,14 +269,14 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { for (int32_t i = 0; i < undoActionNum; ++i) { STransAction action = {0}; SDB_GET_BINARY(pRaw, pRow, dataPos, (void *)&action.epSet, sizeof(SEpSet)); - SDB_GET_INT8(pRaw, pRow, dataPos, &action.msg.msgType) - SDB_GET_INT32(pRaw, pRow, dataPos, &action.msg.contLen) - action.msg.pCont = rpcMallocCont(action.msg.contLen); - if (action.msg.pCont == NULL) { + SDB_GET_INT8(pRaw, pRow, dataPos, &action.msgType) + SDB_GET_INT32(pRaw, pRow, dataPos, &action.contLen) + action.pCont = malloc(action.contLen); + if (action.pCont == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto TRANS_DECODE_OVER; } - SDB_GET_BINARY(pRaw, pRow, dataPos, action.msg.pCont, action.msg.contLen); + SDB_GET_BINARY(pRaw, pRow, dataPos, action.pCont, action.contLen); void *ret = taosArrayPush(pTrans->undoActions, &action); if (ret == NULL) { @@ -305,11 +308,11 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans) { mTrace("trans:%d, perform delete action, stage:%s", pTrans->id, mndTransStageStr(pTrans->stage)); - mndTransDropLog(pTrans->redoLogs); - mndTransDropLog(pTrans->undoLogs); - mndTransDropLog(pTrans->commitLogs); - mndTransDropAction(pTrans->redoActions); - mndTransDropAction(pTrans->undoActions); + mndTransDropLogs(pTrans->redoLogs); + mndTransDropLogs(pTrans->undoLogs); + mndTransDropLogs(pTrans->commitLogs); + mndTransDropActions(pTrans->redoActions); + mndTransDropActions(pTrans->undoActions); return 0; } @@ -389,7 +392,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { return pTrans; } -static void mndTransDropLog(SArray *pArray) { +static void mndTransDropLogs(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); tfree(pRaw); @@ -398,21 +401,21 @@ static void mndTransDropLog(SArray *pArray) { taosArrayDestroy(pArray); } -static void mndTransDropAction(SArray *pArray) { +static void mndTransDropActions(SArray *pArray) { for (int32_t i = 0; i < pArray->size; ++i) { STransAction *pAction = taosArrayGet(pArray, i); - rpcFreeCont(pAction->msg.pCont); + free(pAction->pCont); } taosArrayDestroy(pArray); } void mndTransDrop(STrans *pTrans) { - mndTransDropLog(pTrans->redoLogs); - mndTransDropLog(pTrans->undoLogs); - mndTransDropLog(pTrans->commitLogs); - mndTransDropAction(pTrans->redoActions); - mndTransDropAction(pTrans->undoActions); + mndTransDropLogs(pTrans->redoLogs); + mndTransDropLogs(pTrans->undoLogs); + mndTransDropLogs(pTrans->commitLogs); + mndTransDropActions(pTrans->redoActions); + mndTransDropActions(pTrans->undoActions); mDebug("trans:%d, data:%p is dropped", pTrans->id, pTrans); tfree(pTrans); @@ -456,8 +459,8 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return code; } -static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMsg) { - STransAction action = {.epSet = *pEpSet, .msg = *pMsg}; +static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { + STransAction action = {.epSet = *pEpSet, .msgType = msgType, .contLen = contLen, .pCont = pCont}; void *ptr = taosArrayPush(pArray, &action); if (ptr == NULL) { @@ -468,15 +471,15 @@ static int32_t mndTransAppendAction(SArray *pArray, SEpSet *pEpSet, SRpcMsg *pMs return 0; } -int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, pMsg); - mTrace("trans:%d, msg:%p append to redo actions", pTrans->id, pMsg); +int32_t mndTransAppendRedoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { + int32_t code = mndTransAppendAction(pTrans->redoActions, pEpSet, msgType, contLen, pCont); + mTrace("trans:%d, msg:%s len:%d append to redo actions", pTrans->id, taosMsg[msgType], contLen); return code; } -int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, SRpcMsg *pMsg) { - int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, pMsg); - mTrace("trans:%d, msg:%p append to undo actions", pTrans->id, pMsg); +int32_t mndTransAppendUndoAction(STrans *pTrans, SEpSet *pEpSet, int8_t msgType, int32_t contLen, void *pCont) { + int32_t code = mndTransAppendAction(pTrans->undoActions, pEpSet, msgType, contLen, pCont); + mTrace("trans:%d, msg:%s len:%d append to undo actions", pTrans->id, taosMsg[msgType], contLen); return code; } @@ -593,7 +596,7 @@ void mndTransApply(SMnode *pMnode, SSdbRaw *pRaw, STransMsg *pMsg, int32_t code) // todo } -static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) { +static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { SSdb *pSdb = pMnode->pSdb; int32_t arraySize = taosArrayGetSize(pArray); @@ -611,7 +614,7 @@ static int32_t mndTransExecuteArray(SMnode *pMnode, SArray *pArray) { static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; if (taosArrayGetSize(pTrans->redoLogs) != 0) { - code = mndTransExecuteArray(pMnode, pTrans->redoLogs); + code = mndTransExecuteLogs(pMnode, pTrans->redoLogs); if (code != 0) { mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr()) } else { @@ -625,7 +628,7 @@ static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; if (taosArrayGetSize(pTrans->undoLogs) != 0) { - code = mndTransExecuteArray(pMnode, pTrans->undoLogs); + code = mndTransExecuteLogs(pMnode, pTrans->undoLogs); if (code != 0) { mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr()) } else { @@ -639,7 +642,7 @@ static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { int32_t code = 0; if (taosArrayGetSize(pTrans->commitLogs) != 0) { - code = mndTransExecuteArray(pMnode, pTrans->commitLogs); + code = mndTransExecuteLogs(pMnode, pTrans->commitLogs); if (code != 0) { mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr()) } else { @@ -651,36 +654,39 @@ static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { } static int32_t mndTransExecuteActions(SMnode *pMnode, SArray *pArray) { - SSdb *pSdb = pMnode->pSdb; +#if 0 int32_t arraySize = taosArrayGetSize(pArray); - for (int32_t i = 0; i < arraySize; ++i) { - SSdbRaw *pRaw = taosArrayGetP(pArray, i); - int32_t code = sdbWriteNotFree(pSdb, pRaw); - if (code != 0) { - return code; + STransAction *pAction = taosArrayGet(pArray, i); + + SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen}; + rpcMsg.pCont = rpcMallocCont(pAction->contLen); + if (rpcMsg.pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } + memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen); + mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +#else return 0; +#endif } static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; - if (taosArrayGetSize(pTrans->redoActions) != 0) { - mTrace("trans:%d, execute redo actions finished", pTrans->id); - } + if (taosArrayGetSize(pTrans->redoActions) <= 0) return 0; - return code; + mTrace("trans:%d, start to execute redo actions", pTrans->id); + return mndTransExecuteActions(pMnode, pTrans->redoActions); } static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; - if (taosArrayGetSize(pTrans->undoActions) != 0) { - mTrace("trans:%d, execute undo actions finished", pTrans->id); - } + if (taosArrayGetSize(pTrans->undoActions) <= 0) return 0; - return code; + mTrace("trans:%d, start to execute undo actions", pTrans->id); + return mndTransExecuteActions(pMnode, pTrans->undoActions); } static int32_t mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index da37bb06b0..e99fea200b 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -158,7 +158,7 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { } SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SCreateVnodeMsg *pCreate = rpcMallocCont(sizeof(SCreateVnodeMsg)); + SCreateVnodeMsg *pCreate = malloc(sizeof(SCreateVnodeMsg)); if (pCreate == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -192,7 +192,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb SVnodeGid *pVgid = &pVgroup->vnodeGid[v]; SDnodeObj *pVgidDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pVgidDnode == NULL) { - rpcFreeCont(pCreate); + free(pCreate); terrno = TSDB_CODE_MND_APP_ERROR; return NULL; } @@ -208,7 +208,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb } if (pCreate->selfIndex == -1) { - rpcFreeCont(pCreate); + free(pCreate); terrno = TSDB_CODE_MND_APP_ERROR; return NULL; } @@ -217,7 +217,7 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb } SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SDropVnodeMsg *pDrop = rpcMallocCont(sizeof(SDropVnodeMsg)); + SDropVnodeMsg *pDrop = malloc(sizeof(SDropVnodeMsg)); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL;