diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index c226d7c8cc..06a58af0e3 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -134,7 +134,7 @@ typedef struct SSyncFSM { int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len); int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter); - int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply); + int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot); int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len); } SSyncFSM; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 2811aeb43a..36362bed87 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -143,7 +143,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter); } -int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { +int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { mInfo("stop to apply snapshot to sdb, apply:%d", isApply); SMnode *pMnode = pFsm->data; return sdbStopWrite(pMnode->pSdb, pWriter, isApply); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6b5f795eb0..57141bbbb5 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -116,11 +116,11 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur); // typedef struct STsdb STsdb; typedef struct STsdbReader STsdbReader; -#define BLOCK_LOAD_OFFSET_ORDER 1 +#define BLOCK_LOAD_OFFSET_ORDER 1 #define BLOCK_LOAD_TABLESEQ_ORDER 2 -#define BLOCK_LOAD_EXTERN_ORDER 3 +#define BLOCK_LOAD_EXTERN_ORDER 3 -#define LASTROW_RETRIEVE_TYPE_ALL 0x1 +#define LASTROW_RETRIEVE_TYPE_ALL 0x1 #define LASTROW_RETRIEVE_TYPE_SINGLE 0x2 int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid); @@ -191,7 +191,7 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader); int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData); // SVSnapWriter int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter); -int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback); +int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); // structs @@ -237,8 +237,8 @@ typedef struct { uint64_t groupId; } STableKeyInfo; -#define TABLE_ROLLUP_ON ((int8_t)0x1) -#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0) +#define TABLE_ROLLUP_ON ((int8_t)0x1) +#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0) #define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON) struct SMetaEntry { int64_t version; diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 420cf3c473..c6b423f712 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -173,7 +173,7 @@ _err: return code; } -int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) { +int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) { int32_t code = 0; SVnode *pVnode = pWriter->pVnode; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 81177fd5d8..712cee9fd0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -515,10 +515,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void #endif } -static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { +static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) { #ifdef USE_TSDB_SNAPSHOT SVnode *pVnode = pFsm->data; - int32_t code = vnodeSnapWriterClose(pWriter, !isApply); + int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot); return code; #else taosMemoryFree(pWriter); diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 87cc5685f3..924a4df90d 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) { cJSON *pJson = snapshotSender2Json(pSender); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; char host[64]; @@ -434,8 +434,8 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { if (pReceiver != NULL) { // close writer if (pReceiver->pWriter != NULL) { - int32_t ret = - pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + false, &(pReceiver->snapshot)); ASSERT(ret == 0); pReceiver->pWriter = NULL; } @@ -483,8 +483,8 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) { // force close, abandon incomplete data if (pReceiver->pWriter != NULL) { - int32_t ret = - pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, + &(pReceiver->snapshot)); ASSERT(ret == 0); pReceiver->pWriter = NULL; } @@ -524,8 +524,8 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend // FpSnapshotStopWrite should not be called, assert writer == NULL int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { if (pReceiver->pWriter != NULL) { - int32_t ret = - pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false); + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, + &(pReceiver->snapshot)); ASSERT(ret == 0); pReceiver->pWriter = NULL; } @@ -574,7 +574,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap } // stop writer, apply data - code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true); + code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, + &(pReceiver->snapshot)); if (code != 0) { syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error"); ASSERT(0); @@ -646,7 +647,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { cJSON_AddStringToObject(pFromId, "addr", u64buf); { uint64_t u64 = pReceiver->fromId.addr; - cJSON * pTmp = pFromId; + cJSON *pTmp = pFromId; char host[128] = {0}; uint16_t port; syncUtilU642Addr(u64, host, sizeof(host), &port); @@ -679,14 +680,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { cJSON *pJson = snapshotReceiver2Json(pReceiver); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { int32_t len = 256; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); SRaftId fromId = pReceiver->fromId; char host[128]; diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp index 339ebe90e7..8c6c68bbf8 100644 --- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp +++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp @@ -125,7 +125,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) return 0; } -int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { char logBuf[256] = {0}; snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter, isApply); diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index e5d93ddff4..b744843b1e 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -30,7 +30,7 @@ int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; } -int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } SSyncSnapshotReceiver* createReceiver() { diff --git a/source/libs/sync/test/syncTestTool.cpp b/source/libs/sync/test/syncTestTool.cpp index f35c6f8a2f..b0a561cb89 100644 --- a/source/libs/sync/test/syncTestTool.cpp +++ b/source/libs/sync/test/syncTestTool.cpp @@ -126,7 +126,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) return 0; } -int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { +int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { if (isApply) { gSnapshotLastApplyIndex = gFinishLastApplyIndex; gSnapshotLastApplyTerm = gFinishLastApplyTerm;