From 856104c4f3714affcd67286fafca5e2b0bdc625c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 25 Dec 2021 13:27:37 +0800 Subject: [PATCH 1/3] minor changes --- source/dnode/mnode/impl/inc/mndTopic.h | 2 +- source/dnode/mnode/impl/src/mndFunc.c | 2 +- source/dnode/mnode/impl/src/mndStb.c | 4 +- source/dnode/mnode/impl/src/mndTopic.c | 56 ++++++------ source/dnode/mnode/impl/src/mndTrans.c | 115 +++++++++---------------- 5 files changed, 73 insertions(+), 106 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndTopic.h b/source/dnode/mnode/impl/inc/mndTopic.h index 62d213b8a2..d092f47d4b 100644 --- a/source/dnode/mnode/impl/inc/mndTopic.h +++ b/source/dnode/mnode/impl/inc/mndTopic.h @@ -26,7 +26,7 @@ int32_t mndInitTopic(SMnode *pMnode); void mndCleanupTopic(SMnode *pMnode); STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName); -void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic); +void mndReleaseTopic(SMnode *pMnode, STopicObj *pTopic); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index 7e9d4ebeab..c6209c949f 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -441,7 +441,7 @@ static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int16_t le } if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_BINARY) { - int32_t bytes = len > 0 ? (int)(len - VARSTR_HEADER_SIZE) : len; + int32_t bytes = len > 0 ? (int32_t)(len - VARSTR_HEADER_SIZE) : len; snprintf(buf, buflen - 1, "%s(%d)", tDataTypes[type].name, type == TSDB_DATA_TYPE_NCHAR ? bytes / 4 : bytes); buf[buflen - 1] = 0; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 8b62c2f02d..411f68e7e1 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -834,8 +834,8 @@ static int32_t mndGetStbMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pM } static void mndExtractTableName(char *tableId, char *name) { - int pos = -1; - int num = 0; + int32_t pos = -1; + int32_t num = 0; for (pos = 0; tableId[pos] != 0; ++pos) { if (tableId[pos] == '.') num++; if (num == 2) break; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 5b45aea2e6..648ebb14fc 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -14,18 +14,18 @@ */ #define _DEFAULT_SOURCE -#include "mndStb.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" #include "mndShow.h" +#include "mndStb.h" #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" #include "tname.h" -#define TSDB_TOPIC_VER_NUMBER 1 -#define TSDB_TOPIC_RESERVE_SIZE 64 +#define MND_TOPIC_VER_NUMBER 1 +#define MND_TOPIC_RESERVE_SIZE 64 static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic); static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw); @@ -73,8 +73,8 @@ int32_t mndInitTopic(SMnode *pMnode) { void mndCleanupTopic(SMnode *pMnode) {} static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { - int32_t size = sizeof(STopicObj) + TSDB_TOPIC_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, TSDB_TOPIC_VER_NUMBER, size); + int32_t size = sizeof(STopicObj) + MND_TOPIC_RESERVE_SIZE; + SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -90,7 +90,7 @@ static SSdbRaw *mndTopicActionEncode(STopicObj *pTopic) { SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen); SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen); - SDB_SET_RESERVE(pRaw, dataPos, TSDB_TOPIC_RESERVE_SIZE); + SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE); SDB_SET_DATALEN(pRaw, dataPos); return pRaw; @@ -100,14 +100,14 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { int8_t sver = 0; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - if (sver != TSDB_TOPIC_VER_NUMBER) { - mError("failed to decode stable since %s", terrstr()); + if (sver != MND_TOPIC_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode topic since %s", terrstr()); return NULL; } - int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); - SSdbRow *pRow = sdbAllocRow(size); + int32_t size = sizeof(STopicObj) + TSDB_MAX_COLUMNS * sizeof(SSchema); + SSdbRow *pRow = sdbAllocRow(size); STopicObj *pTopic = sdbGetRowObj(pRow); if (pTopic == NULL) return NULL; @@ -124,7 +124,7 @@ static SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_INT32(pRaw, pRow, dataPos, &pTopic->sqlLen); SDB_GET_BINARY(pRaw, pRow, dataPos, pTopic->sql, pTopic->sqlLen); - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TOPIC_RESERVE_SIZE); + SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TOPIC_RESERVE_SIZE); return pRow; } @@ -168,7 +168,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, STopicObj *pOldTopic, STopicObj } STopicObj *mndAcquireTopic(SMnode *pMnode, char *topicName) { - SSdb *pSdb = pMnode->pSdb; + SSdb *pSdb = pMnode->pSdb; STopicObj *pTopic = sdbAcquire(pSdb, SDB_TOPIC, topicName); if (pTopic == NULL) { terrno = TSDB_CODE_MND_TOPIC_NOT_EXIST; @@ -208,7 +208,7 @@ static SCreateTopicInternalMsg *mndBuildCreateTopicMsg(SMnode *pMnode, SVgObj *p pCreate->sverson = htonl(pTopic->version); pCreate->sql = malloc(pTopic->sqlLen); - if(pCreate->sql == NULL) { + if (pCreate->sql == NULL) { free(pCreate); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -216,7 +216,7 @@ static SCreateTopicInternalMsg *mndBuildCreateTopicMsg(SMnode *pMnode, SVgObj *p memcpy(pCreate->sql, pTopic->sql, pTopic->sqlLen); pCreate->executor = malloc(pTopic->execLen); - if(pCreate->executor == NULL) { + if (pCreate->executor == NULL) { free(pCreate); terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -244,7 +244,7 @@ static SDropTopicInternalMsg *mndBuildDropTopicMsg(SMnode *pMnode, SVgObj *pVgro } static int32_t mndCheckCreateTopicMsg(SCreateTopicMsg *pCreate) { - //deserialize and other stuff + // deserialize and other stuff return 0; } @@ -413,7 +413,7 @@ CREATE_TOPIC_OVER: } static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SCreateTopicMsg *pCreate = pMsg->rpcMsg.pCont; mDebug("topic:%s, start to create", pCreate->name); @@ -436,7 +436,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) { } } - //topic should have different name with stb + // topic should have different name with stb SStbObj *pStb = mndAcquireStb(pMnode, pCreate->name); if (pStb != NULL) { sdbRelease(pMnode->pSdb, pStb); @@ -492,7 +492,7 @@ static int32_t mndCheckAlterTopicMsg(SAlterTopicMsg *pAlter) { static int32_t mndUpdateTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pOldTopic, STopicObj *pNewTopic) { return 0; } static int32_t mndProcessAlterTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SAlterTopicMsg *pAlter = pMsg->rpcMsg.pCont; mDebug("topic:%s, start to alter", pAlter->name); @@ -606,7 +606,7 @@ DROP_TOPIC_OVER: } static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; + SMnode *pMnode = pMsg->pMnode; SDropTopicMsg *pDrop = pMsg->rpcMsg.pCont; mDebug("topic:%s, start to drop", pDrop->name); @@ -705,7 +705,7 @@ static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessCreateTopicInRsp(SMnodeMsg* pMsg) { +static int32_t mndProcessCreateTopicInRsp(SMnodeMsg *pMsg) { mndTransHandleActionRsp(pMsg); return 0; } @@ -788,8 +788,8 @@ static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg * } static void mndExtractTableName(char *tableId, char *name) { - int pos = -1; - int num = 0; + int32_t pos = -1; + int32_t num = 0; for (pos = 0; tableId[pos] != 0; ++pos) { if (tableId[pos] == '.') num++; if (num == 2) break; @@ -801,13 +801,13 @@ static void mndExtractTableName(char *tableId, char *name) { } static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { - SMnode *pMnode = pMsg->pMnode; - SSdb *pSdb = pMnode->pSdb; - int32_t numOfRows = 0; + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; STopicObj *pTopic = NULL; - int32_t cols = 0; - char *pWrite; - char prefix[64] = {0}; + int32_t cols = 0; + char *pWrite; + char prefix[64] = {0}; tstrncpy(prefix, pShow->db, 64); strcat(prefix, TS_PATH_DELIMITER); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 354d456cc4..966ee3c4b9 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -17,9 +17,9 @@ #include "mndTrans.h" #include "mndSync.h" -#define TSDB_TRANS_VER_NUMBER 1 -#define TSDB_TRN_ARRAY_SIZE 8 -#define TSDB_TRN_RESERVE_SIZE 64 +#define MND_TRANS_VER_NUMBER 1 +#define MND_TRANS_ARRAY_SIZE 8 +#define MND_TRANS_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); @@ -61,7 +61,7 @@ int32_t mndInitTrans(SMnode *pMnode) { void mndCleanupTrans(SMnode *pMnode) {} static SSdbRaw *mndTransActionEncode(STrans *pTrans) { - int32_t rawDataLen = sizeof(STrans) + TSDB_TRN_RESERVE_SIZE; + int32_t rawDataLen = sizeof(STrans) + MND_TRANS_RESERVE_SIZE; int32_t redoLogNum = taosArrayGetSize(pTrans->redoLogs); int32_t undoLogNum = taosArrayGetSize(pTrans->undoLogs); int32_t commitLogNum = taosArrayGetSize(pTrans->commitLogs); @@ -93,7 +93,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { rawDataLen += (sizeof(STransAction) + pAction->contLen); } - SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, TSDB_TRANS_VER_NUMBER, rawDataLen); + SSdbRaw *pRaw = sdbAllocRaw(SDB_TRANS, MND_TRANS_VER_NUMBER, rawDataLen); if (pRaw == NULL) { mError("trans:%d, failed to alloc raw since %s", pTrans->id, terrstr()); return NULL; @@ -145,7 +145,7 @@ static SSdbRaw *mndTransActionEncode(STrans *pTrans) { SDB_SET_BINARY(pRaw, dataPos, (void *)pAction->pCont, pAction->contLen); } - SDB_SET_RESERVE(pRaw, dataPos, TSDB_TRN_RESERVE_SIZE) + SDB_SET_RESERVE(pRaw, dataPos, MND_TRANS_RESERVE_SIZE) SDB_SET_DATALEN(pRaw, dataPos); mTrace("trans:%d, encode to raw:%p, len:%d", pTrans->id, pRaw, dataPos); return pRaw; @@ -155,12 +155,9 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { int32_t code = 0; int8_t sver = 0; - if (sdbGetRawSoftVer(pRaw, &sver) != 0) { - mError("failed to get soft ver from raw:%p since %s", pRaw, terrstr()); - return NULL; - } + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; - if (sver != TSDB_TRANS_VER_NUMBER) { + if (sver != MND_TRANS_VER_NUMBER) { terrno = TSDB_CODE_SDB_INVALID_DATA_VER; mError("failed to get check soft ver from raw:%p since %s", pRaw, terrstr()); return NULL; @@ -173,11 +170,11 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { return NULL; } - pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); - pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); + pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); + pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); + pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); + pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -278,7 +275,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) { } } - SDB_GET_RESERVE(pRaw, pRow, dataPos, TSDB_TRN_RESERVE_SIZE) + SDB_GET_RESERVE(pRaw, pRow, dataPos, MND_TRANS_RESERVE_SIZE) TRANS_DECODE_OVER: if (code != 0) { @@ -370,11 +367,11 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; pTrans->rpcHandle = rpcHandle; - pTrans->redoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->undoLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->commitLogs = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(void *)); - pTrans->redoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); - pTrans->undoActions = taosArrayInit(TSDB_TRN_ARRAY_SIZE, sizeof(STransAction)); + pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); + pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); + pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); + pTrans->redoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); + pTrans->undoActions = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(STransAction)); if (pTrans->redoLogs == NULL || pTrans->undoLogs == NULL || pTrans->commitLogs == NULL || pTrans->redoActions == NULL || pTrans->undoActions == NULL) { @@ -504,7 +501,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { STrans *pNewTrans = mndAcquireTrans(pMnode, pTrans->id); if (pNewTrans == NULL) { - mError("trans:%d, failed to ready from sdb since %s", pTrans->id, terrstr()); + mError("trans:%d, failed to read from sdb since %s", pTrans->id, terrstr()); return -1; } @@ -526,19 +523,17 @@ int32_t mndTransCommit(SMnode *pMnode, STrans *pTrans) { if (taosArrayGetSize(pTrans->commitLogs) != 0) { mTrace("trans:%d, sync to other nodes", pTrans->id); - int32_t code = mndSyncPropose(pMnode, pRaw); - if (code != 0) { + if (mndSyncPropose(pMnode, pRaw) != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); sdbFreeRaw(pRaw); return -1; } - mTrace("trans:%d, sync finished", pTrans->id); - code = sdbWrite(pMnode->pSdb, pRaw); - if (code != 0) { - mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); - return -1; - } + } + + if (sdbWrite(pMnode->pSdb, pRaw) != 0) { + mError("trans:%d, failed to write sdb since %s", pTrans->id, terrstr()); + return -1; } mDebug("trans:%d, commit finished", pTrans->id); @@ -614,7 +609,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { } int32_t actionNum = taosArrayGetSize(pTrans->redoActions); - if (action < 0 || action > actionNum) { + if (action < 0 || action >= actionNum) { mError("trans:%d, invalid action:%d", transId, action); goto HANDLE_ACTION_RSP_OVER; } @@ -636,6 +631,8 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { SSdb *pSdb = pMnode->pSdb; int32_t arraySize = taosArrayGetSize(pArray); + if (arraySize == 0) return 0; + for (int32_t i = 0; i < arraySize; ++i) { SSdbRaw *pRaw = taosArrayGetP(pArray, i); int32_t code = sdbWriteNotFree(pSdb, pRaw); @@ -648,45 +645,15 @@ static int32_t mndTransExecuteLogs(SMnode *pMnode, SArray *pArray) { } static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; - if (taosArrayGetSize(pTrans->redoLogs) != 0) { - code = mndTransExecuteLogs(pMnode, pTrans->redoLogs); - if (code != 0) { - mError("trans:%d, failed to execute redo logs since %s", pTrans->id, terrstr()) - } else { - mDebug("trans:%d, execute redo logs finished", pTrans->id) - } - } - - return code; + return mndTransExecuteLogs(pMnode, pTrans->redoLogs); } static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; - if (taosArrayGetSize(pTrans->undoLogs) != 0) { - code = mndTransExecuteLogs(pMnode, pTrans->undoLogs); - if (code != 0) { - mError("trans:%d, failed to execute undo logs since %s", pTrans->id, terrstr()) - } else { - mDebug("trans:%d, execute undo logs finished", pTrans->id) - } - } - - return code; + return mndTransExecuteLogs(pMnode, pTrans->undoLogs); } static int32_t mndTransExecuteCommitLogs(SMnode *pMnode, STrans *pTrans) { - int32_t code = 0; - if (taosArrayGetSize(pTrans->commitLogs) != 0) { - code = mndTransExecuteLogs(pMnode, pTrans->commitLogs); - if (code != 0) { - mError("trans:%d, failed to execute commit logs since %s", pTrans->id, terrstr()) - } else { - mDebug("trans:%d, execute commit logs finished", pTrans->id) - } - } - - return code; + return mndTransExecuteLogs(pMnode, pTrans->commitLogs); } static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { @@ -719,25 +686,25 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA mndSendMsgToDnode(pMnode, &pAction->epSet, &rpcMsg); } - int32_t numOfReceivedMsgs = 0; - int32_t errorCode = 0; + int32_t numOfReceived = 0; + int32_t errCode = 0; for (int32_t action = 0; action < numOfActions; ++action) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction == NULL) continue; if (pAction->msgSent && pAction->msgReceived) { - numOfReceivedMsgs++; + numOfReceived++; if (pAction->errCode != 0) { - errorCode = pAction->errCode; + errCode = pAction->errCode; } } } - if (numOfReceivedMsgs == numOfActions) { - mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errorCode); - terrno = errorCode; - return errorCode; + if (numOfReceived == numOfActions) { + mDebug("trans:%d, all %d actions executed, code:0x%x", pTrans->id, numOfActions, errCode); + terrno = errCode; + return errCode; } else { - mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceivedMsgs, numOfActions, errorCode); + mDebug("trans:%d, %d of %d actions executed, code:0x%x", pTrans->id, numOfReceived, numOfActions, errCode); return TSDB_CODE_MND_ACTION_IN_PROGRESS; } } From 1f70617b371141758ae0bcd776104d4f359613da Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 25 Dec 2021 14:09:23 +0800 Subject: [PATCH 2/3] add ahandle to topic --- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndTrans.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 6 +++--- source/dnode/mnode/impl/src/mndDnode.c | 4 ++-- source/dnode/mnode/impl/src/mndFunc.c | 4 ++-- source/dnode/mnode/impl/src/mndMnode.c | 4 ++-- source/dnode/mnode/impl/src/mndStb.c | 4 ++-- source/dnode/mnode/impl/src/mndTopic.c | 4 ++-- source/dnode/mnode/impl/src/mndTrans.c | 22 +++++++++------------- source/dnode/mnode/impl/src/mndUser.c | 6 +++--- 10 files changed, 27 insertions(+), 30 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 927b1e2e16..b21dc0841c 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -96,6 +96,7 @@ typedef struct { ETrnStage stage; ETrnPolicy policy; void *rpcHandle; + void *rpcAHandle; SArray *redoLogs; SArray *undoLogs; SArray *commitLogs; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index a424c315f9..2d57179f1c 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -35,7 +35,7 @@ typedef struct { int32_t mndInitTrans(SMnode *pMnode); void mndCleanupTrans(SMnode *pMnode); -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle); +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg); void mndTransDrop(STrans *pTrans); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 82aea79edb..6eb94eccf2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -400,7 +400,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDbMsg *pCreat } int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("db:%s, failed to create since %s", pCreate->db, terrstr()); goto CREATE_DB_OVER; @@ -608,7 +608,7 @@ static int32_t mndSetUpdateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj static int32_t mndUpdateDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pOldDb, SDbObj *pNewDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("db:%s, failed to update since %s", pOldDb->name, terrstr()); return terrno; @@ -772,7 +772,7 @@ static int32_t mndSetDropDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *p static int32_t mndDropDb(SMnode *pMnode, SMnodeMsg *pMsg, SDbObj *pDb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("db:%s, failed to drop since %s", pDb->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index f13bbed247..80c9f9544e 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -396,7 +396,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * return terrno; } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("dnode:%s, failed to create since %s", pCreate->ep, terrstr()); return -1; @@ -452,7 +452,7 @@ static int32_t mndProcessCreateDnodeMsg(SMnodeMsg *pMsg) { } static int32_t mndDropDnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("dnode:%d, failed to drop since %s", pDnode->id, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndFunc.c b/source/dnode/mnode/impl/src/mndFunc.c index c6209c949f..638d984c69 100644 --- a/source/dnode/mnode/impl/src/mndFunc.c +++ b/source/dnode/mnode/impl/src/mndFunc.c @@ -147,7 +147,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC pFunc->pCode = pFunc->pData + pCreate->commentSize; memcpy(pFunc->pCode, pCreate->pCont + pCreate->commentSize, pFunc->codeSize); - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { free(pFunc); mError("func:%s, failed to create since %s", pCreate->name, terrstr()); @@ -195,7 +195,7 @@ static int32_t mndCreateFunc(SMnode *pMnode, SMnodeMsg *pMsg, SCreateFuncMsg *pC } static int32_t mndDropFunc(SMnode *pMnode, SMnodeMsg *pMsg, SFuncObj *pFunc) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("func:%s, failed to drop since %s", pFunc->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 9f76060063..8f57a18a1d 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -334,7 +334,7 @@ static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SDnodeObj *pDnode mnodeObj.updateTime = mnodeObj.createdTime; int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); goto CREATE_MNODE_OVER; @@ -500,7 +500,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pObj) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { mError("mnode:%d, failed to drop since %s", pObj->id, terrstr()); goto DROP_MNODE_OVER; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 411f68e7e1..0f577c3c3b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -415,7 +415,7 @@ static int32_t mndCreateStb(SMnode *pMnode, SMnodeMsg *pMsg, SCreateStbMsg *pCre memcpy(stbObj.pSchema, pCreate->pSchema, totalSize); int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("stb:%s, failed to create since %s", pCreate->name, terrstr()); return -1; @@ -614,7 +614,7 @@ static int32_t mndSetDropStbUndoActions(SMnode *pMnode, STrans *pTrans, SStbObj static int32_t mndDropStb(SMnode *pMnode, SMnodeMsg *pMsg, SStbObj *pStb) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("stb:%s, failed to drop since %s", pStb->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 648ebb14fc..04f6907918 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -367,7 +367,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCreateTopicMsg * #endif int32_t code = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to create since %s", pCreate->name, terrstr()); return -1; @@ -561,7 +561,7 @@ static int32_t mndSetDropTopicUndoActions(SMnode *pMnode, STrans *pTrans, STopic static int32_t mndDropTopic(SMnode *pMnode, SMnodeMsg *pMsg, STopicObj *pTopic) { int32_t code = -1; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("topic:%s, failed to drop since %s", pTopic->name, terrstr()); return -1; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 966ee3c4b9..2432e31b9e 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -27,7 +27,6 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans); static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *OldTrans, STrans *pOldTrans); static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans); -static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle); static void mndTransSendRpcRsp(STrans *pTrans, int32_t code); static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); @@ -355,7 +354,7 @@ char *mndTransPolicyStr(ETrnPolicy policy) { } } -STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { +STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, SRpcMsg *pMsg) { STrans *pTrans = calloc(1, sizeof(STrans)); if (pTrans == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -366,7 +365,8 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) { pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS); pTrans->stage = TRN_STAGE_PREPARE; pTrans->policy = policy; - pTrans->rpcHandle = rpcHandle; + pTrans->rpcHandle = pMsg->handle; + pTrans->rpcAHandle = pMsg->ahandle; pTrans->redoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->undoLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); pTrans->commitLogs = taosArrayInit(MND_TRANS_ARRAY_SIZE, sizeof(void *)); @@ -413,11 +413,6 @@ void mndTransDrop(STrans *pTrans) { tfree(pTrans); } -static void mndTransSetRpcHandle(STrans *pTrans, void *rpcHandle) { - pTrans->rpcHandle = rpcHandle; - mTrace("trans:%d, set rpc handle:%p", pTrans->id, rpcHandle); -} - static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw) { if (pArray == NULL || pRaw == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -506,6 +501,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) { } pNewTrans->rpcHandle = pTrans->rpcHandle; + pNewTrans->rpcAHandle = pTrans->rpcAHandle; mndTransExecute(pMnode, pNewTrans); mndReleaseTrans(pMnode, pNewTrans); return 0; @@ -571,10 +567,11 @@ int32_t mndTransRollback(SMnode *pMnode, STrans *pTrans) { static void mndTransSendRpcRsp(STrans *pTrans, int32_t code) { if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) return; - mDebug("trans:%d, send rpc rsp, RPC:%p code:0x%x", pTrans->id, pTrans->rpcHandle, code & 0xFFFF); + mDebug("trans:%d, send rpc rsp, RPC:%p ahandle:%p code:0x%x", pTrans->id, pTrans->rpcHandle, pTrans->rpcAHandle, + code & 0xFFFF); if (pTrans->rpcHandle != NULL) { - SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code}; + SRpcMsg rspMsg = {.handle = pTrans->rpcHandle, .code = code, .ahandle = pTrans->rpcAHandle}; rpcSendResponse(&rspMsg); } } @@ -739,7 +736,6 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { mDebug("trans:%d, stage from execute to commit", pTrans->id); } else if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) { mDebug("trans:%d, stage keep on execute since %s", pTrans->id, tstrerror(code)); - return code; } else { if (pTrans->policy == TRN_POLICY_ROLLBACK) { pTrans->stage = TRN_STAGE_ROLLBACK; @@ -790,9 +786,9 @@ static void mndTransExecute(SMnode *pMnode, STrans *pTrans) { } break; case TRN_STAGE_ROLLBACK: - code = mndTransPerformRollbackStage(pMnode, pTrans); + code = mndTransRollback(pMnode, pTrans); if (code == 0) { - code = mndTransRollback(pMnode, pTrans); + mndTransPerformRollbackStage(pMnode, pTrans); } break; default: diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 9b5eca6d39..0926cf0c48 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -197,7 +197,7 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass, userObj.updateTime = userObj.createdTime; userObj.superUser = 0; - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to create since %s", user, terrstr()); return -1; @@ -267,7 +267,7 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) { } static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewUser, SMnodeMsg *pMsg) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to update since %s", pOldUser->user, terrstr()); return -1; @@ -342,7 +342,7 @@ static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) { } static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) { - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { mError("user:%s, failed to drop since %s", pUser->user, terrstr()); return -1; From b10cff8b3521f90d8dffd41339b851c1898069ad Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 25 Dec 2021 14:40:43 +0800 Subject: [PATCH 3/3] framework integrated with sync --- include/dnode/mnode/mnode.h | 10 ----- source/dnode/mgmt/impl/inc/dndInt.h | 1 - source/dnode/mgmt/impl/src/dndDnode.c | 2 +- source/dnode/mgmt/impl/src/dndMnode.c | 56 -------------------------- source/dnode/mnode/impl/inc/mndInt.h | 9 ++++- source/dnode/mnode/impl/src/mndSync.c | 56 +++++++++++++++++++------- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/mnode/impl/src/mnode.c | 6 +-- 8 files changed, 53 insertions(+), 89 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index f4a6f7a4c9..09d1f8c013 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -27,7 +27,6 @@ typedef struct SMnodeMsg SMnodeMsg; typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutMsgToMnodeQFp)(SDnode *pDnode, SMnodeMsg *pMsg); typedef struct SMnodeLoad { int64_t numOfDnode; @@ -63,7 +62,6 @@ typedef struct { SReplica replicas[TSDB_MAX_REPLICA]; SMnodeCfg cfg; SDnode *pDnode; - PutMsgToMnodeQFp putMsgToApplyMsgFp; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; @@ -172,14 +170,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg); */ void mndProcessSyncMsg(SMnodeMsg *pMsg); -/** - * @brief Process the apply request. - * - * @param pMsg The request msg. - * @return int32_t 0 for success, -1 for failure. - */ -void mndProcessApplyMsg(SMnodeMsg *pMsg); - #ifdef __cplusplus } #endif diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 8973574d23..6f1357e9c1 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -80,7 +80,6 @@ typedef struct { SRWLatch latch; taos_queue pReadQ; taos_queue pWriteQ; - taos_queue pApplyQ; taos_queue pSyncQ; taos_queue pMgmtQ; SWorkerPool mgmtPool; diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 6b5aeb078a..af86e59518 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -369,7 +369,7 @@ void dndSendStatusMsg(SDnode *pDnode) { dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); contLen = sizeof(SStatusMsg) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); - SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = 9527}; + SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; pMgmt->statusSent = 1; dTrace("pDnode:%p, send status msg to mnode", pDnode); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index c62c05fe2f..8fbb473af1 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -28,18 +28,15 @@ static void dndCleanupMnodeSyncWorker(SDnode *pDnode); static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); -static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode); static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode); static void dndFreeMnodeReadQueue(SDnode *pDnode); static void dndFreeMnodeWriteQueue(SDnode *pDnode); -static void dndFreeMnodeApplyQueue(SDnode *pDnode); static void dndFreeMnodeSyncQueue(SDnode *pDnode); static void dndFreeMnodeMgmtQueue(SDnode *pDnode); static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); @@ -47,7 +44,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEp void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg); static int32_t dndStartMnodeWorker(SDnode *pDnode); static void dndStopMnodeWorker(SDnode *pDnode); @@ -271,11 +267,6 @@ static int32_t dndStartMnodeWorker(SDnode *pDnode) { return -1; } - if (dndAllocMnodeApplyQueue(pDnode) != 0) { - dError("failed to alloc mnode apply queue since %s", terrstr()); - return -1; - } - if (dndAllocMnodeSyncQueue(pDnode) != 0) { dError("failed to alloc mnode sync queue since %s", terrstr()); return -1; @@ -293,7 +284,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { while (pMgmt->refCount > 1) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pApplyQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10); @@ -303,7 +293,6 @@ static void dndStopMnodeWorker(SDnode *pDnode) { dndFreeMnodeReadQueue(pDnode); dndFreeMnodeWriteQueue(pDnode); - dndFreeMnodeApplyQueue(pDnode); dndFreeMnodeSyncQueue(pDnode); } @@ -328,7 +317,6 @@ static void dndInitMnodeOption(SDnode *pDnode, SMnodeOpt *pOption) { pOption->sendMsgToDnodeFp = dndSendMsgToDnode; pOption->sendMsgToMnodeFp = dndSendMsgToMnode; pOption->sendRedirectMsgFp = dndSendRedirectMsg; - pOption->putMsgToApplyMsgFp = dndPutMsgIntoMnodeApplyQueue; pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); pOption->cfg.sver = pDnode->opt.sver; @@ -604,20 +592,6 @@ static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { mndCleanupMsg(pMsg); } -static void dndProcessMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode != NULL) { - mndProcessApplyMsg(pMsg); - dndReleaseMnode(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); - } - - mndCleanupMsg(pMsg); -} - static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -712,19 +686,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndReleaseMnode(pDnode, pMnode); } -static int32_t dndPutMsgIntoMnodeApplyQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL) { - return -1; - } - - int32_t code = taosWriteQitem(pMgmt->pApplyQ, pMsg); - dndReleaseMnode(pDnode, pMnode); - return code; -} - static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue); @@ -817,23 +778,6 @@ static void dndFreeMnodeWriteQueue(SDnode *pDnode) { pMgmt->pWriteQ = NULL; } -static int32_t dndAllocMnodeApplyQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pApplyQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeApplyQueue); - if (pMgmt->pApplyQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeMnodeApplyQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pApplyQ); - pMgmt->pApplyQ = NULL; -} - static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SWorkerPool *pPool = &pMgmt->writePool; diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index ba8746c009..a9932ce048 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -61,6 +61,13 @@ typedef struct { char email[TSDB_FQDN_LEN]; } STelemMgmt; +typedef struct { + int32_t errCode; + sem_t syncSem; + SSyncNode *pSyncNode; + ESyncState state; +} SSyncMgmt; + typedef struct SMnode { int32_t dnodeId; int32_t clusterId; @@ -77,11 +84,11 @@ typedef struct SMnode { SShowMgmt showMgmt; SProfileMgmt profileMgmt; STelemMgmt telemMgmt; + SSyncMgmt syncMgmt; MndMsgFp msgFp[TDMT_MAX]; SendMsgToDnodeFp sendMsgToDnodeFp; SendMsgToMnodeFp sendMsgToMnodeFp; SendRedirectMsgFp sendRedirectMsgFp; - PutMsgToMnodeQFp putMsgToApplyMsgFp; } SMnode; void mndSendMsgToDnode(SMnode *pMnode, SEpSet *pEpSet, SRpcMsg *rpcMsg); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 5e9165f898..6a2fca836f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -14,26 +14,54 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" -#include "mndTrans.h" +#include "mndSync.h" -int32_t mndInitSync(SMnode *pMnode) { return 0; } -void mndCleanupSync(SMnode *pMnode) {} +int32_t mndInitSync(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_init(&pMgmt->syncSem, 0, 0); + + pMgmt->state = TAOS_SYNC_STATE_LEADER; + pMgmt->pSyncNode = NULL; + return 0; +} + +void mndCleanupSync(SMnode *pMnode) { + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + tsem_destroy(&pMgmt->syncSem); +} + +static int32_t mndSyncApplyCb(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) { + SMnode *pMnode = pData; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + + pMgmt->errCode = 0; + tsem_post(&pMgmt->syncSem); + + return 0; +} int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) { - int32_t code = 0; +#if 1 + return 0; +#else + if (pMnode->replica == 1) return 0; - // int32_t len = sdbGetRawTotalSize(pRaw); - // SSdbRaw *pReceived = calloc(1, len); - // memcpy(pReceived, pRaw, len); - // mDebug("trans:%d, data:%p recv from sync, code:0x%x pMsg:%p", pMsg->id, pReceived, code & 0xFFFF, pMsg); + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + pMgmt->errCode = 0; - // mndTransApply(pMnode, pReceived, code); - return code; + SSyncBuffer buf = {.data = pRaw, .len = sdbGetRawTotalSize(pRaw)}; + + bool isWeak = false; + int32_t code = syncPropose(pMgmt->pSyncNode, &buf, pMnode, isWeak); + + if (code != 0) return code; + + tsem_wait(&pMgmt->syncSem); + return pMgmt->errCode; +#endif } bool mndIsMaster(SMnode *pMnode) { - // pMnode->role = TAOS_SYNC_STATE_LEADER; - return true; + SSyncMgmt *pMgmt = &pMnode->syncMgmt; + return pMgmt->state == TAOS_SYNC_STATE_LEADER; } \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 2432e31b9e..f63d14e711 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -746,7 +746,7 @@ static int32_t mndTransPerformExecuteStage(SMnode *pMnode, STrans *pTrans) { } } - return 0; + return code; } static int32_t mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 26b1c71a10..27668a585a 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -202,7 +202,6 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->selfIndex = pOption->selfIndex; memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); pMnode->pDnode = pOption->pDnode; - pMnode->putMsgToApplyMsgFp = pOption->putMsgToApplyMsgFp; pMnode->sendMsgToDnodeFp = pOption->sendMsgToDnodeFp; pMnode->sendMsgToMnodeFp = pOption->sendMsgToMnodeFp; pMnode->sendRedirectMsgFp = pOption->sendRedirectMsgFp; @@ -217,8 +216,7 @@ static int32_t mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) { pMnode->cfg.buildinfo = strdup(pOption->cfg.buildinfo); if (pMnode->sendMsgToDnodeFp == NULL || pMnode->sendMsgToMnodeFp == NULL || pMnode->sendRedirectMsgFp == NULL || - pMnode->putMsgToApplyMsgFp == NULL || pMnode->dnodeId < 0 || pMnode->clusterId < 0 || - pMnode->cfg.statusInterval < 1) { + pMnode->dnodeId < 0 || pMnode->clusterId < 0 || pMnode->cfg.statusInterval < 1) { terrno = TSDB_CODE_MND_INVALID_OPTIONS; return -1; } @@ -438,8 +436,6 @@ void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } -void mndProcessApplyMsg(SMnodeMsg *pMsg) {} - uint64_t mndGenerateUid(char *name, int32_t len) { int64_t us = taosGetTimestampUs(); int32_t hashval = MurmurHash3_32(name, len);