chore: refactor

This commit is contained in:
kailixu 2023-07-05 19:08:43 +08:00
parent 59e83fca5d
commit df7866e9f6
8 changed files with 59 additions and 56 deletions

View File

@ -223,21 +223,21 @@ int32_t smaPreClose(SSma *pSma);
// rsma // rsma
void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback); // int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback);
void tdRSmaFSClose(SRSmaFS *fs); // void tdRSmaFSClose(SRSmaFS *fs);
int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew); // int32_t tdRSmaFSPrepareCommit(SSma *pSma, SRSmaFS *pFSNew);
int32_t tdRSmaFSCommit(SSma *pSma); // int32_t tdRSmaFSCommit(SSma *pSma);
int32_t tdRSmaFSFinishCommit(SSma *pSma); // int32_t tdRSmaFSFinishCommit(SSma *pSma);
int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS); // int32_t tdRSmaFSCopy(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS); // int32_t tdRSmaFSTakeSnapshot(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS); // int32_t tdRSmaFSRef(SSma *pSma, SRSmaFS *pFS);
void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS); // void tdRSmaFSUnRef(SSma *pSma, SRSmaFS *pFS);
int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize); // int32_t tdRSmaFSUpsertQTaskFile(SSma *pSma, SRSmaFS *pFS, SQTaskFile *qTaskFile, int32_t nSize);
int32_t tdRSmaFSRollback(SSma *pSma); // int32_t tdRSmaFSRollback(SSma *pSma);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); // int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName); void tdRSmaQTaskInfoGetFileName(int32_t vgId, int64_t suid, int8_t level, int64_t version, char *outputName);
void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path, void tdRSmaQTaskInfoGetFullName(int32_t vgId, int64_t suid, int8_t level, int64_t version, const char *path,

View File

@ -318,7 +318,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData);
// SRSmaSnapWriter ======================================== // SRSmaSnapWriter ========================================
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter); int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWriter** ppWriter);
int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter); // int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter);
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback); int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback);
typedef struct { typedef struct {

View File

@ -108,8 +108,8 @@ int32_t smaFinishCommit(SSma *pSma) {
int32_t lino = 0; int32_t lino = 0;
SVnode *pVnode = pSma->pVnode; SVnode *pVnode = pSma->pVnode;
code = tdRSmaFSFinishCommit(pSma); // code = tdRSmaFSFinishCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { if (VND_RSMA1(pVnode) && (code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -187,10 +187,10 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit; if (!isCommit) goto _exit;
smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); smaInfo("vgId:%d, rsma commit, all items are consumed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); // code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); // smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());
#if 0 // consuming task of qTaskInfo clone #if 0 // consuming task of qTaskInfo clone
// step 4: swap queue/qall and iQueue/iQall // step 4: swap queue/qall and iQueue/iQall
@ -246,8 +246,8 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma, SCommitInfo *pInfo) {
goto _exit; goto _exit;
} }
code = tdRSmaFSCommit(pSma); // code = tdRSmaFSCommit(pSma);
TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbCommit(VND_RSMA1(pVnode), pInfo); code = tsdbCommit(VND_RSMA1(pVnode), pInfo);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -288,7 +288,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
taosHashCleanup(RSMA_INFO_HASH(pStat)); taosHashCleanup(RSMA_INFO_HASH(pStat));
// step 5: // step 5:
tdRSmaFSClose(RSMA_FS(pStat)); // tdRSmaFSClose(RSMA_FS(pStat));
// step 6: free pStat // step 6: free pStat
tsem_destroy(&(pStat->notEmpty)); tsem_destroy(&(pStat->notEmpty));

View File

@ -16,7 +16,7 @@
#include "sma.h" #include "sma.h"
// ================================================================================================= // =================================================================================================
#if 0
// static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output); // static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output);
static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2); static int32_t tdQTaskInfCmprFn1(const void *p1, const void *p2);
@ -640,3 +640,4 @@ _exit:
} }
return code; return code;
} }
#endif

View File

@ -1185,9 +1185,9 @@ int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer,
} }
// step 2: open SRSmaFS for qTaskFiles // step 2: open SRSmaFS for qTaskFiles
if ((code = tdRSmaFSOpen(pSma, qtaskFileVer, rollback)) < 0) { // if ((code = tdRSmaFSOpen(pSma, qtaskFileVer, rollback)) < 0) {
goto _err; // goto _err;
} // }
// step 3: iterate all stables to restore the rsma env // step 3: iterate all stables to restore the rsma env
if ((code = tdRSmaRestoreQTaskInfoInit(pSma, &nTables)) < 0) { if ((code = tdRSmaRestoreQTaskInfoInit(pSma, &nTables)) < 0) {
@ -1205,7 +1205,7 @@ _err:
return code; return code;
} }
#if 0
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -1315,7 +1315,7 @@ _exit:
terrno = code; terrno = code;
return code; return code;
} }
#endif
/** /**
* @brief trigger to get rsma result in async mode * @brief trigger to get rsma result in async mode
* *

View File

@ -15,8 +15,8 @@
#include "sma.h" #include "sma.h"
static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData); // static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData); // static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// SRSmaSnapReader ======================================== // SRSmaSnapReader ========================================
struct SRSmaSnapReader { struct SRSmaSnapReader {
@ -63,10 +63,10 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapRead
} }
// open qtaskinfo // open qtaskinfo
taosRLockLatch(RSMA_FS_LOCK(pStat)); // taosRLockLatch(RSMA_FS_LOCK(pStat));
code = tdRSmaFSRef(pSma, &pReader->fs); // code = tdRSmaFSRef(pSma, &pReader->fs);
taosRUnLockLatch(RSMA_FS_LOCK(pStat)); // taosRUnLockLatch(RSMA_FS_LOCK(pStat));
TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) { if (taosArrayGetSize(pReader->fs.aQTaskInf) > 0) {
pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader)); pReader->pQTaskFReader = taosMemoryCalloc(1, sizeof(SQTaskFReader));
@ -249,7 +249,7 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
int32_t code = 0; int32_t code = 0;
SRSmaSnapReader* pReader = *ppReader; SRSmaSnapReader* pReader = *ppReader;
tdRSmaFSUnRef(pReader->pSma, &pReader->fs); // tdRSmaFSUnRef(pReader->pSma, &pReader->fs);
taosMemoryFreeClear(pReader->pQTaskFReader); taosMemoryFreeClear(pReader->pQTaskFReader);
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
@ -300,8 +300,8 @@ int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRSmaSnapWrit
} }
// qtaskinfo // qtaskinfo
code = tdRSmaFSCopy(pSma, &pWriter->fs); // code = tdRSmaFSCopy(pSma, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
// snapWriter // snapWriter
*ppWriter = pWriter; *ppWriter = pWriter;
@ -316,21 +316,21 @@ _exit:
return code; return code;
} }
int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) { // int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
int32_t code = 0; // int32_t code = 0;
int32_t lino = 0; // int32_t lino = 0;
if (pWriter) { // if (pWriter) {
code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs); // code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);
TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
} // }
_exit: // _exit:
if (code) { // if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code)); // smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pWriter->pSma), __func__, lino, tstrerror(code));
} // }
return code; // return code;
} // }
int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) { int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0; int32_t code = 0;
@ -364,9 +364,10 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
} }
} }
#if 0
// qtaskinfo // qtaskinfo
if (rollback) { if (rollback) {
tdRSmaFSRollback(pSma); // tdRSmaFSRollback(pSma);
// remove qTaskFiles // remove qTaskFiles
} else { } else {
// sendFile from fname.Ver to fname // sendFile from fname.Ver to fname
@ -418,7 +419,7 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
// unlock // unlock
taosWUnLockLatch(RSMA_FS_LOCK(pStat)); taosWUnLockLatch(RSMA_FS_LOCK(pStat));
} }
#endif
// rsma restore // rsma restore
code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback); code = tdRSmaRestore(pWriter->pSma, RSMA_RESTORE_SYNC, pWriter->ever, rollback);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -451,7 +452,7 @@ int32_t rsmaSnapWrite(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData)
pHdr->type = SNAP_DATA_TSDB; pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr); code = tsdbSnapWrite(pWriter->pDataWriter[1], pHdr);
} else if (pHdr->type == SNAP_DATA_QTASK) { } else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData); // code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
} else { } else {
code = TSDB_CODE_RSMA_FS_SYNC; code = TSDB_CODE_RSMA_FS_SYNC;
} }
@ -466,7 +467,7 @@ _exit:
} }
return code; return code;
} }
#if 0
static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -516,8 +517,8 @@ static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData,
taosCloseFile(&fp); taosCloseFile(&fp);
code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1); // code = tdRSmaFSUpsertQTaskFile(pSma, &pWriter->fs, &qTaskFile, 1);
TSDB_CHECK_CODE(code, lino, _exit); // TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
@ -531,3 +532,4 @@ _exit:
return code; return code;
} }
#endif

View File

@ -5,7 +5,7 @@ sleep 50
sql connect sql connect
#todo xukaili sma should use rocksdb. #todo xukaili sma should use rocksdb.
return 1 #return 1
print =============== create database with retentions print =============== create database with retentions
sql create database d0 retentions 5s:7d,10s:21d,15s:365d; sql create database d0 retentions 5s:7d,10s:21d,15s:365d;