From 602d2aa3778d12345cfcc9dd43fa4cf299fe48e6 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 2 Aug 2022 16:13:36 +0800 Subject: [PATCH] other: rsma snapshot and misc update --- include/os/osFile.h | 1 + source/common/src/tdatablock.c | 19 +++ source/dnode/vnode/src/inc/sma.h | 29 +++- source/dnode/vnode/src/inc/vnodeInt.h | 21 +-- source/dnode/vnode/src/sma/smaCommit.c | 35 ++++ source/dnode/vnode/src/sma/smaRollup.c | 34 +++- source/dnode/vnode/src/sma/smaSnapshot.c | 167 +++++++++++++++---- source/dnode/vnode/src/tq/tqOffsetSnapshot.c | 4 +- source/dnode/vnode/src/tq/tqSnapshot.c | 14 +- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 4 +- source/dnode/vnode/src/vnd/vnodeSnapshot.c | 15 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 5 +- source/os/src/osFile.c | 20 +++ 13 files changed, 293 insertions(+), 75 deletions(-) diff --git a/include/os/osFile.h b/include/os/osFile.h index 2f6a6ba480..21e3d2e6cf 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -54,6 +54,7 @@ typedef struct TdFile *TdFilePtr; #define TD_FILE_EXCL 0x0080 #define TD_FILE_STREAM 0x0100 // Only support taosFprintfFile, taosGetLineFile, taosEOFFile TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions); +TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions); #define TD_FILE_ACCESS_EXIST_OK 0x1 #define TD_FILE_ACCESS_READ_OK 0x2 diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2a8f80f030..be216e0742 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1739,6 +1739,9 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { formatTimestamp(pBuf, *(uint64_t*)var, TSDB_TIME_PRECISION_MILLI); printf(" %25s |", pBuf); break; + case TSDB_DATA_TYPE_BOOL: + printf(" %15d |", *(int32_t*)var); + break; case TSDB_DATA_TYPE_INT: printf(" %15d |", *(int32_t*)var); break; @@ -1757,6 +1760,22 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { case TSDB_DATA_TYPE_DOUBLE: printf(" %15lf |", *(double*)var); break; + case TSDB_DATA_TYPE_VARCHAR: { + char* pData = colDataGetVarData(pColInfoData, j); + int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData) + 1); + memset(pBuf, 0, dataSize); + strncpy(pBuf, varDataVal(pData), dataSize); + printf(" %15s |", pBuf); + } break; + case TSDB_DATA_TYPE_NCHAR: { + char* pData = colDataGetVarData(pColInfoData, j); + int32_t dataSize = TMIN(sizeof(pBuf), varDataLen(pData) + 1); + memset(pBuf, 0, dataSize); + taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); + printf(" %15s |", pBuf); + } break; + default: + break; } } printf("\n"); diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c825ab6731..8cd70537de 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -41,6 +41,9 @@ typedef struct SRSmaStat SRSmaStat; typedef struct SSmaKey SSmaKey; typedef struct SRSmaInfo SRSmaInfo; typedef struct SRSmaInfoItem SRSmaInfoItem; +typedef struct SQTaskFile SQTaskFile; +typedef struct SQTaskFReader SQTaskFReader; +typedef struct SQTaskFWriter SQTaskFWriter; struct SSmaEnv { SRWLatch lock; @@ -64,12 +67,32 @@ struct STSmaStat { STSchema *pTSchema; }; +struct SQTaskFile { + volatile int32_t nRef; + int64_t commitID; + int64_t size; +}; + +struct SQTaskFReader { + SSma *pSma; + SQTaskFile fTask; + TdFilePtr pReadH; +}; +struct SQTaskFWriter { + SSma *pSma; + SQTaskFile fTask; + TdFilePtr pWriteH; + char *fname; +}; + struct SRSmaStat { SSma *pSma; int64_t commitAppliedVer; // vnode applied version for async commit int64_t refId; // shared by fetch tasks + SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing + SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) SHashObj *rsmaInfoHash; // key: stbUid, value: SRSmaInfo; SHashObj *iRsmaInfoHash; // key: stbUid, value: SRSmaInfo; immutable rsmaInfoHash }; @@ -89,6 +112,7 @@ struct SSmaStat { #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_COMMIT_STAT(r) (&(r)->commitStat) #define RSMA_REF_ID(r) ((r)->refId) +#define RSMA_FS_LOCK(r) (&(r)->lock) struct SRSmaInfoItem { void *taskInfo; // qTaskInfo_t @@ -192,6 +216,8 @@ static FORCE_INLINE void tdSmaStatSetDropped(STSmaStat *pTStat) { } } +void tdRSmaQTaskInfoGetFileName(int32_t vid, int64_t version, char *outputName); +void tdRSmaQTaskInfoGetFullName(int32_t vid, int64_t version, const char *path, char *outputName); int32_t tdCloneRSmaInfo(SSma *pSma, SRSmaInfo *pDest, SRSmaInfo *pSrc); void tdFreeQTaskInfo(qTaskInfo_t *taskHandle, int32_t vgId, int32_t level); static int32_t tdDestroySmaState(SSmaStat *pSmaStat, int8_t smaType); @@ -209,9 +235,6 @@ int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, // smaFileUtil ================ -typedef struct SQTaskFReader SQTaskFReader; -typedef struct SQTaskFWriter SQTaskFWriter; - #define TD_FILE_HEAD_SIZE 512 typedef struct STFInfo STFInfo; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index b90254e543..1a7e0f9a45 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -186,6 +186,7 @@ int32_t smaSyncPostCommit(SSma* pSma); int32_t smaAsyncPreCommit(SSma* pSma); int32_t smaAsyncCommit(SSma* pSma); int32_t smaAsyncPostCommit(SSma* pSma); +int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); @@ -354,16 +355,16 @@ struct SSma { void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); enum { - SNAP_DATA_META = 0, - SNAP_DATA_TSDB = 1, - SNAP_DATA_DEL = 2, - SNAP_DATA_RSMA1 = 3, - SNAP_DATA_RSMA2 = 4, - SNAP_DATA_QTASK = 5, - SNAP_DATA_TQ_HANDLE = 6, - SNAP_DATA_TQ_OFFSET = 7, - SNAP_DATA_STREAM_TASK = 8, - SNAP_DATA_STREAM_STATE = 9, + SNAP_DATA_META = 1, + SNAP_DATA_TSDB = 2, + SNAP_DATA_DEL = 3, + SNAP_DATA_RSMA1 = 4, + SNAP_DATA_RSMA2 = 5, + SNAP_DATA_QTASK = 6, + SNAP_DATA_TQ_HANDLE = 7, + SNAP_DATA_TQ_OFFSET = 8, + SNAP_DATA_STREAM_TASK = 9, + SNAP_DATA_STREAM_STATE = 10, }; struct SSnapDataHdr { diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index a0e3932245..3c88b6f5cb 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -241,6 +241,41 @@ static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { return TSDB_CODE_SUCCESS; } +// SQTaskFile ====================================================== +// int32_t tCmprQTaskFile(void const *lhs, void const *rhs) { +// int64_t *lCommitted = *(int64_t *)lhs; +// SQTaskFile *rQTaskF = (SQTaskFile *)rhs; + +// if (lCommitted < rQTaskF->commitID) { +// return -1; +// } else if (lCommitted > rQTaskF->commitID) { +// return 1; +// } + +// return 0; +// } + +#if 0 +/** + * @brief At most time, there is only one qtaskinfo file committed latest in aTaskFile. Sometimes, there would be + * multiple qtaskinfo files supporting snapshot replication. + * + * @param pSma + * @param pRSmaStat + * @return int32_t + */ +static int32_t tdCleanupQTaskInfoFiles(SSma *pSma, SRSmaStat *pRSmaStat) { + SVnode *pVnode = pSma->pVnode; + int64_t committed = pRSmaStat->commitAppliedVer; + SArray *aTaskFile = pRSmaStat->aTaskFile; + + void *qTaskFile = taosArraySearch(aTaskFile, committed, tCmprQTaskFile, TD_LE); + + + return TSDB_CODE_SUCCESS; +} +#endif + /** * @brief post-commit for rollup sma * 1) clean up the outdated qtaskinfo files diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 3505711cd0..02e488d65a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -38,7 +38,6 @@ static SRSmaInfo *tdGetRSmaInfoBySuid(SSma *pSma, int64_t suid); static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSchema, int64_t suid, SRSmaStat *pStat, int8_t blkType); static void tdRSmaFetchTrigger(void *param, void *tmrId); -static void tdRSmaQTaskInfoGetFName(int32_t vid, int64_t version, char *outputName); static int32_t tdRSmaQTaskInfoIterInit(SRSmaQTaskInfoIter *pIter, STFile *pTFile); static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isFinish); @@ -77,10 +76,14 @@ struct SRSmaQTaskInfoIter { int32_t nBufPos; }; -static void tdRSmaQTaskInfoGetFName(int32_t vgId, int64_t version, char *outputName) { +void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t version, char *outputName) { tdGetVndFileName(vgId, NULL, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); } +void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t version, const char* path, char *outputName) { + tdGetVndFileName(vgId, path, VNODE_RSMA_DIR, TD_QTASKINFO_FNAME_PREFIX, version, outputName); +} + static FORCE_INLINE int32_t tdRSmaQTaskInfoContLen(int32_t lenWithHead) { return lenWithHead - RSMA_QTASKINFO_HEAD_LEN; } @@ -599,8 +602,8 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche SSubmitReq *pReq = NULL; // TODO: the schema update should be handled if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) { - smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma), - suid, pItem->level, terrstr()); + smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, pItem->level, terrstr()); goto _err; } @@ -874,7 +877,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { STFile tFile = {0}; char qTaskInfoFName[TSDB_FILENAME_LEN] = {0}; - tdRSmaQTaskInfoGetFName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName); + tdRSmaQTaskInfoGetFileName(TD_VID(pVnode), pVnode->state.committed, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { goto _err; } @@ -1172,7 +1175,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { #if 0 if (pRSmaStat->commitAppliedVer > 0) { char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); + tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; @@ -1217,7 +1220,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { if (!isFileCreated) { char qTaskInfoFName[TSDB_FILENAME_LEN]; - tdRSmaQTaskInfoGetFName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); + tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName); if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; @@ -1357,3 +1360,20 @@ static void tdRSmaFetchTrigger(void *param, void *tmrId) { _end: tdReleaseSmaRef(smaMgmt.rsetId, pItem->refId, __func__, __LINE__); } + +int32_t smaDoRetention(SSma *pSma, int64_t now) { + int32_t code = TSDB_CODE_SUCCESS; + if (VND_IS_RSMA(pSma->pVnode)) { + return code; + } + + for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { + if (pSma->pRSmaTsdb[i]) { + code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); + if (code) goto _end; + } + } + +_end: + return code; +} \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 06fe0074f3..4dda122bd7 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -35,6 +35,7 @@ struct SRsmaSnapReader { int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) { int32_t code = 0; + SVnode* pVnode = pSma->pVnode; SRsmaSnapReader* pReader = NULL; // alloc @@ -47,6 +48,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead pReader->sver = sver; pReader->ever = ever; + // rsma1/rsma2 for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, i == 0 ? SNAP_DATA_RSMA1 : SNAP_DATA_RSMA2, @@ -56,23 +58,98 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead } } } + + // qtaskinfo + // 1. add ref to qtaskinfo.v${ever} if exists and then start to replicate + char qTaskInfoFullName[TSDB_FILENAME_LEN]; + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), ever, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); + + if (!taosCheckExistFile(qTaskInfoFullName)) { + smaInfo("vgId:%d, vnode snapshot rsma reader for qtaskinfo not need as %s not exists", TD_VID(pVnode), + qTaskInfoFullName); + } else { + pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); + if (!pReader->pQTaskFReader) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + TdFilePtr qTaskF = taosOpenFile(qTaskInfoFullName, TD_FILE_READ); + if (!qTaskF) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + pReader->pQTaskFReader->pReadH = qTaskF; +#if 0 + SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask; + pQTaskF->nRef = 1; +#endif + } + *ppReader = pReader; - smaInfo("vgId:%d, vnode snapshot rsma reader opened succeed", SMA_VID(pSma)); + smaInfo("vgId:%d, vnode snapshot rsma reader opened %s succeed", TD_VID(pVnode), qTaskInfoFullName); return TSDB_CODE_SUCCESS; _err: - smaError("vgId:%d, vnode snapshot rsma reader opened failed since %s", SMA_VID(pSma), tstrerror(code)); + smaError("vgId:%d, vnode snapshot rsma reader opened failed since %s", TD_VID(pVnode), tstrerror(code)); return TSDB_CODE_FAILED; } -static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData) { - int32_t code = 0; - SSma* pSma = pReader->pSma; +static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppBuf) { + int32_t code = 0; + SSma* pSma = pReader->pSma; + int64_t n = 0; + uint8_t* pBuf = NULL; + SQTaskFReader* qReader = pReader->pQTaskFReader; + + if (!qReader->pReadH) { + *ppBuf = NULL; + smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, readh is empty", SMA_VID(pSma)); + return 0; + } + + int64_t size = 0; + if (taosFStatFile(qReader->pReadH, &size, NULL) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // seek + if (taosLSeekFile(qReader->pReadH, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + ASSERT(!(*ppBuf)); + // alloc + *ppBuf = taosMemoryCalloc(1, sizeof(SSnapDataHdr) + size); + if (!(*ppBuf)) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + // read + n = taosReadFile(qReader->pReadH, POINTER_SHIFT(*ppBuf, sizeof(SSnapDataHdr)), size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } else if (n != size) { + code = TSDB_CODE_FILE_CORRUPTED; + goto _err; + } + + smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo, size:%" PRIi64, SMA_VID(pSma), size); + + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppBuf); + pHdr->type = SNAP_DATA_QTASK; + pHdr->size = size; _exit: smaInfo("vgId:%d, vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma)); return code; _err: + *ppBuf = NULL; smaError("vgId:%d, vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code)); return code; } @@ -108,14 +185,14 @@ int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) { // read qtaskinfo file if (!pReader->qTaskDone) { + smaInfo("vgId:%d, vnode snapshot rsma qtaskinfo not done", SMA_VID(pReader->pSma)); code = rsmaSnapReadQTaskInfo(pReader, ppData); if (code) { goto _err; } else { + pReader->qTaskDone = 1; if (*ppData) { goto _exit; - } else { - pReader->qTaskDone = 1; } } } @@ -140,11 +217,11 @@ int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) { } if (pReader->pQTaskFReader) { - // TODO: close for qtaskinfo + taosCloseFile(&pReader->pQTaskFReader->pReadH); + taosMemoryFreeClear(pReader->pQTaskFReader); smaInfo("vgId:%d, vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma)); } - smaInfo("vgId:%d, vnode snapshot rsma reader closed", SMA_VID(pReader->pSma)); taosMemoryFreeClear(*ppReader); @@ -171,6 +248,7 @@ struct SRsmaSnapWriter { int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) { int32_t code = 0; SRsmaSnapWriter* pWriter = NULL; + SVnode* pVnode = pSma->pVnode; // alloc pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter)); @@ -182,6 +260,7 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit pWriter->sver = sver; pWriter->ever = ever; + // rsma1/rsma2 for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]); @@ -192,8 +271,25 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWrit } // qtaskinfo - // TODO + SQTaskFWriter* qWriter = (SQTaskFWriter*)taosMemoryCalloc(1, sizeof(SQTaskFWriter)); + qWriter->pSma = pSma; + char qTaskInfoFullName[TSDB_FILENAME_LEN]; + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 1, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); + TdFilePtr qTaskF = taosCreateFile(qTaskInfoFullName, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC); + if (!qTaskF) { + code = TAOS_SYSTEM_ERROR(errno); + smaError("vgId:%d, rsma snapshot writer open %s failed since %s", TD_VID(pSma->pVnode), qTaskInfoFullName, tstrerror(code)); + goto _err; + } + qWriter->pWriteH = qTaskF; + int32_t fnameLen = strlen(qTaskInfoFullName) + 1; + qWriter->fname = taosMemoryCalloc(1, fnameLen); + strncpy(qWriter->fname, qTaskInfoFullName, fnameLen); + pWriter->pQTaskFWriter = qWriter; + smaDebug("vgId:%d, rsma snapshot writer open succeed for %s", TD_VID(pSma->pVnode), qTaskInfoFullName); + + // snapWriter *ppWriter = pWriter; smaInfo("vgId:%d, rsma snapshot writer open succeed", TD_VID(pSma->pVnode)); @@ -208,18 +304,30 @@ _err: int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; SRsmaSnapWriter* pWriter = *ppWriter; + SVnode* pVnode = pWriter->pSma->pVnode; if (rollback) { - ASSERT(0); - // code = tsdbFSRollback(pWriter->pTsdb->pFS); - // if (code) goto _err; + // TODO: rsma1/rsma2 + // qtaskinfo + if(pWriter->pQTaskFWriter) { + taosRemoveFile(pWriter->pQTaskFWriter->fname); + } } else { + // rsma1/rsma2 for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pWriter->pDataWriter[i]) { code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback); if (code) goto _err; } } + // qtaskinfo + if (pWriter->pQTaskFWriter) { + char qTaskInfoFullName[TSDB_FILENAME_LEN]; + tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), 0, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFullName); + taosRenameFile(pWriter->pQTaskFWriter->fname, qTaskInfoFullName); + smaInfo("vgId:%d, vnode snapshot rsma writer rename %s to %s", SMA_VID(pWriter->pSma), + pWriter->pQTaskFWriter->fname, qTaskInfoFullName); + } } smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma)); @@ -261,26 +369,23 @@ _err: } static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; + int32_t code = 0; + SQTaskFWriter* qWriter = pWriter->pQTaskFWriter; - if (pWriter->pQTaskFWriter == NULL) { - // SDelFile* pDelFile = pWriter->fs.pDelFile; - - // // reader - // if (pDelFile) { - // code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL); - // if (code) goto _err; - - // code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL); - // if (code) goto _err; - // } - - // // writer - // SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0}; - // code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); - // if (code) goto _err; + if (qWriter && qWriter->pWriteH) { + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + int64_t size = pHdr->size; + ASSERT(size == (nData - sizeof(SSnapDataHdr))); + int64_t contLen = taosWriteFile(qWriter->pWriteH, pHdr->data, size); + if (contLen != size) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + } else { + smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo is not needed", SMA_VID(pWriter->pSma)); } - smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo succeed", SMA_VID(pWriter->pSma)); + + smaInfo("vgId:%d, vnode snapshot rsma write qtaskinfo %s succeed", SMA_VID(pWriter->pSma), qWriter->fname); _exit: return code; diff --git a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c index cacb82b702..292c234f49 100644 --- a/source/dnode/vnode/src/tq/tqOffsetSnapshot.c +++ b/source/dnode/vnode/src/tq/tqOffsetSnapshot.c @@ -37,7 +37,7 @@ int32_t tqOffsetReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetReader pReader->sver = sver; pReader->ever = ever; - tqInfo("vgId:%d vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode)); + tqInfo("vgId:%d, vnode snapshot tq offset reader opened", TD_VID(pTq->pVnode)); *ppReader = pReader; return 0; @@ -109,7 +109,7 @@ int32_t tqOffsetWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqOffsetWriter return code; _err: - tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppWriter = NULL; return code; } diff --git a/source/dnode/vnode/src/tq/tqSnapshot.c b/source/dnode/vnode/src/tq/tqSnapshot.c index 21172134ba..b4a7ce7737 100644 --- a/source/dnode/vnode/src/tq/tqSnapshot.c +++ b/source/dnode/vnode/src/tq/tqSnapshot.c @@ -52,13 +52,13 @@ int32_t tqSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapReader** p goto _err; } - tqInfo("vgId:%d vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); + tqInfo("vgId:%d, vnode snapshot tq reader opened", TD_VID(pTq->pVnode)); *ppReader = pReader; return code; _err: - tqError("vgId:%d vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode snapshot tq reader open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppReader = NULL; return code; } @@ -113,14 +113,14 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { pHdr->size = vLen; memcpy(pHdr->data, pVal, vLen); - tqInfo("vgId:%d vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), + tqInfo("vgId:%d, vnode snapshot tq read data, version:%" PRId64 " subKey: %s vLen:%d", TD_VID(pReader->pTq->pVnode), handle.snapshotVer, handle.subKey, vLen); _exit: return code; _err: - tqError("vgId:%d vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode snapshot tq read data failed since %s", TD_VID(pReader->pTq->pVnode), tstrerror(code)); return code; } @@ -154,7 +154,7 @@ int32_t tqSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, STqSnapWriter** p return code; _err: - tqError("vgId:%d tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, tq snapshot writer open failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); *ppWriter = NULL; return code; } @@ -182,7 +182,7 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { return code; _err: - tqError("vgId:%d tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, tq snapshot writer close failed since %s", TD_VID(pWriter->pTq->pVnode), tstrerror(code)); return code; } @@ -204,6 +204,6 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { _err: tDecoderClear(pDecoder); - tqError("vgId:%d vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); + tqError("vgId:%d, vnode snapshot tq write failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 97ab410c1b..a9f07cbf24 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -1215,11 +1215,11 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) } _exit: - tsdbDebug("vgId:%d, tsdb snapshow write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); + tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); return code; _err: - tsdbError("vgId:%d, tsdb snapshow write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path, + tsdbError("vgId:%d, tsdb snapshot write for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path, tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index d1b1b68ce4..5a81f91919 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -194,7 +194,7 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData) if (*ppData) { goto _exit; } else { - pReader->tsdbDone = 1; + pReader->rsmaDone = 1; code = rsmaSnapReaderClose(&pReader->pRsmaReader); if (code) goto _err; } @@ -373,18 +373,9 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { case SNAP_DATA_STREAM_STATE: { } break; case SNAP_DATA_RSMA1: - case SNAP_DATA_RSMA2: { - // rsma1/rsma2 - if (pWriter->pRsmaSnapWriter == NULL) { - code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter); - if (code) goto _err; - } - - code = rsmaSnapWrite(pWriter->pRsmaSnapWriter, pData, nData); - if (code) goto _err; - } break; + case SNAP_DATA_RSMA2: case SNAP_DATA_QTASK: { - // qtask for rsma + // rsma1/rsma2/qtask for rsma if (pWriter->pRsmaSnapWriter == NULL) { code = rsmaSnapWriterOpen(pVnode->pSma, pWriter->sver, pWriter->ever, &pWriter->pRsmaSnapWriter); if (code) goto _err; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 8aa3a7d296..16bcb967cd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -378,6 +378,9 @@ static int32_t vnodeProcessTrimReq(SVnode *pVnode, int64_t version, void *pReq, code = tsdbDoRetention(pVnode->pTsdb, trimReq.timestamp); if (code) goto _exit; + code = smaDoRetention(pVnode->pSma, trimReq.timestamp); + if (code) goto _exit; + _exit: return code; } @@ -908,7 +911,7 @@ _exit: static int32_t vnodeProcessCreateTSmaReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) { SVCreateTSmaReq req = {0}; - SDecoder coder; + SDecoder coder = {0}; if (pRsp) { pRsp->msgType = TDMT_VND_CREATE_SMA_RSP; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 556fd78360..b76769bdb8 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -162,6 +162,26 @@ _err: #endif } +TdFilePtr taosCreateFile(const char *path, int32_t tdFileOptions) { + TdFilePtr fp = taosOpenFile(path, tdFileOptions); + if (!fp) { + if (errno == ENOENT) { + // Try to create directory recursively + char *s = strdup(path); + if (taosMulMkDir(taosDirName(s)) != 0) { + taosMemoryFree(s); + return NULL; + } + taosMemoryFree(s); + fp = taosOpenFile(path, tdFileOptions); + if (!fp) { + return NULL; + } + } + } + return fp; +} + int32_t taosRemoveFile(const char *path) { return remove(path); } int32_t taosRenameFile(const char *oldName, const char *newName) {