enh: refact errno
This commit is contained in:
parent
c9a5c30ea9
commit
3e262250d7
|
@ -90,12 +90,14 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) {
|
static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
TdFilePtr pFD = NULL;
|
||||||
|
uint8_t *pData = NULL;
|
||||||
|
|
||||||
// encode to binary
|
// encode to binary
|
||||||
int32_t size = tsdbFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
|
int32_t size = tsdbFSToBinary(NULL, pFS) + sizeof(TSCKSUM);
|
||||||
uint8_t *pData = taosMemoryMalloc(size);
|
pData = taosMemoryMalloc(size);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -104,32 +106,26 @@ static int32_t tsdbSaveFSToFile(STsdbFS *pFS, const char *fname) {
|
||||||
taosCalcChecksumAppend(0, pData, size);
|
taosCalcChecksumAppend(0, pData, size);
|
||||||
|
|
||||||
// save to file
|
// save to file
|
||||||
TdFilePtr pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
pFD = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
if (pFD == NULL) {
|
if (pFD == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t n = taosWriteFile(pFD, pData, size);
|
int64_t n = taosWriteFile(pFD, pData, size);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
taosCloseFile(&pFD);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosFsyncFile(pFD) < 0) {
|
if (taosFsyncFile(pFD) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
taosCloseFile(&pFD);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pFD);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (pData) taosMemoryFree(pData);
|
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
|
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pData);
|
||||||
|
taosCloseFile(&pFD);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,11 +176,6 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
// if (size != tsdbLogicToFileSize(pTsdb->fs.pDelFile->size, pTsdb->pVnode->config.tsdbPageSize)) {
|
|
||||||
// code = TSDB_CODE_FILE_CORRUPTED;
|
|
||||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// SArray<SDFileSet>
|
// SArray<SDFileSet>
|
||||||
|
@ -327,17 +318,15 @@ static int32_t load_fs(const char *fname, STsdbFS *pFS) {
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&pFD);
|
|
||||||
|
|
||||||
// decode binary
|
|
||||||
code = tsdbBinaryToFS(pData, size, pFS);
|
code = tsdbBinaryToFS(pData, size, pFS);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (pData) taosMemoryFree(pData);
|
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
|
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
|
||||||
}
|
}
|
||||||
|
taosMemoryFree(pData);
|
||||||
|
taosCloseFile(&pFD);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -383,7 +372,11 @@ static int32_t tsdbNewFileSet(STsdb *pTsdb, SDFileSet *pSetTo, SDFileSet *pSetFr
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
*pSetTo = (SDFileSet){.diskId = pSetFrom->diskId, .fid = pSetFrom->fid, .nSttF = 0};
|
*pSetTo = (SDFileSet){
|
||||||
|
.diskId = pSetFrom->diskId,
|
||||||
|
.fid = pSetFrom->fid,
|
||||||
|
.nSttF = 0,
|
||||||
|
};
|
||||||
|
|
||||||
// head
|
// head
|
||||||
pSetTo->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
pSetTo->pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
|
||||||
|
|
|
@ -41,7 +41,9 @@ static const char *gCurrentFname[] = {
|
||||||
|
|
||||||
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
|
static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
|
||||||
fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
|
fs[0] = taosMemoryCalloc(1, sizeof(*fs[0]));
|
||||||
if (fs[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (fs[0] == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
fs[0]->tsdb = pTsdb;
|
fs[0]->tsdb = pTsdb;
|
||||||
tsem_init(&fs[0]->canEdit, 0, 1);
|
tsem_init(&fs[0]->canEdit, 0, 1);
|
||||||
|
@ -75,69 +77,75 @@ int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t save_json(const cJSON *json, const char *fname) {
|
static int32_t save_json(const cJSON *json, const char *fname) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
char *data = NULL;
|
||||||
|
TdFilePtr fp = NULL;
|
||||||
|
|
||||||
char *data = cJSON_PrintUnformatted(json);
|
data = cJSON_PrintUnformatted(json);
|
||||||
if (data == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (data == NULL) {
|
||||||
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
TdFilePtr fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
fp = taosOpenFile(fname, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
if (fp == NULL) {
|
if (fp == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(code);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosWriteFile(fp, data, strlen(data)) < 0) {
|
if (taosWriteFile(fp, data, strlen(data)) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(code);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosFsyncFile(fp) < 0) {
|
if (taosFsyncFile(fp) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(code);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCloseFile(&fp);
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
|
||||||
|
}
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
|
taosCloseFile(&fp);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t load_json(const char *fname, cJSON **json) {
|
static int32_t load_json(const char *fname, cJSON **json) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
char *data = NULL;
|
char *data = NULL;
|
||||||
|
|
||||||
TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
|
TdFilePtr fp = taosOpenFile(fname, TD_FILE_READ);
|
||||||
if (fp == NULL) return TAOS_SYSTEM_ERROR(code);
|
if (fp == NULL) {
|
||||||
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
int64_t size;
|
int64_t size;
|
||||||
if (taosFStatFile(fp, &size, NULL) < 0) {
|
if (taosFStatFile(fp, &size, NULL) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(code);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
data = taosMemoryMalloc(size + 1);
|
data = taosMemoryMalloc(size + 1);
|
||||||
if (data == NULL) {
|
if (data == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosReadFile(fp, data, size) < 0) {
|
if (taosReadFile(fp, data, size) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(code);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(code), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
data[size] = '\0';
|
data[size] = '\0';
|
||||||
|
|
||||||
json[0] = cJSON_Parse(data);
|
json[0] = cJSON_Parse(data);
|
||||||
if (json[0] == NULL) {
|
if (json[0] == NULL) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("%s failed at %s:%d since %s", __func__, fname, __LINE__, tstrerror(code));
|
||||||
|
json[0] = NULL;
|
||||||
|
}
|
||||||
taosCloseFile(&fp);
|
taosCloseFile(&fp);
|
||||||
if (data) taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
if (code) json[0] = NULL;
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -146,26 +154,23 @@ int32_t save_fs(const TFileSetArray *arr, const char *fname) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
cJSON *json = cJSON_CreateObject();
|
cJSON *json = cJSON_CreateObject();
|
||||||
if (!json) return TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
|
|
||||||
// fmtv
|
// fmtv
|
||||||
if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
|
if (cJSON_AddNumberToObject(json, "fmtv", 1) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// fset
|
// fset
|
||||||
cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
|
cJSON *ajson = cJSON_AddArrayToObject(json, "fset");
|
||||||
if (!ajson) {
|
if (!ajson) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
const STFileSet *fset;
|
const STFileSet *fset;
|
||||||
TARRAY2_FOREACH(arr, fset) {
|
TARRAY2_FOREACH(arr, fset) {
|
||||||
cJSON *item = cJSON_CreateObject();
|
cJSON *item = cJSON_CreateObject();
|
||||||
if (!item) {
|
if (!item) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
cJSON_AddItemToArray(ajson, item);
|
cJSON_AddItemToArray(ajson, item);
|
||||||
|
|
||||||
|
@ -226,14 +231,17 @@ static int32_t load_fs(STsdb *pTsdb, const char *fname, TFileSetArray *arr) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("%s failed at line %d since %s, fname:%s", __func__, lino, tstrerror(code), fname);
|
tsdbError("%s failed at %sP%d since %s, fname:%s", __func__, __FILE__, lino, tstrerror(code), fname);
|
||||||
|
}
|
||||||
|
if (json) {
|
||||||
|
cJSON_Delete(json);
|
||||||
}
|
}
|
||||||
if (json) cJSON_Delete(json);
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t apply_commit(STFileSystem *fs) {
|
static int32_t apply_commit(STFileSystem *fs) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
TFileSetArray *fsetArray1 = fs->fSetArr;
|
TFileSetArray *fsetArray1 = fs->fSetArr;
|
||||||
TFileSetArray *fsetArray2 = fs->fSetArrTmp;
|
TFileSetArray *fsetArray2 = fs->fSetArrTmp;
|
||||||
int32_t i1 = 0, i2 = 0;
|
int32_t i1 = 0, i2 = 0;
|
||||||
|
@ -250,15 +258,15 @@ static int32_t apply_commit(STFileSystem *fs) {
|
||||||
} else if (fset1->fid > fset2->fid) {
|
} else if (fset1->fid > fset2->fid) {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
|
code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
|
||||||
if (code) return code;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
|
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
|
||||||
if (code) return code;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
i1++;
|
i1++;
|
||||||
i2++;
|
i2++;
|
||||||
} else {
|
} else {
|
||||||
// edit
|
// edit
|
||||||
code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
|
code = tsdbTFileSetApplyEdit(fs->tsdb, fset2, fset1);
|
||||||
if (code) return code;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
i1++;
|
i1++;
|
||||||
i2++;
|
i2++;
|
||||||
}
|
}
|
||||||
|
@ -269,15 +277,19 @@ static int32_t apply_commit(STFileSystem *fs) {
|
||||||
} else {
|
} else {
|
||||||
// create new file set with fid of fset2->fid
|
// create new file set with fid of fset2->fid
|
||||||
code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
|
code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
|
||||||
if (code) return code;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
|
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
|
||||||
if (code) return code;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
i1++;
|
i1++;
|
||||||
i2++;
|
i2++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t commit_edit(STFileSystem *fs) {
|
static int32_t commit_edit(STFileSystem *fs) {
|
||||||
|
@ -303,7 +315,8 @@ static int32_t commit_edit(STFileSystem *fs) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(fs->tsdb->pVnode), __func__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
|
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
|
||||||
}
|
}
|
||||||
|
@ -335,7 +348,8 @@ static int32_t abort_edit(STFileSystem *fs) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed since %s", TD_VID(fs->tsdb->pVnode), __func__, tstrerror(code));
|
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(fs->tsdb->pVnode), __func__, __FILE__, lino,
|
||||||
|
tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
|
tsdbInfo("vgId:%d %s success, etype:%d", TD_VID(fs->tsdb->pVnode), __func__, fs->etype);
|
||||||
}
|
}
|
||||||
|
@ -360,13 +374,6 @@ static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
found = true;
|
found = true;
|
||||||
/*
|
|
||||||
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
|
||||||
long s3_size = s3Size(object_name);
|
|
||||||
if (s3_size > 0) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!found) {
|
if (!found) {
|
||||||
|
@ -376,16 +383,6 @@ static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
{ // TODO: check file size
|
|
||||||
// int64_t fsize;
|
|
||||||
// if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
|
|
||||||
// code = TAOS_SYSTEM_ERROR(terrno);
|
|
||||||
// tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(fs->tsdb->pVnode), __func__,
|
|
||||||
// fobj->fname, tstrerror(code));
|
|
||||||
// return code;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -408,6 +405,7 @@ static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname)
|
||||||
|
|
||||||
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
|
static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
|
|
||||||
// init hash table
|
// init hash table
|
||||||
|
@ -415,14 +413,13 @@ static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
|
||||||
hash->numBucket = 4096;
|
hash->numBucket = 4096;
|
||||||
hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
|
hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *));
|
||||||
if (hash->buckets == NULL) {
|
if (hash->buckets == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// vnode.json
|
// vnode.json
|
||||||
current_fname(fs->tsdb, fname, TSDB_FCURRENT);
|
current_fname(fs->tsdb, fname, TSDB_FCURRENT);
|
||||||
code = tsdbFSAddEntryToFileObjHash(hash, fname);
|
code = tsdbFSAddEntryToFileObjHash(hash, fname);
|
||||||
if (code) goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// other
|
// other
|
||||||
STFileSet *fset = NULL;
|
STFileSet *fset = NULL;
|
||||||
|
@ -431,7 +428,7 @@ static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
|
||||||
for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
|
for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) {
|
||||||
if (fset->farr[i] != NULL) {
|
if (fset->farr[i] != NULL) {
|
||||||
code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
|
code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname);
|
||||||
if (code) goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -441,13 +438,14 @@ static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) {
|
||||||
STFileObj *fobj;
|
STFileObj *fobj;
|
||||||
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
|
||||||
code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
|
code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname);
|
||||||
if (code) goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code);
|
||||||
tsdbFSDestroyFileObjHash(hash);
|
tsdbFSDestroyFileObjHash(hash);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -35,7 +35,9 @@ int32_t tsdbFSetRAWWriterOpen(SFSetRAWWriterConfig *config, SFSetRAWWriter **wri
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
writer[0] = taosMemoryCalloc(1, sizeof(SFSetRAWWriter));
|
writer[0] = taosMemoryCalloc(1, sizeof(SFSetRAWWriter));
|
||||||
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (writer[0] == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
writer[0]->config[0] = config[0];
|
writer[0]->config[0] = config[0];
|
||||||
|
|
||||||
|
|
|
@ -44,9 +44,10 @@ static int32_t tsdbFSetWriteTableDataBegin(SFSetWriter *writer, const TABLEID *t
|
||||||
writer->ctx->tbid->uid = tbid->uid;
|
writer->ctx->tbid->uid = tbid->uid;
|
||||||
|
|
||||||
code = tsdbUpdateSkmTb(writer->config->tsdb, writer->ctx->tbid, writer->skmTb);
|
code = tsdbUpdateSkmTb(writer->config->tsdb, writer->ctx->tbid, writer->skmTb);
|
||||||
|
TSDB_CHECK_CODE(code , lino, _exit);
|
||||||
|
|
||||||
code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, tbid->suid ? tbid->suid : tbid->uid, &writer->pColCmprObj);
|
code = metaGetColCmpr(writer->config->tsdb->pVnode->pMeta, tbid->suid ? tbid->suid : tbid->uid, &writer->pColCmprObj);
|
||||||
// TSDB_CHECK_CODE(code, lino, _exit);
|
// TODO: TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
writer->blockDataIdx = 0;
|
writer->blockDataIdx = 0;
|
||||||
for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
|
for (int32_t i = 0; i < ARRAY_SIZE(writer->blockData); i++) {
|
||||||
|
@ -136,7 +137,9 @@ int32_t tsdbFSetWriterOpen(SFSetWriterConfig *config, SFSetWriter **writer) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
|
writer[0] = taosMemoryCalloc(1, sizeof(*writer[0]));
|
||||||
if (writer[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
if (writer[0] == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
writer[0]->config[0] = config[0];
|
writer[0]->config[0] = config[0];
|
||||||
|
|
||||||
|
|
|
@ -16,10 +16,12 @@
|
||||||
#include "cos.h"
|
#include "cos.h"
|
||||||
#include "crypt.h"
|
#include "crypt.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "tsdbDef.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
const char *path = pFD->path;
|
const char *path = pFD->path;
|
||||||
int32_t szPage = pFD->szPage;
|
int32_t szPage = pFD->szPage;
|
||||||
int32_t flag = pFD->flag;
|
int32_t flag = pFD->flag;
|
||||||
|
@ -38,70 +40,27 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
||||||
char *dot = strrchr(lc_path, '.');
|
char *dot = strrchr(lc_path, '.');
|
||||||
if (!dot) {
|
if (!dot) {
|
||||||
tsdbError("unexpected path: %s", lc_path);
|
tsdbError("unexpected path: %s", lc_path);
|
||||||
code = TAOS_SYSTEM_ERROR(ENOENT);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lc_path), "%d.data", pFD->lcn);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - lc_path), "%d.data", pFD->lcn);
|
||||||
|
|
||||||
pFD->pFD = taosOpenFile(lc_path, flag);
|
pFD->pFD = taosOpenFile(lc_path, flag);
|
||||||
if (pFD->pFD == NULL) {
|
if (pFD->pFD == NULL) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
// taosMemoryFree(pFD);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
|
if (taosStatFile(lc_path, &lc_size, NULL, NULL) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
// taosCloseFile(&pFD->pFD);
|
|
||||||
// taosMemoryFree(pFD);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tsdbInfo("no file: %s", path);
|
tsdbInfo("no file: %s", path);
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
// taosMemoryFree(pFD);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
pFD->s3File = 1;
|
pFD->s3File = 1;
|
||||||
/*
|
|
||||||
const char *object_name = taosDirEntryBaseName((char *)path);
|
|
||||||
long s3_size = 0;
|
|
||||||
if (tsS3Enabled) {
|
|
||||||
long size = s3Size(object_name);
|
|
||||||
if (size < 0) {
|
|
||||||
code = terrno = TSDB_CODE_FAILED_TO_CONNECT_S3;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
s3_size = size;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsS3Enabled && !strncmp(path + strlen(path) - 5, ".data", 5) && s3_size > 0) {
|
|
||||||
#ifndef S3_BLOCK_CACHE
|
|
||||||
s3EvictCache(path, s3_size);
|
|
||||||
s3Get(object_name, path);
|
|
||||||
|
|
||||||
pFD->pFD = taosOpenFile(path, flag);
|
|
||||||
if (pFD->pFD == NULL) {
|
|
||||||
code = TAOS_SYSTEM_ERROR(ENOENT);
|
|
||||||
// taosMemoryFree(pFD);
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
#else
|
|
||||||
pFD->pFD = (TdFilePtr)&pFD->s3File;
|
|
||||||
int32_t vid = 0;
|
|
||||||
sscanf(object_name, "v%df%dver%" PRId64 ".data", &vid, &pFD->fid, &pFD->cid);
|
|
||||||
pFD->objName = object_name;
|
|
||||||
// pFD->szFile = s3_size;
|
|
||||||
#endif
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pFD->pBuf = taosMemoryCalloc(1, szPage);
|
pFD->pBuf = taosMemoryCalloc(1, szPage);
|
||||||
if (pFD->pBuf == NULL) {
|
if (pFD->pBuf == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
// taosCloseFile(&pFD->pFD);
|
|
||||||
// taosMemoryFree(pFD);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lc_size > 0) {
|
if (lc_size > 0) {
|
||||||
|
@ -114,11 +73,7 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
||||||
// not check file size when reading data files.
|
// not check file size when reading data files.
|
||||||
if (flag != TD_FILE_READ /* && !pFD->s3File*/) {
|
if (flag != TD_FILE_READ /* && !pFD->s3File*/) {
|
||||||
if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
|
if (!lc_size && taosStatFile(path, &pFD->szFile, NULL, NULL) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
// taosMemoryFree(pFD->pBuf);
|
|
||||||
// taosCloseFile(&pFD->pFD);
|
|
||||||
// taosMemoryFree(pFD);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -126,12 +81,16 @@ static int32_t tsdbOpenFileImpl(STsdbFD *pFD) {
|
||||||
pFD->szFile = pFD->szFile / szPage;
|
pFD->szFile = pFD->szFile / szPage;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// =============== PAGE-WISE FILE ===============
|
// =============== PAGE-WISE FILE ===============
|
||||||
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
|
int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppFD, int32_t lcn) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
STsdbFD *pFD = NULL;
|
STsdbFD *pFD = NULL;
|
||||||
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
|
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
|
||||||
|
|
||||||
|
@ -139,8 +98,7 @@ int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppF
|
||||||
|
|
||||||
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
|
pFD = (STsdbFD *)taosMemoryCalloc(1, sizeof(*pFD) + strlen(path) + 1);
|
||||||
if (pFD == NULL) {
|
if (pFD == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pFD->path = (char *)&pFD[1];
|
pFD->path = (char *)&pFD[1];
|
||||||
|
@ -155,6 +113,9 @@ int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbFD **ppF
|
||||||
*ppFD = pFD;
|
*ppFD = pFD;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,12 +133,11 @@ void tsdbCloseFile(STsdbFD **ppFD) {
|
||||||
|
|
||||||
static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
|
static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
if (!pFD->pFD) {
|
if (!pFD->pFD) {
|
||||||
code = tsdbOpenFileImpl(pFD);
|
code = tsdbOpenFileImpl(pFD);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFD->pgno > 0) {
|
if (pFD->pgno > 0) {
|
||||||
|
@ -193,8 +153,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *e
|
||||||
|
|
||||||
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
|
taosCalcChecksumAppend(0, pFD->pBuf, pFD->szPage);
|
||||||
|
@ -223,8 +182,7 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *e
|
||||||
|
|
||||||
n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
n = taosWriteFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFD->szFile < pFD->pgno) {
|
if (pFD->szFile < pFD->pgno) {
|
||||||
|
@ -234,18 +192,20 @@ static int32_t tsdbWriteFilePage(STsdbFD *pFD, int32_t encryptAlgorithm, char *e
|
||||||
pFD->pgno = 0;
|
pFD->pgno = 0;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgorithm, char *encryptKey) {
|
static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgorithm, char *encryptKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
// ASSERT(pgno <= pFD->szFile);
|
// ASSERT(pgno <= pFD->szFile);
|
||||||
if (!pFD->pFD) {
|
if (!pFD->pFD) {
|
||||||
code = tsdbOpenFileImpl(pFD);
|
code = tsdbOpenFileImpl(pFD);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
|
int64_t offset = PAGE_OFFSET(pgno, pFD->szPage);
|
||||||
|
@ -257,43 +217,19 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgor
|
||||||
offset -= chunkoffset;
|
offset -= chunkoffset;
|
||||||
}
|
}
|
||||||
ASSERT(offset >= 0);
|
ASSERT(offset >= 0);
|
||||||
/*
|
|
||||||
if (pFD->s3File) {
|
|
||||||
LRUHandle *handle = NULL;
|
|
||||||
|
|
||||||
pFD->blkno = (pgno + tsS3BlockSize - 1) / tsS3BlockSize;
|
|
||||||
code = tsdbCacheGetBlockS3(pFD->pTsdb->bCache, pFD, &handle);
|
|
||||||
if (code != TSDB_CODE_SUCCESS || handle == NULL) {
|
|
||||||
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
|
|
||||||
if (code == TSDB_CODE_SUCCESS && !handle) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
uint8_t *pBlock = (uint8_t *)taosLRUCacheValue(pFD->pTsdb->bCache, handle);
|
|
||||||
|
|
||||||
int64_t blk_offset = (pFD->blkno - 1) * tsS3BlockSize * pFD->szPage;
|
|
||||||
memcpy(pFD->pBuf, pBlock + (offset - blk_offset), pFD->szPage);
|
|
||||||
|
|
||||||
tsdbCacheRelease(pFD->pTsdb->bCache, handle);
|
|
||||||
} else {
|
|
||||||
*/
|
|
||||||
// seek
|
// seek
|
||||||
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
int64_t n = taosLSeekFile(pFD->pFD, offset, SEEK_SET);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// read
|
// read
|
||||||
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
n = taosReadFile(pFD->pFD, pFD->pBuf, pFD->szPage);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
} else if (n < pFD->szPage) {
|
} else if (n < pFD->szPage) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
//}
|
//}
|
||||||
|
|
||||||
|
@ -322,19 +258,22 @@ static int32_t tsdbReadFilePage(STsdbFD *pFD, int64_t pgno, int32_t encryptAlgor
|
||||||
|
|
||||||
// check
|
// check
|
||||||
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pFD->pgno = pgno;
|
pFD->pgno = pgno;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
|
int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
|
||||||
char *encryptKey) {
|
char *encryptKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||||
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
|
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
|
||||||
int64_t bOffset = fOffset % pFD->szPage;
|
int64_t bOffset = fOffset % pFD->szPage;
|
||||||
|
@ -343,11 +282,11 @@ int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t
|
||||||
do {
|
do {
|
||||||
if (pFD->pgno != pgno) {
|
if (pFD->pgno != pgno) {
|
||||||
code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
|
code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (pgno <= pFD->szFile) {
|
if (pgno <= pFD->szFile) {
|
||||||
code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
|
code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
pFD->pgno = pgno;
|
pFD->pgno = pgno;
|
||||||
}
|
}
|
||||||
|
@ -362,12 +301,16 @@ int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t
|
||||||
} while (n < size);
|
} while (n < size);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
|
static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int32_t encryptAlgorithm,
|
||||||
char *encryptKey) {
|
char *encryptKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||||
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
|
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
|
||||||
|
@ -380,7 +323,7 @@ static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int6
|
||||||
while (n < size) {
|
while (n < size) {
|
||||||
if (pFD->pgno != pgno) {
|
if (pFD->pgno != pgno) {
|
||||||
code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
|
code = tsdbReadFilePage(pFD, pgno, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t nRead = TMIN(szPgCont - bOffset, size - n);
|
int64_t nRead = TMIN(szPgCont - bOffset, size - n);
|
||||||
|
@ -392,15 +335,20 @@ static int32_t tsdbReadFileImp(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int6
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, bool check, uint8_t **ppBlock) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
|
SVnodeCfg *pCfg = &pFD->pTsdb->pVnode->config;
|
||||||
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
|
int64_t chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
|
||||||
int64_t cOffset = offset % chunksize;
|
int64_t cOffset = offset % chunksize;
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
|
char *buf = NULL;
|
||||||
|
|
||||||
char *object_name = taosDirEntryBaseName(pFD->path);
|
char *object_name = taosDirEntryBaseName(pFD->path);
|
||||||
char object_name_prefix[TSDB_FILENAME_LEN];
|
char object_name_prefix[TSDB_FILENAME_LEN];
|
||||||
|
@ -410,11 +358,13 @@ static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, boo
|
||||||
char *dot = strrchr(object_name_prefix, '.');
|
char *dot = strrchr(object_name_prefix, '.');
|
||||||
if (!dot) {
|
if (!dot) {
|
||||||
tsdbError("unexpected path: %s", object_name_prefix);
|
tsdbError("unexpected path: %s", object_name_prefix);
|
||||||
code = TAOS_SYSTEM_ERROR(ENOENT);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(ENOENT), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char *buf = taosMemoryCalloc(1, size);
|
buf = taosMemoryCalloc(1, size);
|
||||||
|
if (buf == NULL) {
|
||||||
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t chunkno = offset / chunksize + 1; n < size; ++chunkno) {
|
for (int32_t chunkno = offset / chunksize + 1; n < size; ++chunkno) {
|
||||||
int64_t nRead = TMIN(chunksize - cOffset, size - n);
|
int64_t nRead = TMIN(chunksize - cOffset, size - n);
|
||||||
|
@ -423,20 +373,14 @@ static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, boo
|
||||||
// read last chunk
|
// read last chunk
|
||||||
int64_t ret = taosLSeekFile(pFD->pFD, chunksize * (chunkno - pFD->lcn) + cOffset, SEEK_SET);
|
int64_t ret = taosLSeekFile(pFD->pFD, chunksize * (chunkno - pFD->lcn) + cOffset, SEEK_SET);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
taosMemoryFree(buf);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
ret = taosReadFile(pFD->pFD, buf + n, nRead);
|
ret = taosReadFile(pFD->pFD, buf + n, nRead);
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
taosMemoryFree(buf);
|
|
||||||
goto _exit;
|
|
||||||
} else if (ret < nRead) {
|
} else if (ret < nRead) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
||||||
taosMemoryFree(buf);
|
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uint8_t *pBlock = NULL;
|
uint8_t *pBlock = NULL;
|
||||||
|
@ -444,10 +388,7 @@ static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, boo
|
||||||
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", chunkno);
|
snprintf(dot + 1, TSDB_FQDN_LEN - (dot + 1 - object_name_prefix), "%d.data", chunkno);
|
||||||
|
|
||||||
code = s3GetObjectBlock(object_name_prefix, cOffset, nRead, check, &pBlock);
|
code = s3GetObjectBlock(object_name_prefix, cOffset, nRead, check, &pBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
taosMemoryFree(buf);
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
memcpy(buf + n, pBlock, nRead);
|
memcpy(buf + n, pBlock, nRead);
|
||||||
taosMemoryFree(pBlock);
|
taosMemoryFree(pBlock);
|
||||||
|
@ -457,14 +398,19 @@ static int32_t tsdbReadFileBlock(STsdbFD *pFD, int64_t offset, int64_t size, boo
|
||||||
cOffset = 0;
|
cOffset = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
*ppBlock = buf;
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
taosMemoryFree(buf);
|
||||||
|
} else {
|
||||||
|
*ppBlock = (uint8_t *)buf;
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
|
static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
|
int32_t szPgCont = PAGE_CONTENT_SIZE(pFD->szPage);
|
||||||
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||||
|
@ -486,7 +432,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
||||||
if (handle) {
|
if (handle) {
|
||||||
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
|
tsdbCacheRelease(pFD->pTsdb->pgCache, handle);
|
||||||
}
|
}
|
||||||
goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!handle) {
|
if (!handle) {
|
||||||
|
@ -499,8 +445,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
||||||
|
|
||||||
// check
|
// check
|
||||||
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pFD->pgno = pgno;
|
pFD->pgno = pgno;
|
||||||
|
@ -525,16 +470,9 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
int64_t retrieve_size = (pgnoEnd - pgno + 1) * pFD->szPage;
|
||||||
/*
|
|
||||||
code = s3GetObjectBlock(pFD->objName, retrieve_offset, retrieve_size, 1, &pBlock);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, 1, &pBlock);
|
code = tsdbReadFileBlock(pFD, retrieve_offset, retrieve_size, 1, &pBlock);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
// 3, Store Pages in Cache
|
// 3, Store Pages in Cache
|
||||||
int nPage = pgnoEnd - pgno + 1;
|
int nPage = pgnoEnd - pgno + 1;
|
||||||
for (int i = 0; i < nPage; ++i) {
|
for (int i = 0; i < nPage; ++i) {
|
||||||
|
@ -550,8 +488,7 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
||||||
|
|
||||||
// check
|
// check
|
||||||
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
if (pgno > 1 && !taosCheckChecksumWhole(pFD->pBuf, pFD->szPage)) {
|
||||||
code = TSDB_CODE_FILE_CORRUPTED;
|
TSDB_CHECK_CODE(code = TSDB_CODE_FILE_CORRUPTED, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pFD->pgno = pgno;
|
pFD->pgno = pgno;
|
||||||
|
@ -568,59 +505,72 @@ static int32_t tsdbReadFileS3(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
|
int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint,
|
||||||
int32_t encryptAlgorithm, char *encryptKey) {
|
int32_t encryptAlgorithm, char *encryptKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
if (!pFD->pFD) {
|
if (!pFD->pFD) {
|
||||||
code = tsdbOpenFileImpl(pFD);
|
code = tsdbOpenFileImpl(pFD);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pFD->s3File && pFD->lcn > 1 /* && tsS3BlockSize < 0*/) {
|
if (pFD->s3File && pFD->lcn > 1 /* && tsS3BlockSize < 0*/) {
|
||||||
return tsdbReadFileS3(pFD, offset, pBuf, size, szHint);
|
code = tsdbReadFileS3(pFD, offset, pBuf, size, szHint);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
} else {
|
} else {
|
||||||
return tsdbReadFileImp(pFD, offset, pBuf, size, encryptAlgorithm, encryptKey);
|
code = tsdbReadFileImp(pFD, offset, pBuf, size, encryptAlgorithm, encryptKey);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
|
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint,
|
||||||
int32_t encryptAlgorithm, char *encryptKey) {
|
int32_t encryptAlgorithm, char *encryptKey) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
code = tBufferEnsureCapacity(buffer, buffer->size + size);
|
code = tBufferEnsureCapacity(buffer, buffer->size + size);
|
||||||
if (code) return code;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = tsdbReadFile(pFD, offset, (uint8_t *)tBufferGetDataEnd(buffer), size, szHint, encryptAlgorithm, encryptKey);
|
code = tsdbReadFile(pFD, offset, (uint8_t *)tBufferGetDataEnd(buffer), size, szHint, encryptAlgorithm, encryptKey);
|
||||||
if (code) return code;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
buffer->size += size;
|
buffer->size += size;
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
|
int32_t tsdbFsyncFile(STsdbFD *pFD, int32_t encryptAlgorithm, char *encryptKey) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
/*
|
int32_t lino;
|
||||||
if (pFD->s3File) {
|
|
||||||
tsdbWarn("%s file: %s", __func__, pFD->path);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
|
code = tsdbWriteFilePage(pFD, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _exit;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (taosFsyncFile(pFD->pFD) < 0) {
|
if (taosFsyncFile(pFD->pFD) < 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
TSDB_CHECK_CODE(code = TAOS_SYSTEM_ERROR(errno), lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(pFD->pTsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -635,8 +585,7 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS
|
||||||
// alloc
|
// alloc
|
||||||
pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
|
pReader = (SDataFReader *)taosMemoryCalloc(1, sizeof(*pReader));
|
||||||
if (pReader == NULL) {
|
if (pReader == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
pReader->pTsdb = pTsdb;
|
pReader->pTsdb = pTsdb;
|
||||||
pReader->pSet = pSet;
|
pReader->pSet = pSet;
|
||||||
|
@ -711,22 +660,25 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) {
|
||||||
|
|
||||||
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
|
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
SHeadFile *pHeadFile = pReader->pSet->pHeadF;
|
SHeadFile *pHeadFile = pReader->pSet->pHeadF;
|
||||||
int64_t offset = pHeadFile->offset;
|
int64_t offset = pHeadFile->offset;
|
||||||
int64_t size = pHeadFile->size - offset;
|
int64_t size = pHeadFile->size - offset;
|
||||||
|
|
||||||
taosArrayClear(aBlockIdx);
|
taosArrayClear(aBlockIdx);
|
||||||
if (size == 0) return code;
|
if (size == 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
code = tRealloc(&pReader->aBuf[0], size);
|
code = tRealloc(&pReader->aBuf[0], size);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
||||||
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
||||||
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
|
@ -735,37 +687,39 @@ int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx) {
|
||||||
n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
|
n += tGetBlockIdx(pReader->aBuf[0] + n, &blockIdx);
|
||||||
|
|
||||||
if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
|
if (taosArrayPush(aBlockIdx, &blockIdx) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ASSERT(n == size);
|
ASSERT(n == size);
|
||||||
|
|
||||||
return code;
|
_exit:
|
||||||
|
if (code) {
|
||||||
_err:
|
TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
|
||||||
tsdbError("vgId:%d, read block idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
|
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
|
SSttFile *pSttFile = pReader->pSet->aSttF[iStt];
|
||||||
int64_t offset = pSttFile->offset;
|
int64_t offset = pSttFile->offset;
|
||||||
int64_t size = pSttFile->size - offset;
|
int64_t size = pSttFile->size - offset;
|
||||||
|
|
||||||
taosArrayClear(aSttBlk);
|
taosArrayClear(aSttBlk);
|
||||||
if (size == 0) return code;
|
if (size == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
code = tRealloc(&pReader->aBuf[0], size);
|
code = tRealloc(&pReader->aBuf[0], size);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
||||||
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
||||||
code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
int64_t n = 0;
|
int64_t n = 0;
|
||||||
|
@ -774,46 +728,45 @@ int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) {
|
||||||
n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk);
|
n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk);
|
||||||
|
|
||||||
if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
|
if (taosArrayPush(aSttBlk, &sttBlk) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ASSERT(n == size);
|
ASSERT(n == size);
|
||||||
|
|
||||||
return code;
|
_exit:
|
||||||
|
if (code) {
|
||||||
_err:
|
TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
|
||||||
tsdbError("vgId:%d, read stt blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
|
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
int64_t offset = pBlockIdx->offset;
|
int64_t offset = pBlockIdx->offset;
|
||||||
int64_t size = pBlockIdx->size;
|
int64_t size = pBlockIdx->size;
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
code = tRealloc(&pReader->aBuf[0], size);
|
code = tRealloc(&pReader->aBuf[0], size);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
||||||
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
||||||
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
code = tsdbReadFile(pReader->pHeadFD, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
|
int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
|
||||||
if (n < 0) {
|
if (n < 0) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
ASSERT(n == size);
|
ASSERT(n == size);
|
||||||
|
|
||||||
return code;
|
_exit:
|
||||||
|
if (code) {
|
||||||
_err:
|
TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
|
||||||
tsdbError("vgId:%d, read block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -834,8 +787,7 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
|
||||||
// alloc
|
// alloc
|
||||||
pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
|
pDelFReader = (SDelFReader *)taosMemoryCalloc(1, sizeof(*pDelFReader));
|
||||||
if (pDelFReader == NULL) {
|
if (pDelFReader == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// open impl
|
// open impl
|
||||||
|
@ -844,15 +796,13 @@ int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb
|
||||||
|
|
||||||
tsdbDelFileName(pTsdb, pFile, fname);
|
tsdbDelFileName(pTsdb, pFile, fname);
|
||||||
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH, 0);
|
code = tsdbOpenFile(fname, pTsdb, TD_FILE_READ, &pDelFReader->pReadH, 0);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
taosMemoryFree(pDelFReader);
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
*ppReader = NULL;
|
*ppReader = NULL;
|
||||||
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
TSDB_ERROR_LOG(TD_VID(pTsdb->pVnode), lino, code);
|
||||||
|
taosMemoryFree(pDelFReader);
|
||||||
} else {
|
} else {
|
||||||
*ppReader = pDelFReader;
|
*ppReader = pDelFReader;
|
||||||
}
|
}
|
||||||
|
@ -881,6 +831,7 @@ int32_t tsdbReadDelData(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData
|
||||||
|
|
||||||
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
|
int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelData, int64_t maxVer) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
int64_t offset = pDelIdx->offset;
|
int64_t offset = pDelIdx->offset;
|
||||||
int64_t size = pDelIdx->size;
|
int64_t size = pDelIdx->size;
|
||||||
int64_t n;
|
int64_t n;
|
||||||
|
@ -889,13 +840,13 @@ int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelDa
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
code = tRealloc(&pReader->aBuf[0], size);
|
code = tRealloc(&pReader->aBuf[0], size);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
||||||
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
||||||
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// // decode
|
// // decode
|
||||||
n = 0;
|
n = 0;
|
||||||
|
@ -907,22 +858,22 @@ int32_t tsdbReadDelDatav1(SDelFReader *pReader, SDelIdx *pDelIdx, SArray *aDelDa
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (taosArrayPush(aDelData, &delData) == NULL) {
|
if (taosArrayPush(aDelData, &delData) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(n == size);
|
ASSERT(n == size);
|
||||||
|
|
||||||
return code;
|
_exit:
|
||||||
|
if (code) {
|
||||||
_err:
|
TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
|
||||||
tsdbError("vgId:%d, read del data failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
|
int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
int32_t n;
|
int32_t n;
|
||||||
int64_t offset = pReader->fDel.offset;
|
int64_t offset = pReader->fDel.offset;
|
||||||
int64_t size = pReader->fDel.size - offset;
|
int64_t size = pReader->fDel.size - offset;
|
||||||
|
@ -931,13 +882,13 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
|
||||||
|
|
||||||
// alloc
|
// alloc
|
||||||
code = tRealloc(&pReader->aBuf[0], size);
|
code = tRealloc(&pReader->aBuf[0], size);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// read
|
// read
|
||||||
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
int32_t encryptAlgorithm = pReader->pTsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
||||||
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
char *encryptKey = pReader->pTsdb->pVnode->config.tsdbCfg.encryptKey;
|
||||||
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
code = tsdbReadFile(pReader->pReadH, offset, pReader->aBuf[0], size, 0, encryptAlgorithm, encryptKey);
|
||||||
if (code) goto _err;
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
n = 0;
|
n = 0;
|
||||||
|
@ -947,16 +898,15 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx) {
|
||||||
n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
|
n += tGetDelIdx(pReader->aBuf[0] + n, &delIdx);
|
||||||
|
|
||||||
if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
|
if (taosArrayPush(aDelIdx, &delIdx) == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
TSDB_CHECK_CODE(code = TSDB_CODE_OUT_OF_MEMORY, lino, _exit);
|
||||||
goto _err;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(n == size);
|
ASSERT(n == size);
|
||||||
|
|
||||||
return code;
|
_exit:
|
||||||
|
if (code) {
|
||||||
_err:
|
TSDB_ERROR_LOG(TD_VID(pReader->pTsdb->pVnode), lino, code);
|
||||||
tsdbError("vgId:%d, read del idx failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue