chore: rsma snapshot
This commit is contained in:
parent
dfa7e0f59a
commit
d983dd89d1
|
@ -320,106 +320,9 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Fetch qtaskfiles LE than version
|
||||
*
|
||||
* @param pSma
|
||||
* @param version
|
||||
* @param output
|
||||
* @return int32_t
|
||||
*/
|
||||
#if 0
|
||||
static int32_t tdFetchQTaskInfoFiles(SSma *pSma, int64_t version, SArray **output) {
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
TdDirPtr pDir = NULL;
|
||||
TdDirEntryPtr pDirEntry = NULL;
|
||||
char dir[TSDB_FILENAME_LEN];
|
||||
const char *pattern = "v[0-9]+qinf\\.v([0-9]+)?$";
|
||||
regex_t regex;
|
||||
int code = 0;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
||||
tdRSmaGetDirName(TD_VID(pVnode), tfsGetPrimaryPath(pVnode->pTfs), VNODE_RSMA_DIR, true, dir);
|
||||
|
||||
if (!taosCheckExistFile(dir)) {
|
||||
smaDebug("vgId:%d, fetch qtask files, no need as dir %s not exist", TD_VID(pVnode), dir);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// Resource allocation and init
|
||||
if ((code = regcomp(®ex, pattern, REG_EXTENDED)) != 0) {
|
||||
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
|
||||
char errbuf[128];
|
||||
regerror(code, ®ex, errbuf, sizeof(errbuf));
|
||||
smaWarn("vgId:%d, fetch qtask files, regcomp for %s failed since %s", TD_VID(pVnode), dir, errbuf);
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (!(pDir = taosOpenDir(dir))) {
|
||||
regfree(®ex);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
smaError("vgId:%d, fetch qtask files, open dir %s failed since %s", TD_VID(pVnode), dir, terrstr());
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
int32_t dirLen = strlen(dir);
|
||||
char *dirEnd = POINTER_SHIFT(dir, dirLen);
|
||||
regmatch_t regMatch[2];
|
||||
while ((pDirEntry = taosReadDir(pDir))) {
|
||||
char *entryName = taosGetDirEntryName(pDirEntry);
|
||||
if (!entryName) {
|
||||
continue;
|
||||
}
|
||||
|
||||
code = regexec(®ex, entryName, 2, regMatch, 0);
|
||||
|
||||
if (code == 0) {
|
||||
// match
|
||||
smaInfo("vgId:%d, fetch qtask files, max ver:%" PRIi64 ", %s found", TD_VID(pVnode), version, entryName);
|
||||
|
||||
int64_t ver = -1;
|
||||
sscanf((const char *)POINTER_SHIFT(entryName, regMatch[1].rm_so), "%" PRIi64, &ver);
|
||||
if ((ver <= version) && (ver > -1)) {
|
||||
if (!(*output)) {
|
||||
if (!(*output = taosArrayInit(1, POINTER_BYTES))) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
char *entryDup = strdup(entryName);
|
||||
if (!entryDup) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
if (!taosArrayPush(*output, &entryDup)) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
} else {
|
||||
}
|
||||
} else if (code == REG_NOMATCH) {
|
||||
// not match
|
||||
smaTrace("vgId:%d, fetch qtask files, not match %s", TD_VID(pVnode), entryName);
|
||||
continue;
|
||||
} else {
|
||||
// has other error
|
||||
char errbuf[128];
|
||||
regerror(code, ®ex, errbuf, sizeof(errbuf));
|
||||
smaWarn("vgId:%d, fetch qtask files, regexec failed since %s", TD_VID(pVnode), errbuf);
|
||||
terrno = TSDB_CODE_RSMA_REGEX_MATCH;
|
||||
goto _end;
|
||||
}
|
||||
}
|
||||
_end:
|
||||
taosCloseDir(&pDir);
|
||||
regfree(®ex);
|
||||
return terrno == 0 ? TSDB_CODE_SUCCESS : TSDB_CODE_FAILED;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t tdRSmaFSScanAndTryFix(SSma *pSma) {
|
||||
int32_t code = 0;
|
||||
#if 0
|
||||
int32_t lino = 0;
|
||||
SVnode *pVnode = pSma->pVnode;
|
||||
SSmaEnv *pEnv = SMA_RSMA_ENV(pSma);
|
||||
|
@ -462,6 +365,7 @@ _exit:
|
|||
if (code) {
|
||||
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
#endif
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -497,10 +401,9 @@ int32_t tdRSmaFSOpen(SSma *pSma, int64_t version, int8_t rollback) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
// 1st open with empty current/qTaskInfoFile
|
||||
// 1st time open with empty current/qTaskInfoFile
|
||||
code = tdRSmaSaveFSToFile(RSMA_FS(pStat), current);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
ASSERT(!rollback);
|
||||
}
|
||||
|
||||
// scan and try fix(remove main.db/main.db.xxx and use the one with version)
|
||||
|
|
|
@ -101,18 +101,18 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf)
|
|||
|
||||
if (!qReader) {
|
||||
*ppBuf = NULL;
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, qTaskReader is NULL", TD_VID(pVnode));
|
||||
return 0;
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since qTaskReader is NULL", TD_VID(pVnode));
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
if (pReader->fsIter >= taosArrayGetSize(pFS->aQTaskInf)) {
|
||||
*ppBuf = NULL;
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, fsIter reach end", TD_VID(pVnode));
|
||||
return 0;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
while (pReader->fsIter < taosArrayGetSize(pFS->aQTaskInf)) {
|
||||
SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter);
|
||||
SQTaskFile* qTaskF = taosArrayGet(pFS->aQTaskInf, pReader->fsIter++);
|
||||
if (qTaskF->version != version) {
|
||||
continue;
|
||||
}
|
||||
|
@ -120,10 +120,11 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf)
|
|||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, tfsGetPrimaryPath(pVnode->pTfs),
|
||||
fname);
|
||||
if (!taosCheckExistFile(fname)) {
|
||||
smaWarn("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8 ", version %" PRIi64
|
||||
" is not needed as %s not exist",
|
||||
TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname);
|
||||
continue;
|
||||
smaError("vgId:%d, vnode snapshot rsma reader for qtaskinfo, table %" PRIi64 ", level %" PRIi8
|
||||
", version %" PRIi64 " failed since %s not exist",
|
||||
TD_VID(pVnode), qTaskF->suid, qTaskF->level, version, fname);
|
||||
code = TSDB_CODE_RSMA_FS_SYNC;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
|
||||
|
@ -139,7 +140,7 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf)
|
|||
if (!qReader->pReadH) {
|
||||
*ppBuf = NULL;
|
||||
smaInfo("vgId:%d, vnode snapshot rsma reader qtaskinfo, not needed since readh is NULL", TD_VID(pVnode));
|
||||
return 0;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
int64_t size = 0;
|
||||
|
@ -184,6 +185,8 @@ static int32_t rsmaSnapReadQTaskInfo(SRSmaSnapReader* pReader, uint8_t** ppBuf)
|
|||
pHdr->size = size;
|
||||
|
||||
_exit:
|
||||
if (qReader) taosCloseFile(&qReader->pReadH);
|
||||
|
||||
if (code) {
|
||||
*ppBuf = NULL;
|
||||
smaError("vgId:%d, %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
|
||||
|
@ -234,6 +237,7 @@ int32_t rsmaSnapRead(SRSmaSnapReader* pReader, uint8_t** ppData) {
|
|||
|
||||
_exit:
|
||||
if (code) {
|
||||
rsmaSnapReaderClose(&pReader);
|
||||
smaError("vgId:%d, vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
|
||||
|
@ -246,10 +250,7 @@ int32_t rsmaSnapReaderClose(SRSmaSnapReader** ppReader) {
|
|||
SRSmaSnapReader* pReader = *ppReader;
|
||||
|
||||
tdRSmaFSUnRef(pReader->pSma, &pReader->fs);
|
||||
if (pReader->pQTaskFReader) {
|
||||
taosCloseFile(&pReader->pQTaskFReader->pReadH);
|
||||
taosMemoryFree(pReader->pQTaskFReader);
|
||||
}
|
||||
taosMemoryFreeClear(pReader->pQTaskFReader);
|
||||
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
if (pReader->pDataReader[i]) {
|
||||
|
@ -319,10 +320,10 @@ int32_t rsmaSnapWriterPrepareClose(SRSmaSnapWriter* pWriter) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// rsmaSnapWriterClose
|
||||
|
||||
code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
if (pWriter) {
|
||||
code = tdRSmaFSPrepareCommit(pWriter->pSma, &pWriter->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -335,17 +336,25 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
|
|||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSma* pSma = NULL;
|
||||
SVnode* pVnode = NULL;
|
||||
SSmaEnv* pEnv = NULL;
|
||||
SRSmaStat* pStat = NULL;
|
||||
SRSmaSnapWriter* pWriter = *ppWriter;
|
||||
const char* primaryPath = NULL;
|
||||
char fname[TSDB_FILENAME_LEN] = {0};
|
||||
char fnameVer[TSDB_FILENAME_LEN] = {0};
|
||||
TdFilePtr pOutFD = NULL;
|
||||
TdFilePtr pInFD = NULL;
|
||||
|
||||
if (!pWriter) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pSma = pWriter->pSma;
|
||||
pVnode = pSma->pVnode;
|
||||
pEnv = SMA_RSMA_ENV(pSma);
|
||||
pStat = (SRSmaStat*)SMA_ENV_STAT(pEnv);
|
||||
primaryPath = tfsGetPrimaryPath(pVnode->pTfs);
|
||||
|
||||
// rsma1/rsma2
|
||||
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
|
||||
|
@ -360,6 +369,45 @@ int32_t rsmaSnapWriterClose(SRSmaSnapWriter** ppWriter, int8_t rollback) {
|
|||
tdRSmaFSRollback(pSma);
|
||||
// remove qTaskFiles
|
||||
} else {
|
||||
// sendFile from fname.Ver to fname
|
||||
SRSmaFS* pFS = &pWriter->fs;
|
||||
int32_t size = taosArrayGetSize(pFS->aQTaskInf);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SQTaskFile* pTaskF = TARRAY_GET_ELEM(pFS->aQTaskInf, i);
|
||||
if (pTaskF->version == pWriter->ever) {
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, pTaskF->version, primaryPath, fnameVer);
|
||||
tdRSmaQTaskInfoGetFullName(TD_VID(pVnode), pTaskF->suid, pTaskF->level, -1, primaryPath, fname);
|
||||
|
||||
pInFD = taosOpenFile(fnameVer, TD_FILE_READ);
|
||||
if (pInFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
pOutFD = taosCreateFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pOutFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int64_t size = 0;
|
||||
if (taosFStatFile(pInFD, &size, NULL) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int64_t offset = 0;
|
||||
if (taosFSendFile(pOutFD, pInFD, &offset, size) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
smaError("vgId:%d, vnode snapshot rsma writer, send qtaskinfo file %s to %s failed since %s", TD_VID(pVnode),
|
||||
fnameVer, fname, tstrerror(code));
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
taosCloseFile(&pOutFD);
|
||||
taosCloseFile(&pInFD);
|
||||
}
|
||||
}
|
||||
|
||||
// lock
|
||||
taosWLockLatch(RSMA_FS_LOCK(pStat));
|
||||
code = tdRSmaFSCommit(pSma);
|
||||
|
@ -380,6 +428,8 @@ _exit:
|
|||
if (pWriter) taosMemoryFree(pWriter);
|
||||
*ppWriter = NULL;
|
||||
if (code) {
|
||||
if (pOutFD) taosCloseFile(&pOutFD);
|
||||
if (pInFD) taosCloseFile(&pInFD);
|
||||
smaError("vgId:%d, vnode snapshot rsma writer close failed since %s", SMA_VID(pSma), tstrerror(code));
|
||||
} else {
|
||||
smaInfo("vgId:%d, vnode snapshot rsma writer close succeed", SMA_VID(pSma));
|
||||
|
@ -452,7 +502,10 @@ static int32_t rsmaSnapWriteQTaskInfo(SRSmaSnapWriter* pWriter, uint8_t* pData,
|
|||
}
|
||||
|
||||
uint32_t mtime = 0;
|
||||
if (taosFStatFile(fp, NULL, &mtime) == 0) {
|
||||
if (taosFStatFile(fp, NULL, &mtime) != 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
qTaskFile.mtime = mtime;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue