enh: qinf process optimization for duplicated commit version
This commit is contained in:
parent
da8244f178
commit
b15fa09acd
|
@ -211,6 +211,7 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version);
|
||||||
void tdRSmaFSClose(SRSmaFS *fs);
|
void tdRSmaFSClose(SRSmaFS *fs);
|
||||||
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
||||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version);
|
||||||
|
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat);
|
||||||
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
|
int32_t tdRSmaFSUpsertQTaskFile(SRSmaFS *pFS, SQTaskFile *qTaskFile);
|
||||||
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
|
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer);
|
||||||
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
|
||||||
|
|
|
@ -182,6 +182,7 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SRSmaFS *pFS = RSMA_FS(pStat);
|
SRSmaFS *pFS = RSMA_FS(pStat);
|
||||||
int64_t committed = pStat->commitAppliedVer;
|
int64_t committed = pStat->commitAppliedVer;
|
||||||
|
int64_t fsMaxVer = -1;
|
||||||
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
char qTaskInfoFullName[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
taosWLockLatch(RSMA_FS_LOCK(pStat));
|
taosWLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
@ -204,10 +205,20 @@ static int32_t tdUpdateQTaskInfoFiles(SSma *pSma, SRSmaStat *pStat) {
|
||||||
++i;
|
++i;
|
||||||
}
|
}
|
||||||
|
|
||||||
SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
|
if (taosArrayGetSize(pFS->aQTaskInf) > 0) {
|
||||||
if (tdRSmaFSUpsertQTaskFile(pFS, &qFile) < 0) {
|
fsMaxVer = ((SQTaskFile *)taosArrayGetLast(pFS->aQTaskInf))->version;
|
||||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
}
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
|
if (fsMaxVer < committed) {
|
||||||
|
SQTaskFile qFile = {.nRef = 1, .padding = 0, .version = committed, .size = 0};
|
||||||
|
if (taosArrayPush(pFS->aQTaskInf, &qFile) < 0) {
|
||||||
|
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
smaDebug("vgId:%d, update qinf, no need as committed %" PRIi64 " not larger than fsMaxVer %" PRIi64, TD_VID(pVnode),
|
||||||
|
committed, fsMaxVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
taosWUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
@ -365,7 +376,7 @@ static int32_t tdProcessRSmaAsyncPostCommitImpl(SSma *pSma) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pEnv);
|
||||||
|
|
||||||
// step 1: merge qTaskInfo and iQTaskInfo
|
// step 1: merge qTaskInfo and iQTaskInfo
|
||||||
// lock
|
// lock
|
||||||
|
|
|
@ -96,6 +96,18 @@ int32_t tdRSmaFSRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
||||||
return oldVal;
|
return oldVal;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t tdRSmaFSMaxVer(SSma *pSma, SRSmaStat *pStat) {
|
||||||
|
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
|
||||||
|
int64_t version = -1;
|
||||||
|
|
||||||
|
taosRLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
if (taosArrayGetSize(aQTaskInf) > 0) {
|
||||||
|
version = ((SQTaskFile *)taosArrayGetLast(aQTaskInf))->version;
|
||||||
|
}
|
||||||
|
taosRUnLockLatch(RSMA_FS_LOCK(pStat));
|
||||||
|
return version;
|
||||||
|
}
|
||||||
|
|
||||||
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
void tdRSmaFSUnRef(SSma *pSma, SRSmaStat *pStat, int64_t version) {
|
||||||
SVnode *pVnode = pSma->pVnode;
|
SVnode *pVnode = pSma->pVnode;
|
||||||
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
|
SArray *aQTaskInf = RSMA_FS(pStat)->aQTaskInf;
|
||||||
|
|
|
@ -1453,17 +1453,24 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t fsMaxVer = tdRSmaFSMaxVer(pSma, pRSmaStat);
|
||||||
|
if (pRSmaStat->commitAppliedVer <= fsMaxVer) {
|
||||||
|
smaDebug("vgId:%d, rsma persist, no need as applied %" PRIi64 " not larger than fsMaxVer %" PRIi64, vid,
|
||||||
|
pRSmaStat->commitAppliedVer, fsMaxVer);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
STFile tFile = {0};
|
STFile tFile = {0};
|
||||||
#if 0
|
#if 0
|
||||||
if (pRSmaStat->commitAppliedVer > 0) {
|
if (pRSmaStat->commitAppliedVer > 0) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
|
smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile));
|
||||||
|
@ -1513,11 +1520,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
char qTaskInfoFName[TSDB_FILENAME_LEN];
|
||||||
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
tdRSmaQTaskInfoGetFileName(vid, pRSmaStat->commitAppliedVer, qTaskInfoFName);
|
||||||
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
smaError("vgId:%d, rsma persist, init %s failed since %s", vid, qTaskInfoFName, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) {
|
||||||
smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
smaError("vgId:%d, rsma persist, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
|
smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid,
|
||||||
|
@ -1561,7 +1568,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
_err:
|
_err:
|
||||||
smaError("vgId:%d, rsma persit failed since %s", vid, terrstr());
|
smaError("vgId:%d, rsma persist failed since %s", vid, terrstr());
|
||||||
if (isFileCreated) {
|
if (isFileCreated) {
|
||||||
tdRemoveTFile(&tFile);
|
tdRemoveTFile(&tFile);
|
||||||
tdDestroyTFile(&tFile);
|
tdDestroyTFile(&tFile);
|
||||||
|
|
Loading…
Reference in New Issue