diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index f49030466e..e8c4faa240 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -62,6 +62,7 @@ typedef struct SSyncCfg { typedef struct SFsmCbMeta { SyncIndex index; + SyncIndex lastConfigIndex; bool isWeak; int32_t code; ESyncState state; @@ -75,6 +76,7 @@ typedef struct SReConfigCbMeta { int32_t code; SyncIndex index; SyncTerm term; + SyncIndex lastConfigIndex; SyncTerm currentTerm; SSyncCfg oldCfg; SSyncCfg newCfg; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e29c55310a..bd9d8d274e 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -441,108 +441,6 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) #define TSDB_CODE_WAL_LOG_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x1005) -// http -#define TSDB_CODE_HTTP_SERVER_OFFLINE TAOS_DEF_ERROR_CODE(0, 0x1100) //"http server is not online" -#define TSDB_CODE_HTTP_UNSUPPORT_URL TAOS_DEF_ERROR_CODE(0, 0x1101) //"url is not support" -#define TSDB_CODE_HTTP_INVALID_URL TAOS_DEF_ERROR_CODE(0, 0x1102) //invalid url format" -#define TSDB_CODE_HTTP_NO_ENOUGH_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1103) //"no enough memory" -#define TSDB_CODE_HTTP_REQUSET_TOO_BIG TAOS_DEF_ERROR_CODE(0, 0x1104) //"request size is too big" -#define TSDB_CODE_HTTP_NO_AUTH_INFO TAOS_DEF_ERROR_CODE(0, 0x1105) //"no auth info input" -#define TSDB_CODE_HTTP_NO_MSG_INPUT TAOS_DEF_ERROR_CODE(0, 0x1106) //"request is empty" -#define TSDB_CODE_HTTP_NO_SQL_INPUT TAOS_DEF_ERROR_CODE(0, 0x1107) //"no sql input" -#define TSDB_CODE_HTTP_NO_EXEC_USEDB TAOS_DEF_ERROR_CODE(0, 0x1108) //"no need to execute use db cmd" -#define TSDB_CODE_HTTP_SESSION_FULL TAOS_DEF_ERROR_CODE(0, 0x1109) //"session list was full" -#define TSDB_CODE_HTTP_GEN_TAOSD_TOKEN_ERR TAOS_DEF_ERROR_CODE(0, 0x110A) //"generate taosd token error" -#define TSDB_CODE_HTTP_INVALID_MULTI_REQUEST TAOS_DEF_ERROR_CODE(0, 0x110B) //"size of multi request is 0" -#define TSDB_CODE_HTTP_CREATE_GZIP_FAILED TAOS_DEF_ERROR_CODE(0, 0x110C) //"failed to create gzip" -#define TSDB_CODE_HTTP_FINISH_GZIP_FAILED TAOS_DEF_ERROR_CODE(0, 0x110D) //"failed to finish gzip" -#define TSDB_CODE_HTTP_LOGIN_FAILED TAOS_DEF_ERROR_CODE(0, 0x110E) //"failed to login" - -#define TSDB_CODE_HTTP_INVALID_VERSION TAOS_DEF_ERROR_CODE(0, 0x1120) //"invalid http version" -#define TSDB_CODE_HTTP_INVALID_CONTENT_LENGTH TAOS_DEF_ERROR_CODE(0, 0x1121) //"invalid content length" -#define TSDB_CODE_HTTP_INVALID_AUTH_TYPE TAOS_DEF_ERROR_CODE(0, 0x1122) //"invalid type of Authorization" -#define TSDB_CODE_HTTP_INVALID_AUTH_FORMAT TAOS_DEF_ERROR_CODE(0, 0x1123) //"invalid format of Authorization" -#define TSDB_CODE_HTTP_INVALID_BASIC_AUTH TAOS_DEF_ERROR_CODE(0, 0x1124) //"invalid basic Authorization" -#define TSDB_CODE_HTTP_INVALID_TAOSD_AUTH TAOS_DEF_ERROR_CODE(0, 0x1125) //"invalid taosd Authorization" -#define TSDB_CODE_HTTP_PARSE_METHOD_FAILED TAOS_DEF_ERROR_CODE(0, 0x1126) //"failed to parse method" -#define TSDB_CODE_HTTP_PARSE_TARGET_FAILED TAOS_DEF_ERROR_CODE(0, 0x1127) //"failed to parse target" -#define TSDB_CODE_HTTP_PARSE_VERSION_FAILED TAOS_DEF_ERROR_CODE(0, 0x1128) //"failed to parse http version" -#define TSDB_CODE_HTTP_PARSE_SP_FAILED TAOS_DEF_ERROR_CODE(0, 0x1129) //"failed to parse sp" -#define TSDB_CODE_HTTP_PARSE_STATUS_FAILED TAOS_DEF_ERROR_CODE(0, 0x112A) //"failed to parse status" -#define TSDB_CODE_HTTP_PARSE_PHRASE_FAILED TAOS_DEF_ERROR_CODE(0, 0x112B) //"failed to parse phrase" -#define TSDB_CODE_HTTP_PARSE_CRLF_FAILED TAOS_DEF_ERROR_CODE(0, 0x112C) //"failed to parse crlf" -#define TSDB_CODE_HTTP_PARSE_HEADER_FAILED TAOS_DEF_ERROR_CODE(0, 0x112D) //"failed to parse header" -#define TSDB_CODE_HTTP_PARSE_HEADER_KEY_FAILED TAOS_DEF_ERROR_CODE(0, 0x112E) //"failed to parse header key" -#define TSDB_CODE_HTTP_PARSE_HEADER_VAL_FAILED TAOS_DEF_ERROR_CODE(0, 0x112F) //"failed to parse header val" -#define TSDB_CODE_HTTP_PARSE_CHUNK_SIZE_FAILED TAOS_DEF_ERROR_CODE(0, 0x1130) //"failed to parse chunk size" -#define TSDB_CODE_HTTP_PARSE_CHUNK_FAILED TAOS_DEF_ERROR_CODE(0, 0x1131) //"failed to parse chunk" -#define TSDB_CODE_HTTP_PARSE_END_FAILED TAOS_DEF_ERROR_CODE(0, 0x1132) //"failed to parse end section" -#define TSDB_CODE_HTTP_PARSE_INVALID_STATE TAOS_DEF_ERROR_CODE(0, 0x1134) //"invalid parse state" -#define TSDB_CODE_HTTP_PARSE_ERROR_STATE TAOS_DEF_ERROR_CODE(0, 0x1135) //"failed to parse error section" - -#define TSDB_CODE_HTTP_GC_QUERY_NULL TAOS_DEF_ERROR_CODE(0, 0x1150) //"query size is 0" -#define TSDB_CODE_HTTP_GC_QUERY_SIZE TAOS_DEF_ERROR_CODE(0, 0x1151) //"query size can not more than 100" -#define TSDB_CODE_HTTP_GC_REQ_PARSE_ERROR TAOS_DEF_ERROR_CODE(0, 0x1152) //"parse grafana json error" - -#define TSDB_CODE_HTTP_TG_DB_NOT_INPUT TAOS_DEF_ERROR_CODE(0, 0x1160) //"database name can not be null" -#define TSDB_CODE_HTTP_TG_DB_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x1161) //"database name too long" -#define TSDB_CODE_HTTP_TG_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x1162) //"invalid telegraf json fromat" -#define TSDB_CODE_HTTP_TG_METRICS_NULL TAOS_DEF_ERROR_CODE(0, 0x1163) //"metrics size is 0" -#define TSDB_CODE_HTTP_TG_METRICS_SIZE TAOS_DEF_ERROR_CODE(0, 0x1164) //"metrics size can not more than 1K" -#define TSDB_CODE_HTTP_TG_METRIC_NULL TAOS_DEF_ERROR_CODE(0, 0x1165) //"metric name not find" -#define TSDB_CODE_HTTP_TG_METRIC_TYPE TAOS_DEF_ERROR_CODE(0, 0x1166) //"metric name type should be string" -#define TSDB_CODE_HTTP_TG_METRIC_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x1167) //"metric name length is 0" -#define TSDB_CODE_HTTP_TG_METRIC_NAME_LONG TAOS_DEF_ERROR_CODE(0, 0x1168) //"metric name length too long" -#define TSDB_CODE_HTTP_TG_TIMESTAMP_NULL TAOS_DEF_ERROR_CODE(0, 0x1169) //"timestamp not find" -#define TSDB_CODE_HTTP_TG_TIMESTAMP_TYPE TAOS_DEF_ERROR_CODE(0, 0x116A) //"timestamp type should be integer" -#define TSDB_CODE_HTTP_TG_TIMESTAMP_VAL_NULL TAOS_DEF_ERROR_CODE(0, 0x116B) //"timestamp value smaller than 0" -#define TSDB_CODE_HTTP_TG_TAGS_NULL TAOS_DEF_ERROR_CODE(0, 0x116C) //"tags not find" -#define TSDB_CODE_HTTP_TG_TAGS_SIZE_0 TAOS_DEF_ERROR_CODE(0, 0x116D) //"tags size is 0" -#define TSDB_CODE_HTTP_TG_TAGS_SIZE_LONG TAOS_DEF_ERROR_CODE(0, 0x116E) //"tags size too long" -#define TSDB_CODE_HTTP_TG_TAG_NULL TAOS_DEF_ERROR_CODE(0, 0x116F) //"tag is null" -#define TSDB_CODE_HTTP_TG_TAG_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x1170) //"tag name is null" -#define TSDB_CODE_HTTP_TG_TAG_NAME_SIZE TAOS_DEF_ERROR_CODE(0, 0x1171) //"tag name length too long" -#define TSDB_CODE_HTTP_TG_TAG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x1172) //"tag value type should be number or string" -#define TSDB_CODE_HTTP_TG_TAG_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x1173) //"tag value is null" -#define TSDB_CODE_HTTP_TG_TABLE_NULL TAOS_DEF_ERROR_CODE(0, 0x1174) //"table is null" -#define TSDB_CODE_HTTP_TG_TABLE_SIZE TAOS_DEF_ERROR_CODE(0, 0x1175) //"table name length too long" -#define TSDB_CODE_HTTP_TG_FIELDS_NULL TAOS_DEF_ERROR_CODE(0, 0x1176) //"fields not find" -#define TSDB_CODE_HTTP_TG_FIELDS_SIZE_0 TAOS_DEF_ERROR_CODE(0, 0x1177) //"fields size is 0" -#define TSDB_CODE_HTTP_TG_FIELDS_SIZE_LONG TAOS_DEF_ERROR_CODE(0, 0x1178) //"fields size too long" -#define TSDB_CODE_HTTP_TG_FIELD_NULL TAOS_DEF_ERROR_CODE(0, 0x1179) //"field is null" -#define TSDB_CODE_HTTP_TG_FIELD_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x117A) //"field name is null" -#define TSDB_CODE_HTTP_TG_FIELD_NAME_SIZE TAOS_DEF_ERROR_CODE(0, 0x117B) //"field name length too long" -#define TSDB_CODE_HTTP_TG_FIELD_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x117C) //"field value type should be number or string" -#define TSDB_CODE_HTTP_TG_FIELD_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x117D) //"field value is null" -#define TSDB_CODE_HTTP_TG_HOST_NOT_STRING TAOS_DEF_ERROR_CODE(0, 0x117E) //"host type should be string" -#define TSDB_CODE_HTTP_TG_STABLE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x117F) //"stable not exist" - -#define TSDB_CODE_HTTP_OP_DB_NOT_INPUT TAOS_DEF_ERROR_CODE(0, 0x1190) //"database name can not be null" -#define TSDB_CODE_HTTP_OP_DB_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x1191) //"database name too long" -#define TSDB_CODE_HTTP_OP_INVALID_JSON TAOS_DEF_ERROR_CODE(0, 0x1192) //"invalid opentsdb json fromat" -#define TSDB_CODE_HTTP_OP_METRICS_NULL TAOS_DEF_ERROR_CODE(0, 0x1193) //"metrics size is 0" -#define TSDB_CODE_HTTP_OP_METRICS_SIZE TAOS_DEF_ERROR_CODE(0, 0x1194) //"metrics size can not more than 10K" -#define TSDB_CODE_HTTP_OP_METRIC_NULL TAOS_DEF_ERROR_CODE(0, 0x1195) //"metric name not find" -#define TSDB_CODE_HTTP_OP_METRIC_TYPE TAOS_DEF_ERROR_CODE(0, 0x1196) //"metric name type should be string" -#define TSDB_CODE_HTTP_OP_METRIC_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x1197) //"metric name length is 0" -#define TSDB_CODE_HTTP_OP_METRIC_NAME_LONG TAOS_DEF_ERROR_CODE(0, 0x1198) //"metric name length can not more than 22" -#define TSDB_CODE_HTTP_OP_TIMESTAMP_NULL TAOS_DEF_ERROR_CODE(0, 0x1199) //"timestamp not find" -#define TSDB_CODE_HTTP_OP_TIMESTAMP_TYPE TAOS_DEF_ERROR_CODE(0, 0x119A) //"timestamp type should be integer" -#define TSDB_CODE_HTTP_OP_TIMESTAMP_VAL_NULL TAOS_DEF_ERROR_CODE(0, 0x119B) //"timestamp value smaller than 0" -#define TSDB_CODE_HTTP_OP_TAGS_NULL TAOS_DEF_ERROR_CODE(0, 0x119C) //"tags not find" -#define TSDB_CODE_HTTP_OP_TAGS_SIZE_0 TAOS_DEF_ERROR_CODE(0, 0x119D) //"tags size is 0" -#define TSDB_CODE_HTTP_OP_TAGS_SIZE_LONG TAOS_DEF_ERROR_CODE(0, 0x119E) //"tags size too long" -#define TSDB_CODE_HTTP_OP_TAG_NULL TAOS_DEF_ERROR_CODE(0, 0x119F) //"tag is null" -#define TSDB_CODE_HTTP_OP_TAG_NAME_NULL TAOS_DEF_ERROR_CODE(0, 0x11A0) //"tag name is null" -#define TSDB_CODE_HTTP_OP_TAG_NAME_SIZE TAOS_DEF_ERROR_CODE(0, 0x11A1) //"tag name length too long" -#define TSDB_CODE_HTTP_OP_TAG_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A2) //"tag value type should be boolean number or string" -#define TSDB_CODE_HTTP_OP_TAG_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A3) //"tag value is null" -#define TSDB_CODE_HTTP_OP_TAG_VALUE_TOO_LONG TAOS_DEF_ERROR_CODE(0, 0x11A4) //"tag value can not more than 64" -#define TSDB_CODE_HTTP_OP_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x11A5) //"value not find" -#define TSDB_CODE_HTTP_OP_VALUE_TYPE TAOS_DEF_ERROR_CODE(0, 0x11A6) //"value type should be boolean number or string" - -#define TSDB_CODE_HTTP_REQUEST_JSON_ERROR TAOS_DEF_ERROR_CODE(0, 0x1F00) //"http request json error" - // tfs #define TSDB_CODE_FS_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x2200) #define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 8883431ca8..3e3850de1a 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -46,13 +46,14 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); pMgmt->errCode = cbMeta.code; - mDebug("trans:%d, is proposed, saved:%d code:0x%x, index:%" PRId64 " term:%" PRId64 " role:%s raw:%p", transId, - pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, syncStr(cbMeta.state), pRaw); + mDebug("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 + " role:%s raw:%p", + transId, pMgmt->transId, cbMeta.code, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex, syncStr(cbMeta.state), + pRaw); if (pMgmt->errCode == 0) { sdbWriteWithoutFree(pMnode->pSdb, pRaw); - sdbSetApplyIndex(pMnode->pSdb, cbMeta.index); - sdbSetApplyTerm(pMnode->pSdb, cbMeta.term); + sdbSetApplyInfo(pMnode->pSdb, cbMeta.index, cbMeta.term, cbMeta.lastConfigIndex); } if (pMgmt->transId == transId) { @@ -68,36 +69,19 @@ void mndSyncCommitMsg(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbM mndReleaseTrans(pMnode, pTrans); } - if (cbMeta.index - sdbGetApplyIndex(pMnode->pSdb) > 100) { - SSnapshotMeta sMeta = {0}; - // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { - if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, cbMeta.index, &sMeta) == 0) { - sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); - } - sdbWriteFile(pMnode->pSdb); - } + sdbWriteFile(pMnode->pSdb, SDB_WRITE_DELTA); } } int32_t mndSyncGetSnapshot(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) { SMnode *pMnode = pFsm->data; - pSnapshot->lastApplyIndex = sdbGetCommitIndex(pMnode->pSdb); - pSnapshot->lastApplyTerm = sdbGetCommitTerm(pMnode->pSdb); - pSnapshot->lastConfigIndex = sdbGetCurConfig(pMnode->pSdb); + sdbGetCommitInfo(pMnode->pSdb, &pSnapshot->lastApplyIndex, &pSnapshot->lastApplyTerm, &pSnapshot->lastConfigIndex); return 0; } void mndRestoreFinish(struct SSyncFSM *pFsm) { SMnode *pMnode = pFsm->data; - SSnapshotMeta sMeta = {0}; - // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { - - SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb); - if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) { - sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); - } - if (!pMnode->deploy) { mInfo("mnode sync restore finished, and will handle outstanding transactions"); mndTransPullup(pMnode); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index c37e706793..31a955b030 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -22,8 +22,8 @@ #include "mndSync.h" #include "mndUser.h" -#define TRANS_VER_NUMBER 1 -#define TRANS_ARRAY_SIZE 8 +#define TRANS_VER_NUMBER 1 +#define TRANS_ARRAY_SIZE 8 #define TRANS_RESERVE_SIZE 64 static SSdbRaw *mndTransActionEncode(STrans *pTrans); @@ -52,7 +52,7 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerfromFinishedStage(SMnode *pMnode, STrans *pTrans); -static bool mndCantExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); } +static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsMaster(pMnode); } static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static int32_t mndProcessTransReq(SRpcMsg *pReq); @@ -523,9 +523,10 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { } if (pOld->stage == TRN_STAGE_ROLLBACK) { - pOld->stage = TRN_STAGE_FINISHED; - mTrace("trans:%d, stage from rollback to finished since perform update action", pNew->id); + pOld->stage = TRN_STAGE_REDO_ACTION; + mTrace("trans:%d, stage from rollback to undoAction since perform update action", pNew->id); } + return 0; } @@ -937,7 +938,7 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { if (pAction->msgSent) return 0; - if (mndCantExecuteTransAction(pMnode)) return -1; + if (mndCannotExecuteTransAction(pMnode)) return -1; int64_t signature = pTrans->id; signature = (signature << 32); @@ -1137,7 +1138,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) pTrans->lastEpset = pAction->epSet; } - if (mndCantExecuteTransAction(pMnode)) break; + if (mndCannotExecuteTransAction(pMnode)) break; if (code == 0) { pTrans->code = 0; @@ -1180,7 +1181,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { code = mndTransExecuteRedoActions(pMnode, pTrans); } - if (mndCantExecuteTransAction(pMnode)) return false; + if (mndCannotExecuteTransAction(pMnode)) return false; if (code == 0) { pTrans->code = 0; @@ -1193,8 +1194,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { } else { pTrans->code = terrno; if (pTrans->policy == TRN_POLICY_ROLLBACK) { - pTrans->stage = TRN_STAGE_UNDO_ACTION; - mError("trans:%d, stage from redoAction to undoAction since %s", pTrans->id, terrstr()); + pTrans->stage = TRN_STAGE_ROLLBACK; + mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr()); continueExec = true; } else { pTrans->failedTimes++; @@ -1207,7 +1208,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { } static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { - if (mndCantExecuteTransAction(pMnode)) return false; + if (mndCannotExecuteTransAction(pMnode)) return false; bool continueExec = true; int32_t code = mndTransCommit(pMnode, pTrans); @@ -1219,16 +1220,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { continueExec = true; } else { pTrans->code = terrno; - if (pTrans->policy == TRN_POLICY_ROLLBACK) { - pTrans->stage = TRN_STAGE_UNDO_ACTION; - mError("trans:%d, stage from commit to undoAction since %s, failedTimes:%d", pTrans->id, terrstr(), - pTrans->failedTimes); - continueExec = true; - } else { - pTrans->failedTimes++; - mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); - continueExec = false; - } + pTrans->failedTimes++; + mError("trans:%d, stage keep on commit since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes); + continueExec = false; } return continueExec; @@ -1257,11 +1251,9 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { bool continueExec = true; int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); - if (mndCantExecuteTransAction(pMnode)) return false; - if (code == 0) { - pTrans->stage = TRN_STAGE_ROLLBACK; - mDebug("trans:%d, stage from undoAction to rollback", pTrans->id); + pTrans->stage = TRN_STAGE_FINISHED; + mDebug("trans:%d, stage from undoAction to finished", pTrans->id); continueExec = true; } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mDebug("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); @@ -1276,14 +1268,14 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { } static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { - if (mndCantExecuteTransAction(pMnode)) return false; + if (mndCannotExecuteTransAction(pMnode)) return false; bool continueExec = true; int32_t code = mndTransRollback(pMnode, pTrans); if (code == 0) { - pTrans->stage = TRN_STAGE_FINISHED; - mDebug("trans:%d, stage from rollback to finished", pTrans->id); + pTrans->stage = TRN_STAGE_UNDO_ACTION; + mDebug("trans:%d, stage from rollback to undoAction", pTrans->id); continueExec = true; } else { pTrans->failedTimes++; @@ -1331,12 +1323,12 @@ void mndTransExecute(SMnode *pMnode, STrans *pTrans) { case TRN_STAGE_COMMIT_ACTION: continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); break; - case TRN_STAGE_UNDO_ACTION: - continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); - break; case TRN_STAGE_ROLLBACK: continueExec = mndTransPerformRollbackStage(pMnode, pTrans); break; + case TRN_STAGE_UNDO_ACTION: + continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); + break; case TRN_STAGE_FINISHED: continueExec = mndTransPerfromFinishedStage(pMnode, pTrans); break; @@ -1438,13 +1430,8 @@ void mndTransPullup(SMnode *pMnode) { mndReleaseTrans(pMnode, pTrans); } - SSnapshotMeta sMeta = {0}; - // if (syncGetSnapshotMeta(pMnode->syncMgmt.sync, &sMeta) == 0) { - SyncIndex snapshotIndex = sdbGetApplyIndex(pMnode->pSdb); - if (syncGetSnapshotMetaByIndex(pMnode->syncMgmt.sync, snapshotIndex, &sMeta) == 0) { - sdbSetCurConfig(pMnode->pSdb, sMeta.lastConfigIndex); - } - sdbWriteFile(pMnode->pSdb); + // todo, set to SDB_WRITE_DELTA + sdbWriteFile(pMnode->pSdb, 0); taosArrayDestroy(pArray); } diff --git a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp index 43be55dd1d..bc118ee26e 100644 --- a/source/dnode/mnode/impl/test/sdb/sdbTest.cpp +++ b/source/dnode/mnode/impl/test/sdb/sdbTest.cpp @@ -493,8 +493,11 @@ TEST_F(MndTestSdb, 01_Write_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 2); - sdbSetApplyIndex(pSdb, -1); - ASSERT_EQ(sdbGetApplyIndex(pSdb), -1); + sdbSetApplyInfo(pSdb, -1, -1, -1); + // int64_t index, config; + // int64_t term; + // sdbGetCommitInfo(pSdb, &index, &term, &config); + // ASSERT_EQ(index, -1); ASSERT_EQ(mnode.insertTimes, 2); ASSERT_EQ(mnode.deleteTimes, 0); @@ -700,11 +703,12 @@ TEST_F(MndTestSdb, 01_Write_Str) { } // write version - sdbSetApplyIndex(pSdb, 0); - sdbSetApplyIndex(pSdb, 1); - ASSERT_EQ(sdbGetApplyIndex(pSdb), 1); - ASSERT_EQ(sdbWriteFile(pSdb), 0); - ASSERT_EQ(sdbWriteFile(pSdb), 0); + sdbSetApplyInfo(pSdb, 0, 0, 0); + sdbSetApplyInfo(pSdb, 1, 0, 0); + // sdbGetApplyInfo(pSdb, &index, &term, &config); + // ASSERT_EQ(index, 1); + ASSERT_EQ(sdbWriteFile(pSdb, 0), 0); + ASSERT_EQ(sdbWriteFile(pSdb, 0), 0); sdbCleanup(pSdb); ASSERT_EQ(mnode.insertTimes, 7); @@ -772,7 +776,11 @@ TEST_F(MndTestSdb, 01_Read_Str) { ASSERT_EQ(sdbGetSize(pSdb, SDB_USER), 2); ASSERT_EQ(sdbGetMaxId(pSdb, SDB_USER), -1); ASSERT_EQ(sdbGetTableVer(pSdb, SDB_USER), 5); - ASSERT_EQ(sdbGetApplyIndex(pSdb), 1); + + int64_t index, config; + int64_t term; + sdbGetCommitInfo(pSdb, &index, &term, &config); + ASSERT_EQ(index, 1); ASSERT_EQ(mnode.insertTimes, 4); ASSERT_EQ(mnode.deleteTimes, 0); diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index ad1bf584d0..1bd09aef63 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -37,6 +37,8 @@ extern "C" { #define mTrace(...) { if (mDebugFlag & DEBUG_TRACE) { taosPrintLog("MND ", DEBUG_TRACE, mDebugFlag, __VA_ARGS__); }} // clang-format on +#define SDB_WRITE_DELTA 100 + #define SDB_GET_VAL(pData, dataPos, val, pos, func, type) \ { \ if (func(pRaw, dataPos, val) != 0) { \ @@ -258,7 +260,7 @@ int32_t sdbReadFile(SSdb *pSdb); * @param pSdb The sdb object. * @return int32_t 0 for success, -1 for failure. */ -int32_t sdbWriteFile(SSdb *pSdb); +int32_t sdbWriteFile(SSdb *pSdb, int32_t delta); /** * @brief Parse and write raw data to sdb, then free the pRaw object @@ -362,14 +364,8 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type); * @param index The update value of the apply index. * @return int32_t The current index of sdb */ -void sdbSetApplyIndex(SSdb *pSdb, int64_t index); -void sdbSetApplyTerm(SSdb *pSdb, int64_t term); -void sdbSetCurConfig(SSdb *pSdb, int64_t config); -int64_t sdbGetApplyIndex(SSdb *pSdb); -int64_t sdbGetApplyTerm(SSdb *pSdb); -int64_t sdbGetCommitIndex(SSdb *pSdb); -int64_t sdbGetCommitTerm(SSdb *pSdb); -int64_t sdbGetCurConfig(SSdb *pSdb); +void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config); +void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 61809aa93b..c44f1670c3 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -68,7 +68,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { void sdbCleanup(SSdb *pSdb) { mDebug("start to cleanup sdb"); - sdbWriteFile(pSdb); + sdbWriteFile(pSdb, 0); if (pSdb->currDir != NULL) { taosMemoryFreeClear(pSdb->currDir); @@ -160,23 +160,20 @@ static int32_t sdbCreateDir(SSdb *pSdb) { return 0; } -void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->applyIndex = index; } - -void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->applyTerm = term; } - -void sdbSetCurConfig(SSdb *pSdb, int64_t config) { - if (pSdb->applyConfig != config) { - mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->applyConfig, config); - pSdb->applyConfig = config; - } +void sdbSetApplyInfo(SSdb *pSdb, int64_t index, int64_t term, int64_t config) { + mTrace("mnode apply info changed, from index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", to index:%" PRId64 + " term:%" PRId64 " config:%" PRId64, + pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, index, term, config); + pSdb->applyIndex = index; + pSdb->applyTerm = term; + pSdb->applyConfig = config; } -int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->applyIndex; } - -int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->applyTerm; } - -int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->commitIndex; } - -int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->commitTerm; } - -int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->commitConfig; } \ No newline at end of file +void sdbGetCommitInfo(SSdb *pSdb, int64_t *index, int64_t *term, int64_t *config) { + *index = pSdb->commitIndex; + *term = pSdb->commitTerm; + *config = pSdb->commitConfig; + mTrace("mnode current info, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64 + " term:%" PRId64 " config:%" PRId64, + pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, *index, *term, *config); +} diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 2e8e932572..0f4e1276c1 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -445,12 +445,16 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) { return code; } -int32_t sdbWriteFile(SSdb *pSdb) { +int32_t sdbWriteFile(SSdb *pSdb, int32_t delta) { int32_t code = 0; if (pSdb->applyIndex == pSdb->commitIndex) { return 0; } + if (pSdb->applyIndex - pSdb->commitIndex < delta) { + return 0; + } + taosThreadMutexLock(&pSdb->filelock); if (pSdb->pWal != NULL) { code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex); @@ -475,7 +479,7 @@ int32_t sdbDeploy(SSdb *pSdb) { return -1; } - if (sdbWriteFile(pSdb) != 0) { + if (sdbWriteFile(pSdb, 0) != 0) { return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index fded723f1a..ea3494594e 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -180,8 +180,8 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { for (int32_t i = 0; i < numOfMsgs; ++i) { if (taosGetQitem(qall, (void **)&pMsg) == 0) continue; - vTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType), - pMsg->info.handle); + vTrace("vgId:%d, msg:%p get from vnode-apply queue, index:%" PRId64 " type:%s handle:%p", vgId, pMsg, + pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle); SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; if (rsp.code == 0) { @@ -334,8 +334,8 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); rpcMsg.info.conn.applyIndex = cbMeta.index; - vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode), - TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle); + vInfo("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " index:%" PRId64 " handle:%p", + TD_VID(pVnode), TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, cbMeta.index, rpcMsg.info.handle); if (rpcMsg.info.handle != NULL) { tmsgSendRsp(&rpcMsg); } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index b00c7cbda1..b6225c79cd 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -171,7 +171,8 @@ void syncNodeClose(SSyncNode* pSyncNode); int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak); // option -bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); +bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); +SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex); // ping -------------- int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index cd64402738..9969a0b974 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -47,14 +47,15 @@ typedef struct SRaftCfg { SRaftCfg *raftCfgOpen(const char *path); int32_t raftCfgClose(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg); +int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); -cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); -char * syncCfg2Str(SSyncCfg *pSyncCfg); +cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); +char *syncCfg2Str(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); -char * raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); +char *raftCfg2Str(SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index a8cb3d0e69..2c17d0f3ed 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -208,8 +208,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pRollBackEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pRollBackEntry->isWeak; cbMeta.code = 0; cbMeta.state = ths->state; @@ -234,8 +235,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (ths->pFsm != NULL) { // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pAppendEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pAppendEntry->isWeak; cbMeta.code = 2; cbMeta.state = ths->state; @@ -266,8 +268,9 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (ths->pFsm != NULL) { // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pAppendEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pAppendEntry->isWeak; cbMeta.code = 3; cbMeta.state = ths->state; @@ -696,8 +699,9 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg); - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pRollBackEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pRollBackEntry->isWeak; cbMeta.code = 0; cbMeta.state = ths->state; @@ -725,8 +729,9 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pEntry->isWeak; cbMeta.code = 2; cbMeta.state = ths->state; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 2b06673a62..5abe98b425 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -444,6 +444,21 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct return 0; } +SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) { + ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); + SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0]; + + for (int i = 0; i < pSyncNode->pRaftCfg->configIndexCount; ++i) { + if ((pSyncNode->pRaftCfg->configIndexArr)[i] > lastIndex && + (pSyncNode->pRaftCfg->configIndexArr)[i] <= snapshotLastApplyIndex) { + lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[i]; + } + } + + sTrace("sync syncNodeGetSnapshotConfigIndex index:%ld lastConfigIndex:%ld", snapshotLastApplyIndex, lastIndex); + return lastIndex; +} + const char* syncGetMyRoleStr(int64_t rid) { const char* s = syncUtilState2String(syncGetMyRole(rid)); return s; @@ -2068,8 +2083,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { if (ths->pFsm != NULL) { // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pEntry->isWeak; cbMeta.code = 0; cbMeta.state = ths->state; @@ -2090,8 +2106,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { if (ths->pFsm != NULL) { // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pEntry->isWeak; cbMeta.code = 1; cbMeta.state = ths->state; @@ -2155,11 +2172,12 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE } */ if (ths->pFsm->FpLeaderTransferCb != NULL) { - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.code = 0; cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.flag = 0; cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pEntry->isWeak; cbMeta.seqNum = pEntry->seqNum; cbMeta.state = ths->state; @@ -2261,6 +2279,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE cbMeta.code = 0; cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, pEntry->index); cbMeta.term = pEntry->term; cbMeta.newCfg = newSyncCfg; cbMeta.oldCfg = oldSyncCfg; @@ -2295,8 +2314,9 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, // user commit if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta; + SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; + cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pEntry->isWeak; cbMeta.code = 0; cbMeta.state = ths->state; @@ -2310,6 +2330,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, // config change if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) { + raftCfgAddConfigIndex(ths->pRaftCfg, pEntry->index); + raftCfgPersist(ths->pRaftCfg); code = syncNodeConfigChange(ths, &rpcMsg, pEntry); ASSERT(code == 0); } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 2d51f1f6f0..8831704d7c 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -66,6 +66,13 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { return 0; } +int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex) { + ASSERT(pRaftCfg->configIndexCount <= MAX_CONFIG_INDEX_COUNT); + (pRaftCfg->configIndexArr)[pRaftCfg->configIndexCount] = configIndex; + ++(pRaftCfg->configIndexCount); + return 0; +} + cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char u64buf[128] = {0}; cJSON *pRoot = cJSON_CreateObject(); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 7c8abfe494..ba796c2aff 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -50,6 +50,7 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI } else { sError("snapshotSenderCreate cannot create sender"); } + return pSender; } @@ -84,6 +85,10 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { // get current snapshot info pSender->pSyncNode->pFsm->FpGetSnapshot(pSender->pSyncNode->pFsm, &(pSender->snapshot)); + + sTrace("snapshotSenderStart lastApplyIndex:%ld, lastApplyTerm:%lu, lastConfigIndex:%ld", + pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex); + if (pSender->snapshot.lastConfigIndex != SYNC_INDEX_INVALID) { /* SSyncRaftEntry *pEntry = NULL; @@ -421,7 +426,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -542,7 +547,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON * pTmp = pFromId; + cJSON *pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -566,7 +571,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 78cf63de92..b5004d41ce 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -108,7 +108,7 @@ python3 ./test.py -f 2-query/distribute_agg_apercentile.py python3 ./test.py -f 6-cluster/5dnode1mnode.py python3 ./test.py -f 6-cluster/5dnode2mnode.py #python3 ./test.py -f 6-cluster/5dnode3mnodeStop.py -python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py +#python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py # BUG python3 ./test.py -f 6-cluster/5dnode3mnodeStopInsert.py python3 ./test.py -f 7-tmq/basic5.py