more code
This commit is contained in:
parent
40c7fed80d
commit
1779c17375
|
@ -257,8 +257,6 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFS);
|
|||
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
|
||||
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
|
||||
|
||||
int32_t tsdbFSRollback(STsdbFS *pFS);
|
||||
|
||||
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet);
|
||||
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
|
||||
// tsdbReaderWriter.c ==============================================================================================
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
#include "tsdb.h"
|
||||
|
||||
// =================================================================================================
|
||||
static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) {
|
||||
static int32_t tsdbFSToBinary(uint8_t *p, STsdbFS *pFS) {
|
||||
int32_t n = 0;
|
||||
int8_t hasDel = pFS->pDelFile ? 1 : 0;
|
||||
uint32_t nSet = taosArrayGetSize(pFS->aDFileSet);
|
||||
|
@ -39,50 +39,96 @@ static int32_t tsdbEncodeFS(uint8_t *p, STsdbFS *pFS) {
|
|||
return n;
|
||||
}
|
||||
|
||||
static int32_t tsdbGnrtCurrent(STsdb *pTsdb, STsdbFS *pFS, char *fname) {
|
||||
static int32_t tsdbBinaryToFS(uint8_t *pData, int64_t nData, STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int64_t n;
|
||||
int64_t size;
|
||||
uint8_t *pData = NULL;
|
||||
TdFilePtr pFD = NULL;
|
||||
int32_t n = 0;
|
||||
|
||||
// to binary
|
||||
size = tsdbEncodeFS(NULL, pFS) + sizeof(TSCKSUM);
|
||||
pData = taosMemoryMalloc(size);
|
||||
// version
|
||||
n += tGetI8(pData + n, NULL);
|
||||
|
||||
// SDelFile
|
||||
int8_t hasDel = 0;
|
||||
n += tGetI8(pData + n, &hasDel);
|
||||
if (hasDel) {
|
||||
pFS->pDelFile = (SDelFile *)taosMemoryCalloc(1, sizeof(SDelFile));
|
||||
if (pFS->pDelFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
n += tGetDelFile(pData + n, pFS->pDelFile);
|
||||
pFS->pDelFile->nRef = 1;
|
||||
} else {
|
||||
pFS->pDelFile = NULL;
|
||||
}
|
||||
|
||||
// aDFileSet
|
||||
taosArrayClear(pFS->aDFileSet);
|
||||
uint32_t nSet = 0;
|
||||
n += tGetU32v(pData + n, &nSet);
|
||||
for (uint32_t iSet = 0; iSet < nSet; iSet++) {
|
||||
SDFileSet fSet = {0};
|
||||
|
||||
int32_t nt = tGetDFileSet(pData + n, &fSet);
|
||||
if (nt < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
n += nt;
|
||||
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n + sizeof(TSCKSUM) == nData);
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// encode to binary
|
||||
int32_t size = tsdbFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
|
||||
uint8_t *pData = taosMemoryMalloc(size);
|
||||
if (pData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
n = tsdbEncodeFS(pData, pFS);
|
||||
ASSERT(n + sizeof(TSCKSUM) == size);
|
||||
tsdbFSToBinary(pData, pFS);
|
||||
taosCalcChecksumAppend(0, pData, size);
|
||||
|
||||
// create and write
|
||||
pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
// save to file
|
||||
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC);
|
||||
if (pFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
n = taosWriteFile(pFD, pData, size);
|
||||
int64_t n = taosWriteFile(pFD, pData, size);
|
||||
if (n < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
taosCloseFile(&pFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (taosFsyncFile(pFD) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
taosCloseFile(&pFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
taosCloseFile(&pFD);
|
||||
|
||||
_exit:
|
||||
if (pData) taosMemoryFree(pData);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb gnrt current failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
if (pData) taosMemoryFree(pData);
|
||||
if (code) {
|
||||
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -120,76 +166,81 @@ void tsdbFSDestroy(STsdbFS *pFS) {
|
|||
|
||||
static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int64_t size;
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
int32_t lino = 0;
|
||||
int64_t size = 0;
|
||||
char fname[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
// SDelFile
|
||||
if (pTsdb->fs.pDelFile) {
|
||||
tsdbDelFileName(pTsdb, pTsdb->fs.pDelFile, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
// SArray<SDFileSet>
|
||||
int32_t fid = 0;
|
||||
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
|
||||
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
|
||||
fid = pSet->fid;
|
||||
|
||||
// head =========
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if (size != tsdbLogicToFileSize(pSet->pHeadF->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// data =========
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if (size < tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
} else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
// else if (size > tsdbLogicToFileSize(pSet->pDataF->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
// code = tsdbDFileRollback(pTsdb, pSet, TSDB_DATA_FILE);
|
||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||
// }
|
||||
|
||||
// sma =============
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if (size < tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
} else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
|
||||
if (code) goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
// else if (size > tsdbLogicToFileSize(pSet->pSmaF->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
// code = tsdbDFileRollback(pTsdb, pSet, TSDB_SMA_FILE);
|
||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
||||
// }
|
||||
|
||||
// stt ===========
|
||||
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||
if (taosStatFile(fname, &size, NULL)) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
if (size != tsdbLogicToFileSize(pSet->aSttF[iStt]->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
goto _err;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -198,10 +249,11 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
|||
// remove those invalid files (todo)
|
||||
}
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d, tsdb scan and try fix fs failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s, fid:%d", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code),
|
||||
fid);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -215,57 +267,6 @@ int32_t tDFileSetCmprFn(const void *p1, const void *p2) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbRecoverFS(STsdb *pTsdb, uint8_t *pData, int64_t nData) {
|
||||
int32_t code = 0;
|
||||
int8_t hasDel;
|
||||
uint32_t nSet;
|
||||
int32_t n = 0;
|
||||
|
||||
// version
|
||||
n += tGetI8(pData + n, NULL);
|
||||
|
||||
// SDelFile
|
||||
n += tGetI8(pData + n, &hasDel);
|
||||
if (hasDel) {
|
||||
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
||||
if (pTsdb->fs.pDelFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
pTsdb->fs.pDelFile->nRef = 1;
|
||||
n += tGetDelFile(pData + n, pTsdb->fs.pDelFile);
|
||||
} else {
|
||||
pTsdb->fs.pDelFile = NULL;
|
||||
}
|
||||
|
||||
// SArray<SDFileSet>
|
||||
taosArrayClear(pTsdb->fs.aDFileSet);
|
||||
n += tGetU32v(pData + n, &nSet);
|
||||
for (uint32_t iSet = 0; iSet < nSet; iSet++) {
|
||||
SDFileSet fSet = {0};
|
||||
|
||||
int32_t nt = tGetDFileSet(pData + n, &fSet);
|
||||
if (nt < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
n += nt;
|
||||
|
||||
if (taosArrayPush(pTsdb->fs.aDFileSet, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(n + sizeof(TSCKSUM) == nData);
|
||||
return code;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
|
||||
SVnode *pVnode = pTsdb->pVnode;
|
||||
if (pVnode->pTfs) {
|
||||
|
@ -287,26 +288,15 @@ static void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t) {
|
|||
}
|
||||
}
|
||||
|
||||
// EXPOSED APIS ====================================================================================
|
||||
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
|
||||
static int32_t tsdbLoadFSFromFile(const char *fname, STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SVnode *pVnode = pTsdb->pVnode;
|
||||
uint8_t *pData = NULL;
|
||||
|
||||
// open handle
|
||||
code = tsdbFSCreate(&pTsdb->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// load fs or keep empty
|
||||
char current[TSDB_FILENAME_LEN] = {0};
|
||||
char current_t[TSDB_FILENAME_LEN] = {0};
|
||||
tsdbGetCurrentFName(pTsdb, current, current_t);
|
||||
|
||||
if (taosCheckExistFile(current)) {
|
||||
// read
|
||||
TdFilePtr pFD = taosOpenFile(current, TD_FILE_READ);
|
||||
// load binary
|
||||
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_READ);
|
||||
if (pFD == NULL) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
@ -317,42 +307,483 @@ int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
|
|||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
uint8_t *pData = taosMemoryMalloc(size);
|
||||
pData = taosMemoryMalloc(size);
|
||||
if (pData == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosCloseFile(&pFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
int64_t n = taosReadFile(pFD, pData, size);
|
||||
if (n < 0) {
|
||||
if (taosReadFile(pFD, pData, size) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
taosMemoryFree(pData);
|
||||
taosCloseFile(&pFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
if (!taosCheckChecksumWhole(pData, size)) {
|
||||
code = TSDB_CODE_FILE_CORRUPTED;
|
||||
taosMemoryFree(pData);
|
||||
taosCloseFile(&pFD);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
taosCloseFile(&pFD);
|
||||
|
||||
// recover fs
|
||||
code = tsdbRecoverFS(pTsdb, pData, size);
|
||||
// decode binary
|
||||
code = tsdbBinaryToFS(pData, size, pFS);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (pData) taosMemoryFree(pData);
|
||||
if (code) {
|
||||
taosMemoryFree(pData);
|
||||
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbRemoveFileSet(STsdb *pTsdb, SDFileSet *pSet) {
|
||||
int32_t code = 0;
|
||||
char fname[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
int32_t nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pHeadF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pDataF);
|
||||
}
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->pSmaF);
|
||||
}
|
||||
|
||||
for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) {
|
||||
nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSet->aSttF[iStt]);
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbNewFileSet(STsdb *pTsdb, SDFileSet *pSetTo, SDFileSet *pSetFrom) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
*pSetTo = (SDFileSet){.diskId = pSetFrom->diskId, .fid = pSetFrom->fid, .nSttF = 0};
|
||||
|
||||
// head
|
||||
pSetTo->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (pSetTo->pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetTo->pHeadF = *pSetFrom->pHeadF;
|
||||
pSetTo->pHeadF->nRef = 1;
|
||||
|
||||
// data
|
||||
pSetTo->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (pSetTo->pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetTo->pDataF = *pSetFrom->pDataF;
|
||||
pSetTo->pDataF->nRef = 1;
|
||||
|
||||
// sma
|
||||
pSetTo->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (pSetTo->pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetTo->pSmaF = *pSetFrom->pSmaF;
|
||||
pSetTo->pSmaF->nRef = 1;
|
||||
|
||||
// stt
|
||||
for (int32_t iStt = 0; iStt < pSetFrom->nSttF; iStt++) {
|
||||
pSetTo->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetTo->aSttF[iStt] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
taosMemoryFree(pData);
|
||||
pSetTo->nSttF++;
|
||||
*pSetTo->aSttF[iStt] = *pSetFrom->aSttF[iStt];
|
||||
pSetTo->aSttF[iStt]->nRef = 1;
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbMergeFileSet(STsdb *pTsdb, SDFileSet *pSetOld, SDFileSet *pSetNew) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t nRef = 0;
|
||||
bool sameDisk = ((pSetOld->diskId.level == pSetNew->diskId.level) && (pSetOld->diskId.id == pSetNew->diskId.id));
|
||||
char fname[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
// head
|
||||
SHeadFile *pHeadF = pSetOld->pHeadF;
|
||||
if ((!sameDisk) || (pHeadF->commitID != pSetNew->pHeadF->commitID)) {
|
||||
pSetOld->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (pSetOld->pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetOld->pHeadF = *pSetNew->pHeadF;
|
||||
pSetOld->pHeadF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pHeadF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbHeadFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pHeadF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pHeadF);
|
||||
}
|
||||
} else {
|
||||
nRef = pHeadF->nRef;
|
||||
*pHeadF = *pSetNew->pHeadF;
|
||||
pHeadF->nRef = nRef;
|
||||
}
|
||||
|
||||
// data
|
||||
SDataFile *pDataF = pSetOld->pDataF;
|
||||
if ((!sameDisk) || (pDataF->commitID != pSetNew->pDataF->commitID)) {
|
||||
pSetOld->pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (pSetOld->pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetOld->pDataF = *pSetNew->pDataF;
|
||||
pSetOld->pDataF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pDataF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDataFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pDataF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pDataF);
|
||||
}
|
||||
} else {
|
||||
nRef = pDataF->nRef;
|
||||
*pDataF = *pSetNew->pDataF;
|
||||
pDataF->nRef = nRef;
|
||||
}
|
||||
|
||||
// sma
|
||||
SSmaFile *pSmaF = pSetOld->pSmaF;
|
||||
if ((!sameDisk) || (pSmaF->commitID != pSetNew->pSmaF->commitID)) {
|
||||
pSetOld->pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (pSetOld->pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetOld->pSmaF = *pSetNew->pSmaF;
|
||||
pSetOld->pSmaF->nRef = 1;
|
||||
|
||||
nRef = atomic_sub_fetch_32(&pSmaF->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSmaFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSmaF, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSmaF);
|
||||
}
|
||||
} else {
|
||||
nRef = pSmaF->nRef;
|
||||
*pSmaF = *pSetNew->pSmaF;
|
||||
pSmaF->nRef = nRef;
|
||||
}
|
||||
|
||||
// stt
|
||||
if (sameDisk) {
|
||||
if (pSetNew->nSttF > pSetOld->nSttF) {
|
||||
ASSERT(pSetNew->nSttF == pSetOld->nSttF + 1);
|
||||
pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[pSetOld->nSttF] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF];
|
||||
pSetOld->aSttF[pSetOld->nSttF]->nRef = 1;
|
||||
pSetOld->nSttF++;
|
||||
} else if (pSetNew->nSttF < pSetOld->nSttF) {
|
||||
ASSERT(pSetNew->nSttF == 1);
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
pSetOld->aSttF[iStt] = NULL;
|
||||
}
|
||||
|
||||
pSetOld->nSttF = 1;
|
||||
pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetOld->aSttF[0] = *pSetNew->aSttF[0];
|
||||
pSetOld->aSttF[0]->nRef = 1;
|
||||
} else {
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
|
||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[iStt] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
} else {
|
||||
ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size);
|
||||
ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) {
|
||||
SSttFile *pSttFile = pSetOld->aSttF[iStt];
|
||||
nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pSttFile);
|
||||
}
|
||||
}
|
||||
|
||||
pSetOld->nSttF = 0;
|
||||
for (int32_t iStt = 0; iStt < pSetNew->nSttF; iStt++) {
|
||||
pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (pSetOld->aSttF[iStt] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
*pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt];
|
||||
pSetOld->aSttF[iStt]->nRef = 1;
|
||||
|
||||
pSetOld->nSttF++;
|
||||
}
|
||||
}
|
||||
|
||||
if (!sameDisk) {
|
||||
pSetOld->diskId = pSetNew->diskId;
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSApplyChange(STsdb *pTsdb, STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t nRef = 0;
|
||||
char fname[TSDB_FILENAME_LEN] = {0};
|
||||
|
||||
// SDelFile
|
||||
if (pFS->pDelFile) {
|
||||
SDelFile *pDelFile = pTsdb->fs.pDelFile;
|
||||
|
||||
if (pDelFile == NULL || (pDelFile->commitID != pFS->pDelFile->commitID)) {
|
||||
pTsdb->fs.pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
||||
if (pTsdb->fs.pDelFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
*pTsdb->fs.pDelFile = *pFS->pDelFile;
|
||||
pTsdb->fs.pDelFile->nRef = 1;
|
||||
|
||||
if (pDelFile) {
|
||||
nRef = atomic_sub_fetch_32(&pDelFile->nRef, 1);
|
||||
if (nRef == 0) {
|
||||
tsdbDelFileName(pTsdb, pDelFile, fname);
|
||||
(void)taosRemoveFile(fname);
|
||||
taosMemoryFree(pDelFile);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
ASSERT(pTsdb->fs.pDelFile == NULL);
|
||||
}
|
||||
|
||||
// aDFileSet
|
||||
int32_t iOld = 0;
|
||||
int32_t iNew = 0;
|
||||
while (true) {
|
||||
int32_t nOld = taosArrayGetSize(pTsdb->fs.aDFileSet);
|
||||
int32_t nNew = taosArrayGetSize(pFS->aDFileSet);
|
||||
SDFileSet fSet = {0};
|
||||
int8_t sameDisk = 0;
|
||||
|
||||
if (iOld >= nOld && iNew >= nNew) break;
|
||||
|
||||
SDFileSet *pSetOld = (iOld < nOld) ? taosArrayGet(pTsdb->fs.aDFileSet, iOld) : NULL;
|
||||
SDFileSet *pSetNew = (iNew < nNew) ? taosArrayGet(pFS->aDFileSet, iNew) : NULL;
|
||||
|
||||
if (pSetOld && pSetNew) {
|
||||
if (pSetOld->fid == pSetNew->fid) {
|
||||
code = tsdbMergeFileSet(pTsdb, pSetOld, pSetNew);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
iOld++;
|
||||
iNew++;
|
||||
} else if (pSetOld->fid < pSetNew->fid) {
|
||||
code = tsdbRemoveFileSet(pTsdb, pSetOld);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
|
||||
} else {
|
||||
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew);
|
||||
TSDB_CHECK_CODE(code, lino, _exit)
|
||||
|
||||
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
iOld++;
|
||||
iNew++;
|
||||
}
|
||||
} else if (pSetOld) {
|
||||
code = tsdbRemoveFileSet(pTsdb, pSetOld);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
taosArrayRemove(pTsdb->fs.aDFileSet, iOld);
|
||||
} else {
|
||||
code = tsdbNewFileSet(pTsdb, &fSet, pSetNew);
|
||||
TSDB_CHECK_CODE(code, lino, _exit)
|
||||
|
||||
if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
iOld++;
|
||||
iNew++;
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSCommit(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
char current[TSDB_FILENAME_LEN] = {0};
|
||||
char current_t[TSDB_FILENAME_LEN] = {0};
|
||||
tsdbGetCurrentFName(pTsdb, current, current_t);
|
||||
|
||||
// rename the file
|
||||
if (taosRenameFile(current_t, current) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
// Load the new FS
|
||||
STsdbFS fs = {0};
|
||||
code = tsdbFSCreate(&fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbLoadFSFromFile(current_t, &fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// apply file change
|
||||
code = tsdbFSApplyChange(pTsdb, &fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
tsdbFSDestroy(&fs);
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbFSRollback(STsdb *pTsdb) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
char current_t[TSDB_FILENAME_LEN] = {0};
|
||||
tsdbGetCurrentFName(pTsdb, NULL, current_t);
|
||||
|
||||
if (taosRemoveFile(current_t) < 0) {
|
||||
code = TAOS_SYSTEM_ERROR(errno);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(errno));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
// EXPOSED APIS ====================================================================================
|
||||
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SVnode *pVnode = pTsdb->pVnode;
|
||||
|
||||
// open handle
|
||||
code = tsdbFSCreate(&pTsdb->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// open impl
|
||||
char current[TSDB_FILENAME_LEN] = {0};
|
||||
char current_t[TSDB_FILENAME_LEN] = {0};
|
||||
tsdbGetCurrentFName(pTsdb, current, current_t);
|
||||
|
||||
if (taosCheckExistFile(current)) {
|
||||
code = tsdbLoadFSFromFile(current, &pTsdb->fs);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (taosCheckExistFile(current_t)) {
|
||||
if (rollback) {
|
||||
code = tsdbFSRollback(pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbFSCommit(pTsdb);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// empty one
|
||||
code = tsdbGnrtCurrent(pTsdb, &pTsdb->fs, current);
|
||||
code = tsdbSaveFSToFile(&pTsdb->fs, current);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
ASSERT(!rollback);
|
||||
}
|
||||
|
||||
// scan and fix FS
|
||||
|
@ -363,7 +794,6 @@ _exit:
|
|||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -404,19 +834,24 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
|
|||
|
||||
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
pFS->pDelFile = NULL;
|
||||
if (pFS->aDFileSet) {
|
||||
taosArrayClear(pFS->aDFileSet);
|
||||
} else {
|
||||
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
|
||||
if (pFS->aDFileSet == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTsdb->fs.pDelFile) {
|
||||
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
|
||||
if (pFS->pDelFile == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
*pFS->pDelFile = *pTsdb->fs.pDelFile;
|
||||
|
@ -430,7 +865,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||
if (fSet.pHeadF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.pHeadF = *pSet->pHeadF;
|
||||
|
||||
|
@ -438,7 +873,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
|
||||
if (fSet.pDataF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.pDataF = *pSet->pDataF;
|
||||
|
||||
|
@ -446,7 +881,7 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
|
||||
if (fSet.pSmaF == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.pSmaF = *pSet->pSmaF;
|
||||
|
||||
|
@ -455,26 +890,21 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
|
|||
fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
|
||||
if (fSet.aSttF[fSet.nSttF] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
*fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF];
|
||||
}
|
||||
|
||||
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbFSRollback(STsdbFS *pFS) {
|
||||
int32_t code = 0;
|
||||
|
||||
ASSERT(0);
|
||||
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -591,7 +1021,7 @@ int32_t tsdbFSCommit1(STsdb *pTsdb, STsdbFS *pFSNew) {
|
|||
pTsdb->path, TD_DIRSEP);
|
||||
|
||||
// gnrt CURRENT.t
|
||||
code = tsdbGnrtCurrent(pTsdb, pFSNew, tfname);
|
||||
code = tsdbSaveFSToFile(pFSNew, tfname);
|
||||
if (code) goto _err;
|
||||
|
||||
// rename
|
||||
|
|
|
@ -343,6 +343,7 @@ TdFilePtr taosOpenFile(const char *path, int32_t tdFileOptions) {
|
|||
|
||||
TdFilePtr pFile = (TdFilePtr)taosMemoryMalloc(sizeof(TdFile));
|
||||
if (pFile == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (fd >= 0) close(fd);
|
||||
if (fp != NULL) fclose(fp);
|
||||
return NULL;
|
||||
|
|
Loading…
Reference in New Issue