Merge pull request #15031 from taosdata/feature/3.0_mhli
refactor(sync): add index/term in vnodeSnapWriterClose
This commit is contained in:
commit
046e784c8c
|
@ -134,7 +134,7 @@ typedef struct SSyncFSM {
|
|||
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
|
||||
|
||||
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
|
||||
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
|
||||
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot* pSnapshot);
|
||||
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
|
||||
|
||||
} SSyncFSM;
|
||||
|
|
|
@ -143,7 +143,7 @@ int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWrit
|
|||
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
|
||||
}
|
||||
|
||||
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
|
||||
int32_t mndSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
|
||||
mInfo("stop to apply snapshot to sdb, apply:%d", isApply);
|
||||
SMnode *pMnode = pFsm->data;
|
||||
return sdbStopWrite(pMnode->pSdb, pWriter, isApply);
|
||||
|
|
|
@ -116,11 +116,11 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
|
|||
// typedef struct STsdb STsdb;
|
||||
typedef struct STsdbReader STsdbReader;
|
||||
|
||||
#define BLOCK_LOAD_OFFSET_ORDER 1
|
||||
#define BLOCK_LOAD_OFFSET_ORDER 1
|
||||
#define BLOCK_LOAD_TABLESEQ_ORDER 2
|
||||
#define BLOCK_LOAD_EXTERN_ORDER 3
|
||||
#define BLOCK_LOAD_EXTERN_ORDER 3
|
||||
|
||||
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
|
||||
#define LASTROW_RETRIEVE_TYPE_ALL 0x1
|
||||
#define LASTROW_RETRIEVE_TYPE_SINGLE 0x2
|
||||
|
||||
int32_t tsdbSetTableId(STsdbReader *pReader, int64_t uid);
|
||||
|
@ -191,7 +191,7 @@ int32_t vnodeSnapReaderClose(SVSnapReader *pReader);
|
|||
int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData);
|
||||
// SVSnapWriter
|
||||
int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter);
|
||||
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback);
|
||||
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot);
|
||||
int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData);
|
||||
|
||||
// structs
|
||||
|
@ -237,8 +237,8 @@ typedef struct {
|
|||
uint64_t groupId;
|
||||
} STableKeyInfo;
|
||||
|
||||
#define TABLE_ROLLUP_ON ((int8_t)0x1)
|
||||
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
|
||||
#define TABLE_ROLLUP_ON ((int8_t)0x1)
|
||||
#define TABLE_IS_ROLLUP(FLG) (((FLG) & (TABLE_ROLLUP_ON)) != 0)
|
||||
#define TABLE_SET_ROLLUP(FLG) ((FLG) |= TABLE_ROLLUP_ON)
|
||||
struct SMetaEntry {
|
||||
int64_t version;
|
||||
|
|
|
@ -173,7 +173,7 @@ _err:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback) {
|
||||
int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot) {
|
||||
int32_t code = 0;
|
||||
SVnode *pVnode = pWriter->pVnode;
|
||||
|
||||
|
|
|
@ -515,10 +515,10 @@ static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void
|
|||
#endif
|
||||
}
|
||||
|
||||
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
|
||||
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply, SSnapshot *pSnapshot) {
|
||||
#ifdef USE_TSDB_SNAPSHOT
|
||||
SVnode *pVnode = pFsm->data;
|
||||
int32_t code = vnodeSnapWriterClose(pWriter, !isApply);
|
||||
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
|
||||
return code;
|
||||
#else
|
||||
taosMemoryFree(pWriter);
|
||||
|
|
|
@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
|||
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||
cJSON *pJson = snapshotSender2Json(pSender);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
||||
int32_t len = 256;
|
||||
char * s = taosMemoryMalloc(len);
|
||||
char *s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
char host[64];
|
||||
|
@ -434,8 +434,8 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
|
|||
if (pReceiver != NULL) {
|
||||
// close writer
|
||||
if (pReceiver->pWriter != NULL) {
|
||||
int32_t ret =
|
||||
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter,
|
||||
false, &(pReceiver->snapshot));
|
||||
ASSERT(ret == 0);
|
||||
pReceiver->pWriter = NULL;
|
||||
}
|
||||
|
@ -483,8 +483,8 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncSnapsh
|
|||
static void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver) {
|
||||
// force close, abandon incomplete data
|
||||
if (pReceiver->pWriter != NULL) {
|
||||
int32_t ret =
|
||||
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
|
||||
&(pReceiver->snapshot));
|
||||
ASSERT(ret == 0);
|
||||
pReceiver->pWriter = NULL;
|
||||
}
|
||||
|
@ -524,8 +524,8 @@ int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend
|
|||
// FpSnapshotStopWrite should not be called, assert writer == NULL
|
||||
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) {
|
||||
if (pReceiver->pWriter != NULL) {
|
||||
int32_t ret =
|
||||
pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false);
|
||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false,
|
||||
&(pReceiver->snapshot));
|
||||
ASSERT(ret == 0);
|
||||
pReceiver->pWriter = NULL;
|
||||
}
|
||||
|
@ -574,7 +574,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
|
|||
}
|
||||
|
||||
// stop writer, apply data
|
||||
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true);
|
||||
code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true,
|
||||
&(pReceiver->snapshot));
|
||||
if (code != 0) {
|
||||
syncNodeErrorLog(pReceiver->pSyncNode, "snapshot stop writer true error");
|
||||
ASSERT(0);
|
||||
|
@ -646,7 +647,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pReceiver->fromId.addr;
|
||||
cJSON * pTmp = pFromId;
|
||||
cJSON *pTmp = pFromId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
|
@ -679,14 +680,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
|
||||
int32_t len = 256;
|
||||
char * s = taosMemoryMalloc(len);
|
||||
char *s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId fromId = pReceiver->fromId;
|
||||
char host[128];
|
||||
|
|
|
@ -125,7 +125,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) {
|
||||
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) {
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStopWrite== pFsm:%p, pWriter:%p, isApply:%d", pFsm, pWriter,
|
||||
isApply);
|
||||
|
|
|
@ -30,7 +30,7 @@ int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
|
|||
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
|
||||
|
||||
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; }
|
||||
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; }
|
||||
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) { return 0; }
|
||||
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }
|
||||
|
||||
SSyncSnapshotReceiver* createReceiver() {
|
||||
|
|
|
@ -126,7 +126,7 @@ int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter)
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) {
|
||||
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply, SSnapshot *pSnapshot) {
|
||||
if (isApply) {
|
||||
gSnapshotLastApplyIndex = gFinishLastApplyIndex;
|
||||
gSnapshotLastApplyTerm = gFinishLastApplyTerm;
|
||||
|
|
Loading…
Reference in New Issue