diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index c7b9cb3c93..a5d9c0a91b 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -223,21 +223,21 @@ int32_t smaPreClose(SSma *pSma); // rsma void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); -int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); -void tdRSmaFSClose(SRSmaFS *fs); -int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); -int32_t tdRSmaFSCommit(SSma *pSma); -int32_t tdRSmaFSFinishCommit(SSma *pSma); -int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS); -int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS); -int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS); -void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS); -int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize); -int32_t tdRSmaFSRollback(SSma *pSma); +// int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); +// void tdRSmaFSClose(SRSmaFS *fs); +// int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); +// int32_t tdRSmaFSCommit(SSma *pSma); +// int32_t tdRSmaFSFinishCommit(SSma *pSma); +// int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS); +// int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS); +// int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS); +// void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS); +// int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize); +// int32_t tdRSmaFSRollback(SSma *pSma); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); -int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); +// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7c6c72e995..6597334c5e 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -318,7 +318,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData); // SRSmaSnapWriter ======================================== int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); -int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter); +// int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); typedef struct { diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index dde42d8e09..c74559bc5e 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -108,8 +108,8 @@ int32_t smaFinishCommit(SSma *pSma) { int32_t lino = 0; SVnode *pVnode = pSma->pVnode; - code = tdRSmaFSFinishCommit(pSma); - TSDB_CHECK_CODE(code, lino, _exit); + // code = tdRSmaFSFinishCommit(pSma); + // TSDB_CHECK_CODE(code, lino, _exit); if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { TSDB_CHECK_CODE(code, lino, _exit); @@ -187,10 +187,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { if (!isCommit) goto _exit; smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); - code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); - TSDB_CHECK_CODE(code, lino, _exit); + // code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); + // TSDB_CHECK_CODE(code, lino, _exit); - smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); + // smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); #if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall @@ -246,8 +246,8 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) { goto _exit; } - code = tdRSmaFSCommit(pSma); - TSDB_CHECK_CODE(code, lino, _exit); + // code = tdRSmaFSCommit(pSma); + // TSDB_CHECK_CODE(code, lino, _exit); code = tsdbCommit(VND_RSMA1(pVnode), pInfo); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index e05667213e..b96d04e180 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -288,7 +288,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { taosHashCleanup(RSMA_INFO_HASH(pStat)); // step 5: - tdRSmaFSClose(RSMA_FS(pStat)); + // tdRSmaFSClose(RSMA_FS(pStat)); // step 6: free pStat tsem_destroy(&(pStat->notEmpty)); diff --git a/source/dnode/vnode/src/sma/smaFS.c b/source/dnode/vnode/src/sma/smaFS.c index 37a19529cb..d7400245f1 100644 --- a/source/dnode/vnode/src/sma/smaFS.c +++ b/source/dnode/vnode/src/sma/smaFS.c @@ -16,7 +16,7 @@ #include "sma.h" // ================================================================================================= - +#if 0 // static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2); @@ -640,3 +640,4 @@ _exit: } return code; } +#endif \ No newline at end of file diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 42b13cb69c..7dafc73652 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1185,9 +1185,9 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, } // step 2: open SRSmaFS for qTaskFiles - if ((code = tdRSmaFSOpen(pSma, qtaskFileVer, rollback)) < 0) { - goto _err; - } + // if ((code = tdRSmaFSOpen(pSma, qtaskFileVer, rollback)) < 0) { + // goto _err; + // } // step 3: iterate all stables to restore the rsma env if ((code = tdRSmaRestoreQTaskInfoInit(pSma, &nTables)) < 0) { @@ -1205,7 +1205,7 @@ _err: return code; } - +#if 0 int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t code = 0; int32_t lino = 0; @@ -1315,7 +1315,7 @@ _exit: terrno = code; return code; } - +#endif /** * @brief trigger to get rsma result in async mode * diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index c00e96a066..0cde0de2a4 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -15,8 +15,8 @@ #include "sma.h" -static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData); -static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); +// static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData); +// static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); // SRSmaSnapReader ======================================== struct SRSmaSnapReader { @@ -63,10 +63,10 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead } // open qtaskinfo - taosRLockLatch(RSMA_FS_LOCK(pStat)); - code = tdRSmaFSRef(pSma, &pReader->fs); - taosRUnLockLatch(RSMA_FS_LOCK(pStat)); - TSDB_CHECK_CODE(code, lino, _exit); + // taosRLockLatch(RSMA_FS_LOCK(pStat)); + // code = tdRSmaFSRef(pSma, &pReader->fs); + // taosRUnLockLatch(RSMA_FS_LOCK(pStat)); + // TSDB_CHECK_CODE(code, lino, _exit); if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) { pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); @@ -249,7 +249,7 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) { int32_t code = 0; SRSmaSnapReader* pReader = *ppReader; - tdRSmaFSUnRef(pReader->pSma, &pReader->fs); + // tdRSmaFSUnRef(pReader->pSma, &pReader->fs); taosMemoryFreeClear(pReader->pQTaskFReader); for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { @@ -300,8 +300,8 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit } // qtaskinfo - code = tdRSmaFSCopy(pSma, &pWriter->fs); - TSDB_CHECK_CODE(code, lino, _exit); + // code = tdRSmaFSCopy(pSma, &pWriter->fs); + // TSDB_CHECK_CODE(code, lino, _exit); // snapWriter *ppWriter = pWriter; @@ -316,21 +316,21 @@ _exit: return code; } -int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { - int32_t code = 0; - int32_t lino = 0; +// int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { +// int32_t code = 0; +// int32_t lino = 0; - if (pWriter) { - code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); - TSDB_CHECK_CODE(code, lino, _exit); - } +// if (pWriter) { +// code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); +// TSDB_CHECK_CODE(code, lino, _exit); +// } -_exit: - if (code) { - smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code)); - } - return code; -} +// _exit: +// if (code) { +// smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code)); +// } +// return code; +// } int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; @@ -364,9 +364,10 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { } } +#if 0 // qtaskinfo if (rollback) { - tdRSmaFSRollback(pSma); + // tdRSmaFSRollback(pSma); // remove qTaskFiles } else { // sendFile from fname.Ver to fname @@ -418,7 +419,7 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { // unlock taosWUnLockLatch(RSMA_FS_LOCK(pStat)); } - +#endif // rsma restore code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback); TSDB_CHECK_CODE(code, lino, _exit); @@ -451,7 +452,7 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) pHdr->type = SNAP_DATA_TSDB; code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr); } else if (pHdr->type == SNAP_DATA_QTASK) { - code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); + // code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); } else { code = TSDB_CODE_RSMA_FS_SYNC; } @@ -466,7 +467,7 @@ _exit: } return code; } - +#if 0 static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; int32_t lino = 0; @@ -516,8 +517,8 @@ static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, taosCloseFile(&fp); - code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1); - TSDB_CHECK_CODE(code, lino, _exit); + // code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1); + // TSDB_CHECK_CODE(code, lino, _exit); _exit: if (code) { @@ -531,3 +532,4 @@ _exit: return code; } +#endif \ No newline at end of file diff --git a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim index 7932cb68ac..75f75072d7 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim @@ -5,7 +5,7 @@ sleep 50 sql connect #todo xukaili sma should use rocksdb. -return 1 +#return 1 print =============== create database with retentions sql create database d0 retentions 5s:7d,10s:21d,15s:365d;