refactor: rsma code optimization
This commit is contained in:
parent
df417c7f50
commit
55a1c02f73
|
@ -40,6 +40,10 @@ static int32_t tdRSmaQTaskInfoIterNextBlock(SRSmaQTaskInfoIter *pIter, bool *isF
|
||||||
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
|
static int32_t tdRSmaQTaskInfoRestore(SSma *pSma, SRSmaQTaskInfoIter *pIter);
|
||||||
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
|
static int32_t tdRSmaQTaskInfoItemRestore(SSma *pSma, const SRSmaQTaskInfoItem *infoItem);
|
||||||
|
|
||||||
|
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma);
|
||||||
|
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma);
|
||||||
|
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma);
|
||||||
|
|
||||||
struct SRSmaInfoItem {
|
struct SRSmaInfoItem {
|
||||||
SRSmaInfo *pRsmaInfo;
|
SRSmaInfo *pRsmaInfo;
|
||||||
void *taskInfo; // qTaskInfo_t
|
void *taskInfo; // qTaskInfo_t
|
||||||
|
@ -696,10 +700,9 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
|
static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
|
||||||
// step 1: iterate all stables to restore the rsma env
|
|
||||||
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
|
SArray *suidList = taosArrayInit(1, sizeof(tb_uid_t));
|
||||||
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
|
if (tsdbGetStbIdList(SMA_META(pSma), 0, suidList) < 0) {
|
||||||
taosArrayDestroy(suidList);
|
taosArrayDestroy(suidList);
|
||||||
|
@ -741,7 +744,20 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
|
metaReaderClear(&mr);
|
||||||
|
taosArrayDestroy(suidList);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_err:
|
||||||
|
metaReaderClear(&mr);
|
||||||
|
taosArrayDestroy(suidList);
|
||||||
|
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma) {
|
||||||
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
|
||||||
STFile tFile = {0};
|
STFile tFile = {0};
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
|
@ -749,34 +765,63 @@ int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
|
||||||
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, pVnode->pTfs, qTaskInfoFName) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
|
if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) {
|
||||||
metaReaderClear(&mr);
|
|
||||||
taosArrayDestroy(suidList);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
|
if (tdOpenTFile(&tFile, TD_FILE_READ) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRSmaQTaskInfoIter fIter = {0};
|
SRSmaQTaskInfoIter fIter = {0};
|
||||||
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
SRSmaQTaskInfoItem infoItem = {0};
|
|
||||||
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
|
if (tdRSmaQTaskInfoRestore(pSma, &fIter) < 0) {
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_err:
|
||||||
tdRSmaQTaskInfoIterDestroy(&fIter);
|
tdRSmaQTaskInfoIterDestroy(&fIter);
|
||||||
metaReaderClear(&mr);
|
smaError("failed to restore rsma task since %s", terrstr());
|
||||||
taosArrayDestroy(suidList);
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief reload ts data from checkpoint
|
||||||
|
*
|
||||||
|
* @param pSma
|
||||||
|
* @return int32_t
|
||||||
|
*/
|
||||||
|
static int32_t tdRSmaRestoreTSDataReload(SSma *pSma) {
|
||||||
|
// TODO
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
_err:
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tdProcessRSmaRestoreImpl(SSma *pSma) {
|
||||||
|
SVnode *pVnode = pSma->pVnode;
|
||||||
|
|
||||||
|
// step 1: iterate all stables to restore the rsma env
|
||||||
|
if (tdRSmaRestoreQTaskInfoInit(pSma) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// step 2: retrieve qtaskinfo items from the persistence file(rsma/qtaskinfo) and restore
|
||||||
|
if (tdRSmaRestoreQTaskInfoReload(pSma) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
// step 3: reload ts data from checkpoint
|
||||||
|
if (tdRSmaRestoreTSDataReload(pSma) < 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
metaReaderClear(&mr);
|
|
||||||
taosArrayDestroy(suidList);
|
|
||||||
smaError("failed to restore rsma task since %s", terrstr());
|
smaError("failed to restore rsma task since %s", terrstr());
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue