From e1c03118abe8142ab48b15d8ddc8cfd20c4aa477 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 15 Sep 2023 19:58:48 +0800 Subject: [PATCH] enh: extract snapshot info for both of snap reader and writer --- include/libs/sync/sync.h | 6 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 106 +++++++++++++++------ source/dnode/vnode/src/vnd/vnodeSync.c | 5 +- source/libs/sync/src/syncSnapshot.c | 24 ++++- 5 files changed, 108 insertions(+), 35 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index cc381bb54e..50e60d2ef4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -151,12 +151,12 @@ typedef struct SReConfigCbMeta { typedef struct SSnapshotParam { SyncIndex start; SyncIndex end; - void* data; // with SMsgHead + SSyncTLV* data; } SSnapshotParam; typedef struct SSnapshot { - ESyncSnapInfoTyp typ; - void* data; // with SMsgHead + int32_t typ; + SSyncTLV* data; SyncIndex lastApplyIndex; SyncTerm lastApplyTerm; SyncIndex lastConfigIndex; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index baff19e3d4..a120ecf9db 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -263,7 +263,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader void vnodeSnapReaderClose(SVSnapReader *pReader); int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData); // SVSnapWriter -int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter); +int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter); int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *pSnapshot); int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData); diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index c56644ca2c..cf04471be0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -49,6 +49,26 @@ struct SVSnapReader { SRSmaSnapReader *pRsmaReader; }; +static int32_t vnodeExtractSnapInfoDiff(void *buf, int32_t bufLen, TSnapRangeArray **ppRanges) { + int32_t code = -1; + STsdbSnapPartList *pList = tsdbSnapPartListCreate(); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto _out; + } + if (tDeserializeTsdbSnapPartList(buf, bufLen, pList) < 0) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + goto _out; + } + if (tsdbSnapPartListToRangeDiff(pList, ppRanges) < 0) { + goto _out; + } + code = 0; +_out: + tsdbSnapPartListDestroy(&pList); + return code; +} + int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader **ppReader) { int32_t code = 0; int64_t sver = pParam->start; @@ -64,6 +84,7 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader pReader->sver = sver; pReader->ever = ever; + // snapshot info if (pParam->data) { SSyncTLV *datHead = (void *)pParam->data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { @@ -72,37 +93,29 @@ int32_t vnodeSnapReaderOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapReader } int32_t offset = 0; + TSnapRangeArray **ppRanges = NULL; + while (offset + sizeof(SSyncTLV) < datHead->len) { - SSyncTLV *sectHead = (void *)(datHead->val + offset); - offset += sizeof(SSyncTLV) + sectHead->len; - void *buf = sectHead->val; - int32_t bufLen = sectHead->len; - ASSERT(sectHead->typ == SNAP_DATA_TSDB || sectHead->typ == SNAP_DATA_RSMA1); - STsdbSnapPartList *pList = tsdbSnapPartListCreate(); - if (pList == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + SSyncTLV *subField = (void *)(datHead->val + offset); + offset += sizeof(SSyncTLV) + subField->len; + void *buf = subField->val; + int32_t bufLen = subField->len; + switch (subField->typ) { + case SNAP_DATA_TSDB: + ppRanges = &pReader->pRanges; + break; + case SNAP_DATA_RSMA1: + ppRanges = &pReader->pRsmaRanges; + break; + default: + vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), + subField->typ); + goto _err; } - if (tDeserializeTsdbSnapPartList(buf, bufLen, pList) < 0) { - terrno = TSDB_CODE_INVALID_DATA_FMT; - goto _err; - } - TSnapRangeArray **ppRanges = NULL; - if (sectHead->typ == SNAP_DATA_TSDB) { - ppRanges = &pReader->pRanges; - } else if (sectHead->typ == SNAP_DATA_RSMA1) { - ppRanges = &pReader->pRsmaRanges; - } - if (ppRanges == NULL) { - tsdbSnapPartListDestroy(&pList); - continue; - } - if (tsdbSnapPartListToRangeDiff(pList, ppRanges) < 0) { + if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); - tsdbSnapPartListDestroy(&pList); goto _err; } - tsdbSnapPartListDestroy(&pList); } } @@ -414,6 +427,7 @@ struct SVSnapWriter { // meta SMetaSnapWriter *pMetaSnapWriter; // tsdb + TSnapRangeArray *pRanges; STsdbSnapWriter *pTsdbSnapWriter; // tq STqSnapWriter *pTqSnapWriter; @@ -423,12 +437,15 @@ struct SVSnapWriter { SStreamTaskWriter *pStreamTaskWriter; SStreamStateWriter *pStreamStateWriter; // rsma + TSnapRangeArray *pRsmaRanges; SRSmaSnapWriter *pRsmaSnapWriter; }; -int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWriter **ppWriter) { +int32_t vnodeSnapWriterOpen(SVnode *pVnode, SSnapshotParam *pParam, SVSnapWriter **ppWriter) { int32_t code = 0; SVSnapWriter *pWriter = NULL; + int64_t sver = pParam->start; + int64_t ever = pParam->end; // commit memory data vnodeAsyncCommit(pVnode); @@ -447,6 +464,41 @@ int32_t vnodeSnapWriterOpen(SVnode *pVnode, int64_t sver, int64_t ever, SVSnapWr // inc commit ID pWriter->commitID = ++pVnode->state.commitID; + // snapshot info + if (pParam->data) { + SSyncTLV *datHead = (void *)pParam->data; + if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { + terrno = TSDB_CODE_INVALID_DATA_FMT; + goto _err; + } + + int32_t offset = 0; + TSnapRangeArray **ppRanges = NULL; + + while (offset + sizeof(SSyncTLV) < datHead->len) { + SSyncTLV *subField = (void *)(datHead->val + offset); + offset += sizeof(SSyncTLV) + subField->len; + void *buf = subField->val; + int32_t bufLen = subField->len; + switch (subField->typ) { + case SNAP_DATA_TSDB: + ppRanges = &pWriter->pRanges; + break; + case SNAP_DATA_RSMA1: + ppRanges = &pWriter->pRsmaRanges; + break; + default: + vError("vgId:%d, unexpected subfield type in data of snapshot param. subtyp:%d", TD_VID(pVnode), + subField->typ); + goto _err; + } + if (vnodeExtractSnapInfoDiff(buf, bufLen, ppRanges) < 0) { + vError("vgId:%d, failed to get range diff since %s", TD_VID(pVnode), terrstr()); + goto _err; + } + } + } + vInfo("vgId:%d, vnode snapshot writer opened, sver:%" PRId64 " ever:%" PRId64 " commit id:%" PRId64, TD_VID(pVnode), sver, ever, pWriter->commitID); *ppWriter = pWriter; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index e676219b11..b73c9b8c65 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -491,8 +491,7 @@ static int32_t vnodeSnapshotDoRead(const SSyncFSM *pFsm, void *pReader, void **p } static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void **ppWriter) { - SVnode *pVnode = pFsm->data; - SSnapshotParam *pSnapshotParam = pParam; + SVnode *pVnode = pFsm->data; do { int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE); @@ -505,7 +504,7 @@ static int32_t vnodeSnapshotStartWrite(const SSyncFSM *pFsm, void *pParam, void } } while (true); - int32_t code = vnodeSnapWriterOpen(pVnode, pSnapshotParam->start, pSnapshotParam->end, (SVSnapWriter **)ppWriter); + int32_t code = vnodeSnapWriterOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapWriter **)ppWriter); return code; } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 681d256ec9..99e8fd55a2 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -368,6 +368,17 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { pReceiver->pWriter = NULL; } + // free data of snapshot info + if (pReceiver->snapshotParam.data) { + taosMemoryFree(pReceiver->snapshotParam.data); + pReceiver->snapshotParam.data = NULL; + } + + if (pReceiver->snapshot.data) { + taosMemoryFree(pReceiver->snapshot.data); + pReceiver->snapshot.data = NULL; + } + // free receiver taosMemoryFree(pReceiver); } @@ -652,6 +663,17 @@ _SEND_REPLY: if (snapInfo.data) { pRspMsg->payloadType = snapInfo.typ; memcpy(pRspMsg->data, snapInfo.data, dataLen); + + // save snapshot info + SSnapshotParam *pParam = &pReceiver->snapshotParam; + void *data = taosMemoryRealloc(pParam->data, dataLen); + if (data == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = terrno; + goto _out; + } + pParam->data = data; + memcpy(pParam->data, snapInfo.data, dataLen); } // send msg @@ -941,7 +963,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend terrno = TSDB_CODE_INVALID_DATA_FMT; return -1; } - pSender->snapshotParam.data = pMsg->data; + pSender->snapshotParam.data = (void *)pMsg->data; } int32_t code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader);