add a idx file for query speed
This commit is contained in:
parent
58d06bdbf9
commit
aae9b3c1bc
|
@ -132,21 +132,23 @@ typedef struct {
|
||||||
// ------------------ tsdbFile.c
|
// ------------------ tsdbFile.c
|
||||||
extern const char* tsdbFileSuffix[];
|
extern const char* tsdbFileSuffix[];
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FILE_TYPE_HEAD = 0,
|
TSDB_FILE_TYPE_IDX = 0,
|
||||||
|
TSDB_FILE_TYPE_HEAD,
|
||||||
TSDB_FILE_TYPE_DATA,
|
TSDB_FILE_TYPE_DATA,
|
||||||
TSDB_FILE_TYPE_LAST,
|
TSDB_FILE_TYPE_LAST,
|
||||||
TSDB_FILE_TYPE_MAX,
|
TSDB_FILE_TYPE_MAX,
|
||||||
|
TSDB_FILE_TYPE_NIDX,
|
||||||
TSDB_FILE_TYPE_NHEAD,
|
TSDB_FILE_TYPE_NHEAD,
|
||||||
TSDB_FILE_TYPE_NLAST
|
TSDB_FILE_TYPE_NLAST
|
||||||
} TSDB_FILE_TYPE;
|
} TSDB_FILE_TYPE;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint32_t offset;
|
uint32_t magic;
|
||||||
uint32_t len;
|
uint32_t len;
|
||||||
uint64_t size; // total size of the file
|
|
||||||
uint64_t tombSize; // unused file size
|
|
||||||
uint32_t totalBlocks;
|
uint32_t totalBlocks;
|
||||||
uint32_t totalSubBlocks;
|
uint32_t totalSubBlocks;
|
||||||
|
uint64_t size; // total size of the file
|
||||||
|
uint64_t tombSize; // unused file size
|
||||||
} STsdbFileInfo;
|
} STsdbFileInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -249,16 +251,12 @@ typedef struct {
|
||||||
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
|
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int fid;
|
TSKEY minKey;
|
||||||
TSKEY minKey;
|
TSKEY maxKey;
|
||||||
TSKEY maxKey;
|
SFileGroup fGroup;
|
||||||
// For read/write purpose
|
SFile nIdxF;
|
||||||
SFile headF;
|
SFile nHeadF;
|
||||||
SFile dataF;
|
SFile nLastF;
|
||||||
SFile lastF;
|
|
||||||
// For write purpose only
|
|
||||||
SFile nHeadF;
|
|
||||||
SFile nLastF;
|
|
||||||
} SHelperFile;
|
} SHelperFile;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -444,6 +442,14 @@ void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
||||||
#define helperRepo(h) (h)->pRepo
|
#define helperRepo(h) (h)->pRepo
|
||||||
#define helperState(h) (h)->state
|
#define helperState(h) (h)->state
|
||||||
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
|
#define TSDB_NLAST_FILE_OPENED(h) ((h)->files.nLastF.fd > 0)
|
||||||
|
#define helperFileId(h) ((h)->files.fGroup.fileId)
|
||||||
|
#define helperIdxF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_IDX]))
|
||||||
|
#define helperHeadF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_HEAD]))
|
||||||
|
#define helperDataF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_DATA]))
|
||||||
|
#define helperLastF(h) (&((h)->files.fGroup.files[TSDB_FILE_TYPE_LAST]))
|
||||||
|
#define helperNewIdxF(h) (&((h)->files.nIdxF))
|
||||||
|
#define helperNewHeadF(h) (&((h)->files.nHeadF))
|
||||||
|
#define helperNewLastF(h) (&((h)->files.nLastF))
|
||||||
|
|
||||||
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
int tsdbInitReadHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||||
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
int tsdbInitWriteHelper(SRWHelper* pHelper, STsdbRepo* pRepo);
|
||||||
|
|
|
@ -30,7 +30,7 @@
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "tfile.h"
|
#include "tfile.h"
|
||||||
|
|
||||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"};
|
const char *tsdbFileSuffix[] = {".idx", ".head", ".data", ".last", "", ".i", ".h", ".l"};
|
||||||
|
|
||||||
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
||||||
static void tsdbDestroyFile(SFile *pFile);
|
static void tsdbDestroyFile(SFile *pFile);
|
||||||
|
@ -108,7 +108,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
|
memset((void *)(&fileGroup), 0, sizeof(SFileGroup));
|
||||||
fileGroup.fileId = fid;
|
fileGroup.fileId = fid;
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) {
|
if (tsdbInitFile(&fileGroup.files[type], pRepo, fid, type) < 0) {
|
||||||
tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type);
|
tsdbError("vgId:%d failed to init file fid %d type %d", REPO_ID(pRepo), fid, type);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -126,7 +126,7 @@ int tsdbOpenFileH(STsdbRepo *pRepo) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]);
|
for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&fileGroup.files[type]);
|
||||||
|
|
||||||
tfree(tDataDir);
|
tfree(tDataDir);
|
||||||
if (dir != NULL) closedir(dir);
|
if (dir != NULL) closedir(dir);
|
||||||
|
@ -139,7 +139,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
for (int i = 0; i < pFileH->nFGroups; i++) {
|
for (int i = 0; i < pFileH->nFGroups; i++) {
|
||||||
SFileGroup *pFGroup = pFileH->pFGroup + i;
|
SFileGroup *pFGroup = pFileH->pFGroup + i;
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
tsdbDestroyFile(&pFGroup->files[type]);
|
tsdbDestroyFile(&pFGroup->files[type]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -156,7 +156,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
|
||||||
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||||
if (pGroup == NULL) { // if not exists, create one
|
if (pGroup == NULL) { // if not exists, create one
|
||||||
pFGroup->fileId = fid;
|
pFGroup->fileId = fid;
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0)
|
if (tsdbCreateFile(&pFGroup->files[type], pRepo, fid, type) < 0)
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -169,7 +169,7 @@ SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int
|
||||||
return pGroup;
|
return pGroup;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]);
|
for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) tsdbDestroyFile(&pGroup->files[type]);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -323,7 +323,7 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) {
|
||||||
|
|
||||||
int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
|
int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
|
||||||
int tlen = 0;
|
int tlen = 0;
|
||||||
tlen += taosEncodeFixedU32(buf, pInfo->offset);
|
tlen += taosEncodeFixedU32(buf, pInfo->magic);
|
||||||
tlen += taosEncodeFixedU32(buf, pInfo->len);
|
tlen += taosEncodeFixedU32(buf, pInfo->len);
|
||||||
tlen += taosEncodeFixedU64(buf, pInfo->size);
|
tlen += taosEncodeFixedU64(buf, pInfo->size);
|
||||||
tlen += taosEncodeFixedU64(buf, pInfo->tombSize);
|
tlen += taosEncodeFixedU64(buf, pInfo->tombSize);
|
||||||
|
@ -334,7 +334,7 @@ int tsdbEncodeSFileInfo(void **buf, const STsdbFileInfo *pInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
|
void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) {
|
||||||
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
|
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
|
||||||
buf = taosDecodeFixedU32(buf, &(pInfo->len));
|
buf = taosDecodeFixedU32(buf, &(pInfo->len));
|
||||||
buf = taosDecodeFixedU64(buf, &(pInfo->size));
|
buf = taosDecodeFixedU64(buf, &(pInfo->size));
|
||||||
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
|
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
|
||||||
|
@ -358,7 +358,7 @@ void tsdbRemoveFileGroup(STsdbRepo *pRepo, SFileGroup *pFGroup) {
|
||||||
pFileH->nFGroups--;
|
pFileH->nFGroups--;
|
||||||
ASSERT(pFileH->nFGroups >= 0);
|
ASSERT(pFileH->nFGroups >= 0);
|
||||||
|
|
||||||
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = TSDB_FILE_TYPE_IDX; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
if (remove(fileGroup.files[type].fname) < 0) {
|
if (remove(fileGroup.files[type].fname) < 0) {
|
||||||
tsdbError("vgId:%d failed to remove file %s", REPO_ID(pRepo), fileGroup.files[type].fname);
|
tsdbError("vgId:%d failed to remove file %s", REPO_ID(pRepo), fileGroup.files[type].fname);
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,7 +214,7 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
|
||||||
char *prefix = dirname(sdup);
|
char *prefix = dirname(sdup);
|
||||||
|
|
||||||
if (name[0] == 0) { // get the file from index or after, but not larger than eindex
|
if (name[0] == 0) { // get the file from index or after, but not larger than eindex
|
||||||
int fid = (*index) / 3;
|
int fid = (*index) / TSDB_FILE_TYPE_MAX;
|
||||||
|
|
||||||
if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) {
|
if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) {
|
||||||
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
|
if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) {
|
||||||
|
@ -228,11 +228,11 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
|
||||||
SFileGroup *pFGroup =
|
SFileGroup *pFGroup =
|
||||||
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
|
taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE);
|
||||||
if (pFGroup->fileId == fid) {
|
if (pFGroup->fileId == fid) {
|
||||||
fname = strdup(pFGroup->files[(*index) % 3].fname);
|
fname = strdup(pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX].fname);
|
||||||
} else {
|
} else {
|
||||||
if (pFGroup->fileId * 3 + 2 < eindex) {
|
if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < eindex) {
|
||||||
fname = strdup(pFGroup->files[0].fname);
|
fname = strdup(pFGroup->files[0].fname);
|
||||||
*index = pFGroup->fileId * 3;
|
*index = pFGroup->fileId * TSDB_FILE_TYPE_MAX;
|
||||||
} else {
|
} else {
|
||||||
tfree(sdup);
|
tfree(sdup);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -244,14 +244,14 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
|
||||||
if (*index == TSDB_META_FILE_INDEX) { // get meta file
|
if (*index == TSDB_META_FILE_INDEX) { // get meta file
|
||||||
fname = tsdbGetMetaFileName(pRepo->rootDir);
|
fname = tsdbGetMetaFileName(pRepo->rootDir);
|
||||||
} else {
|
} else {
|
||||||
int fid = (*index) / 3;
|
int fid = (*index) / TSDB_FILE_TYPE_MAX;
|
||||||
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
SFileGroup *pFGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ);
|
||||||
if (pFGroup == NULL) { // not found
|
if (pFGroup == NULL) { // not found
|
||||||
tfree(sdup);
|
tfree(sdup);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SFile *pFile = &pFGroup->files[(*index) % 3];
|
SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX];
|
||||||
fname = strdup(pFile->fname);
|
fname = strdup(pFile->fname);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -628,9 +628,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
tsdbCloseHelperFile(pHelper, 0);
|
tsdbCloseHelperFile(pHelper, 0);
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF;
|
pGroup->files[TSDB_FILE_TYPE_IDX] = *(helperIdxF(pHelper));
|
||||||
pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF;
|
pGroup->files[TSDB_FILE_TYPE_HEAD] = *(helperHeadF(pHelper));
|
||||||
pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF;
|
pGroup->files[TSDB_FILE_TYPE_DATA] = *(helperDataF(pHelper));
|
||||||
|
pGroup->files[TSDB_FILE_TYPE_LAST] = *(helperLastF(pHelper));
|
||||||
pthread_rwlock_unlock(&(pFileH->fhlock));
|
pthread_rwlock_unlock(&(pFileH->fhlock));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -106,44 +106,36 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);
|
ASSERT(pHelper->state == TSDB_HELPER_CLEAR_STATE);
|
||||||
|
|
||||||
// Set the files
|
// Set the files
|
||||||
pHelper->files.fid = pGroup->fileId;
|
pHelper->files.fGroup = *pGroup;
|
||||||
pHelper->files.headF = pGroup->files[TSDB_FILE_TYPE_HEAD];
|
|
||||||
pHelper->files.dataF = pGroup->files[TSDB_FILE_TYPE_DATA];
|
|
||||||
pHelper->files.lastF = pGroup->files[TSDB_FILE_TYPE_LAST];
|
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, pHelper->files.nHeadF.fname);
|
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NIDX, helperNewIdxF(pHelper)->fname);
|
||||||
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, pHelper->files.nLastF.fname);
|
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NHEAD, helperNewHeadF(pHelper)->fname);
|
||||||
|
tsdbGetDataFileName(pHelper->pRepo, pGroup->fileId, TSDB_FILE_TYPE_NLAST, helperNewLastF(pHelper)->fname);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Open the files
|
// Open the files
|
||||||
if (tsdbOpenFile(&(pHelper->files.headF), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(helperIdxF(pHelper), O_RDONLY) < 0) goto _err;
|
||||||
|
if (tsdbOpenFile(helperHeadF(pHelper), O_RDONLY) < 0) goto _err;
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDWR) < 0) goto _err;
|
if (tsdbOpenFile(helperDataF(pHelper), O_RDWR) < 0) goto _err;
|
||||||
if (tsdbOpenFile(&(pHelper->files.lastF), O_RDWR) < 0) goto _err;
|
if (tsdbOpenFile(helperLastF(pHelper), O_RDWR) < 0) goto _err;
|
||||||
|
|
||||||
|
// Create and open .i file
|
||||||
|
if (tsdbOpenFile(helperNewIdxF(pHelper), O_WRONLY | O_CREAT) < 0) return -1;
|
||||||
|
if (tsdbUpdateFileHeader(helperNewIdxF(pHelper), 0) < 0) return -1;
|
||||||
|
|
||||||
// Create and open .h
|
// Create and open .h
|
||||||
if (tsdbOpenFile(&(pHelper->files.nHeadF), O_WRONLY | O_CREAT) < 0) return -1;
|
if (tsdbOpenFile(helperNewHeadF(pHelper), O_WRONLY | O_CREAT) < 0) return -1;
|
||||||
// size_t tsize = TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables + sizeof(TSCKSUM);
|
if (tsdbUpdateFileHeader(helperNewHeadF(pHelper), 0) < 0) return -1;
|
||||||
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
|
||||||
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
|
|
||||||
TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create and open .l file if should
|
// Create and open .l file if should
|
||||||
if (tsdbShouldCreateNewLast(pHelper)) {
|
if (tsdbShouldCreateNewLast(pHelper)) {
|
||||||
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
|
if (tsdbOpenFile(helperNewLastF(pHelper), O_WRONLY | O_CREAT) < 0) goto _err;
|
||||||
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
if (tsdbUpdateFileHeader(helperNewLastF(pHelper), 0) < 0) return -1;
|
||||||
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
|
|
||||||
TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(helperDataF(pHelper), O_RDONLY) < 0) goto _err;
|
||||||
if (tsdbOpenFile(&(pHelper->files.lastF), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(helperLastF(pHelper), O_RDONLY) < 0) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);
|
helperSetState(pHelper, TSDB_HELPER_FILE_SET_AND_OPEN);
|
||||||
|
@ -155,59 +147,94 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) {
|
||||||
if (pHelper->files.headF.fd > 0) {
|
SFile *pFile = NULL;
|
||||||
close(pHelper->files.headF.fd);
|
|
||||||
pHelper->files.headF.fd = -1;
|
pFile = helperIdxF(pHelper);
|
||||||
|
if (pFile->fd > 0) {
|
||||||
|
close(pFile->fd);
|
||||||
|
pFile->fd = -1;
|
||||||
}
|
}
|
||||||
if (pHelper->files.dataF.fd > 0) {
|
|
||||||
|
pFile = helperHeadF(pHelper);
|
||||||
|
if (pFile->fd > 0) {
|
||||||
|
close(pFile->fd);
|
||||||
|
pFile->fd = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFile = helperDataF(pHelper);
|
||||||
|
if (pFile->fd > 0) {
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
tsdbUpdateFileHeader(&(pHelper->files.dataF), 0);
|
tsdbUpdateFileHeader(pFile, 0);
|
||||||
fsync(pHelper->files.dataF.fd);
|
fsync(pFile->fd);
|
||||||
}
|
}
|
||||||
close(pHelper->files.dataF.fd);
|
close(pFile->fd);
|
||||||
pHelper->files.dataF.fd = -1;
|
pFile->fd = -1;
|
||||||
}
|
}
|
||||||
if (pHelper->files.lastF.fd > 0) {
|
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
pFile = helperLastF(pHelper);
|
||||||
fsync(pHelper->files.lastF.fd);
|
if (pFile->fd > 0) {
|
||||||
|
if (helperType(pHelper) == TSDB_WRITE_HELPER && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||||
|
fsync(pFile->fd);
|
||||||
}
|
}
|
||||||
close(pHelper->files.lastF.fd);
|
close(pFile->fd);
|
||||||
pHelper->files.lastF.fd = -1;
|
pFile->fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
if (helperType(pHelper) == TSDB_WRITE_HELPER) {
|
||||||
if (pHelper->files.nHeadF.fd > 0) {
|
pFile = helperNewIdxF(pHelper);
|
||||||
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0);
|
if (pFile->fd > 0) {
|
||||||
fsync(pHelper->files.nHeadF.fd);
|
if (!hasError) tsdbUpdateFileHeader(pFile, 0);
|
||||||
close(pHelper->files.nHeadF.fd);
|
fsync(pFile->fd);
|
||||||
pHelper->files.nHeadF.fd = -1;
|
close(pFile->fd);
|
||||||
|
pFile->fd = -1;
|
||||||
if (hasError) {
|
if (hasError) {
|
||||||
(void)remove(pHelper->files.nHeadF.fname);
|
(void)remove(pFile->fname);
|
||||||
} else {
|
} else {
|
||||||
if (rename(pHelper->files.nHeadF.fname, pHelper->files.headF.fname) < 0) {
|
if (rename(pFile->fname, helperIdxF(pHelper)->fname) < 0) {
|
||||||
tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nHeadF.fname,
|
tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperIdxF(pHelper)->fname,
|
||||||
pHelper->files.headF.fname, strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pHelper->files.headF.info = pHelper->files.nHeadF.info;
|
helperIdxF(pHelper)->info = pFile->info;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pHelper->files.nLastF.fd > 0) {
|
pFile = helperNewHeadF(pHelper);
|
||||||
if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0);
|
if (pFile->fd > 0) {
|
||||||
fsync(pHelper->files.nLastF.fd);
|
if (!hasError) tsdbUpdateFileHeader(pFile, 0);
|
||||||
close(pHelper->files.nLastF.fd);
|
fsync(pFile->fd);
|
||||||
pHelper->files.nLastF.fd = -1;
|
close(pFile->fd);
|
||||||
|
pFile->fd = -1;
|
||||||
if (hasError) {
|
if (hasError) {
|
||||||
(void)remove(pHelper->files.nLastF.fname);
|
(void)remove(pFile->fname);
|
||||||
} else {
|
} else {
|
||||||
if (rename(pHelper->files.nLastF.fname, pHelper->files.lastF.fname) < 0) {
|
if (rename(pFile->fname, helperHeadF(pHelper)->fname) < 0) {
|
||||||
tsdbError("failed to rename file from %s to %s since %s", pHelper->files.nLastF.fname,
|
tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperHeadF(pHelper)->fname,
|
||||||
pHelper->files.lastF.fname, strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pHelper->files.lastF.info = pHelper->files.nLastF.info;
|
helperHeadF(pHelper)->info = pFile->info;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pFile = helperNewLastF(pHelper);
|
||||||
|
if (pFile->fd > 0) {
|
||||||
|
if (!hasError) tsdbUpdateFileHeader(pFile, 0);
|
||||||
|
fsync(pFile->fd);
|
||||||
|
close(pFile->fd);
|
||||||
|
pFile->fd = -1;
|
||||||
|
if (hasError) {
|
||||||
|
(void)remove(pFile->fname);
|
||||||
|
} else {
|
||||||
|
if (rename(pFile->fname, helperLastF(pHelper)->fname) < 0) {
|
||||||
|
tsdbError("failed to rename file from %s to %s since %s", pFile->fname, helperLastF(pHelper)->fname,
|
||||||
|
strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
helperLastF(pHelper)->info = helperNewLastF(pHelper)->info;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -283,28 +310,28 @@ int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper) {
|
||||||
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||||
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
|
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
|
||||||
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
pHelper->pDataCols[0]->numOfRows < pCfg->minRowsPerFileBlock);
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.nLastF), pHelper->pDataCols[0], &compBlock, true, true) < 0)
|
if (tsdbWriteBlockToFile(pHelper, helperNewLastF(pHelper), pHelper->pDataCols[0], &compBlock, true, true) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
|
if (tsdbUpdateSuperBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1) < 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
if (lseek(pHelper->files.lastF.fd, pCompBlock->offset, SEEK_SET) < 0) {
|
if (lseek(helperLastF(pHelper)->fd, pCompBlock->offset, SEEK_SET) < 0) {
|
||||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.lastF.fname,
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperLastF(pHelper)->fname,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pCompBlock->offset = lseek(pHelper->files.nLastF.fd, 0, SEEK_END);
|
pCompBlock->offset = lseek(helperNewLastF(pHelper)->fd, 0, SEEK_END);
|
||||||
if (pCompBlock->offset < 0) {
|
if (pCompBlock->offset < 0) {
|
||||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nLastF.fname,
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewLastF(pHelper)->fname,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, pCompBlock->len) < pCompBlock->len) {
|
if (tsendfile(helperNewLastF(pHelper)->fd, helperLastF(pHelper)->fd, NULL, pCompBlock->len) < pCompBlock->len) {
|
||||||
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
|
tsdbError("vgId:%d failed to sendfile from file %s to file %s since %s", REPO_ID(pHelper->pRepo),
|
||||||
pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
|
helperLastF(pHelper)->fname, helperNewLastF(pHelper)->fname, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -321,9 +348,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
||||||
if (pIdx->offset > 0) {
|
if (pIdx->offset > 0) {
|
||||||
offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END);
|
||||||
if (offset < 0) {
|
if (offset < 0) {
|
||||||
tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
|
tsdbError("vgId:%d failed to lseed file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -332,9 +359,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
pIdx->offset = offset;
|
pIdx->offset = offset;
|
||||||
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
|
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
|
||||||
|
|
||||||
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, pIdx->len) < pIdx->len) {
|
if (tsendfile(helperNewHeadF(pHelper)->fd, helperHeadF(pHelper)->fd, NULL, pIdx->len) < pIdx->len) {
|
||||||
tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
tsdbError("vgId:%d failed to send %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
||||||
pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
|
helperHeadF(pHelper)->fname, helperNewHeadF(pHelper)->fname, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -347,9 +374,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) &&
|
ASSERT(pIdx->len > sizeof(SCompInfo) + sizeof(TSCKSUM) &&
|
||||||
(pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
|
(pIdx->len - sizeof(SCompInfo) - sizeof(TSCKSUM)) % sizeof(SCompBlock) == 0);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
|
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pCompInfo, pIdx->len);
|
||||||
offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
offset = lseek(helperNewHeadF(pHelper)->fd, 0, SEEK_END);
|
||||||
if (offset < 0) {
|
if (offset < 0) {
|
||||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperNewHeadF(pHelper)->fname,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -358,9 +385,9 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
pIdx->uid = pHelper->tableInfo.uid;
|
pIdx->uid = pHelper->tableInfo.uid;
|
||||||
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
|
ASSERT(pIdx->offset >= TSDB_FILE_HEAD_SIZE);
|
||||||
|
|
||||||
if (twrite(pHelper->files.nHeadF.fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
|
if (twrite(helperNewHeadF(pHelper)->fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
|
||||||
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
||||||
pHelper->files.nHeadF.fname, strerror(errno));
|
helperNewHeadF(pHelper)->fname, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -371,19 +398,10 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
||||||
|
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
||||||
STsdbCfg *pCfg = &pHelper->pRepo->config;
|
STsdbCfg *pCfg = &pHelper->pRepo->config;
|
||||||
|
|
||||||
ASSERT(helperType(pHelper) == TSDB_WRITE_HELPER);
|
SFile *pFile = helperNewIdxF(pHelper);
|
||||||
off_t offset = lseek(pHelper->files.nHeadF.fd, 0, SEEK_END);
|
|
||||||
if (offset < 0) {
|
|
||||||
tsdbError("vgId:%d failed to lseek file %s to end since %s", REPO_ID(pHelper->pRepo), pHelper->files.nHeadF.fname,
|
|
||||||
strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SFile *pFile = &(pHelper->files.nHeadF);
|
|
||||||
pFile->info.offset = offset;
|
|
||||||
|
|
||||||
void *buf = pHelper->pBuffer;
|
void *buf = pHelper->pBuffer;
|
||||||
for (uint32_t i = 0; i < pCfg->maxTables; i++) {
|
for (uint32_t i = 0; i < pCfg->maxTables; i++) {
|
||||||
|
@ -406,9 +424,9 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
||||||
int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
|
int tsize = (char *)buf - (char *)pHelper->pBuffer + sizeof(TSCKSUM);
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
|
taosCalcChecksumAppend(0, (uint8_t *)pHelper->pBuffer, tsize);
|
||||||
|
|
||||||
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pBuffer, tsize) < tsize) {
|
if (twrite(pFile->fd, (void *)pHelper->pBuffer, tsize) < tsize) {
|
||||||
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize,
|
tsdbError("vgId:%d failed to write %d bytes to file %s since %s", REPO_ID(pHelper->pRepo), tsize, pFile->fname,
|
||||||
pHelper->files.nHeadF.fname, strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -423,23 +441,21 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
|
||||||
|
|
||||||
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
|
if (!helperHasState(pHelper, TSDB_HELPER_IDX_LOAD)) {
|
||||||
// If not load from file, just load it in object
|
// If not load from file, just load it in object
|
||||||
SFile *pFile = &(pHelper->files.headF);
|
SFile *pFile = helperIdxF(pHelper);
|
||||||
int fd = pFile->fd;
|
int fd = pFile->fd;
|
||||||
|
|
||||||
memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx));
|
memset(pHelper->pCompIdx, 0, tsizeof(pHelper->pCompIdx));
|
||||||
if (pFile->info.offset > 0) {
|
if (pFile->info.len > 0) {
|
||||||
ASSERT(pFile->info.offset > TSDB_FILE_HEAD_SIZE);
|
|
||||||
|
|
||||||
if (lseek(fd, pFile->info.offset, SEEK_SET) < 0) {
|
|
||||||
tsdbError("vgId:%d failed to lseek file %s to %u since %s", REPO_ID(pHelper->pRepo), pFile->fname,
|
|
||||||
pFile->info.offset, strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) {
|
if ((pHelper->pBuffer = trealloc(pHelper->pBuffer, pFile->info.len)) == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) {
|
if (tread(fd, (void *)(pHelper->pBuffer), pFile->info.len) < pFile->info.len) {
|
||||||
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
|
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pFile->info.len,
|
||||||
pFile->fname, strerror(errno));
|
pFile->fname, strerror(errno));
|
||||||
|
@ -447,8 +463,8 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
|
if (!taosCheckChecksumWhole((uint8_t *)(pHelper->pBuffer), pFile->info.len)) {
|
||||||
tsdbError("vgId:%d file %s SCompIdx part is corrupted. offset %u len %u", REPO_ID(pHelper->pRepo), pFile->fname,
|
tsdbError("vgId:%d file %s SCompIdx part is corrupted. len %u", REPO_ID(pHelper->pRepo), pFile->fname,
|
||||||
pFile->info.offset, pFile->info.len);
|
pFile->info.len);
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -484,13 +500,13 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
||||||
|
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
|
|
||||||
int fd = pHelper->files.headF.fd;
|
int fd = helperHeadF(pHelper)->fd;
|
||||||
|
|
||||||
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
||||||
if (pIdx->offset > 0) {
|
if (pIdx->offset > 0) {
|
||||||
ASSERT(pIdx->uid == pHelper->tableInfo.uid);
|
ASSERT(pIdx->uid == pHelper->tableInfo.uid);
|
||||||
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
|
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) {
|
||||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pHelper->files.headF.fname,
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), helperHeadF(pHelper)->fname,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -499,13 +515,13 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
||||||
pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
|
pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
|
||||||
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
|
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) {
|
||||||
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
tsdbError("vgId:%d failed to read %d bytes from file %s since %s", REPO_ID(pHelper->pRepo), pIdx->len,
|
||||||
pHelper->files.headF.fname, strerror(errno));
|
helperHeadF(pHelper)->fname, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
|
if (!taosCheckChecksumWhole((uint8_t *)pHelper->pCompInfo, pIdx->len)) {
|
||||||
tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo),
|
tsdbError("vgId:%d file %s SCompInfo part is corrupted, tid %d uid %" PRIu64, REPO_ID(pHelper->pRepo),
|
||||||
pHelper->files.headF.fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid);
|
helperHeadF(pHelper)->fname, pHelper->tableInfo.tid, pHelper->tableInfo.uid);
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -523,7 +539,7 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
||||||
|
|
||||||
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
|
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
|
||||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||||
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper);
|
||||||
|
|
||||||
if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) {
|
if (lseek(pFile->fd, pCompBlock->offset, SEEK_SET) < 0) {
|
||||||
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
tsdbError("vgId:%d failed to lseek file %s since %s", REPO_ID(pHelper->pRepo), pFile->fname, strerror(errno));
|
||||||
|
@ -642,9 +658,9 @@ _err:
|
||||||
|
|
||||||
// ---------------------- INTERNAL FUNCTIONS ----------------------
|
// ---------------------- INTERNAL FUNCTIONS ----------------------
|
||||||
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
|
static bool tsdbShouldCreateNewLast(SRWHelper *pHelper) {
|
||||||
ASSERT(pHelper->files.lastF.fd > 0);
|
ASSERT(helperLastF(pHelper)->fd > 0);
|
||||||
struct stat st;
|
struct stat st;
|
||||||
if (fstat(pHelper->files.lastF.fd, &st) < 0) return true;
|
if (fstat(helperLastF(pHelper)->fd, &st) < 0) return true;
|
||||||
if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true;
|
if (st.st_size > 32 * 1024 + TSDB_FILE_HEAD_SIZE) return true;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -972,12 +988,13 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
||||||
|
|
||||||
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
||||||
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
|
memset((void *)&pHelper->files, 0, sizeof(pHelper->files));
|
||||||
pHelper->files.fid = -1;
|
helperIdxF(pHelper)->fd = -1;
|
||||||
pHelper->files.headF.fd = -1;
|
helperHeadF(pHelper)->fd = -1;
|
||||||
pHelper->files.dataF.fd = -1;
|
helperDataF(pHelper)->fd = -1;
|
||||||
pHelper->files.lastF.fd = -1;
|
helperLastF(pHelper)->fd = -1;
|
||||||
pHelper->files.nHeadF.fd = -1;
|
helperNewIdxF(pHelper)->fd = -1;
|
||||||
pHelper->files.nLastF.fd = -1;
|
helperNewHeadF(pHelper)->fd = -1;
|
||||||
|
helperNewLastF(pHelper)->fd = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbInitHelperFile(SRWHelper *pHelper) {
|
static int tsdbInitHelperFile(SRWHelper *pHelper) {
|
||||||
|
@ -1154,7 +1171,7 @@ static int tsdbLoadBlockDataColsImpl(SRWHelper *pHelper, SCompBlock *pCompBlock,
|
||||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||||
ASSERT(colIds[0] == 0);
|
ASSERT(colIds[0] == 0);
|
||||||
|
|
||||||
SFile * pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
SFile * pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper);
|
||||||
SCompCol compCol = {0};
|
SCompCol compCol = {0};
|
||||||
|
|
||||||
// If only load timestamp column, no need to load SCompData part
|
// If only load timestamp column, no need to load SCompData part
|
||||||
|
@ -1215,7 +1232,7 @@ _err:
|
||||||
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
|
static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *pDataCols) {
|
||||||
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
ASSERT(pCompBlock->numOfSubBlocks <= 1);
|
||||||
|
|
||||||
SFile *pFile = (pCompBlock->last) ? &(pHelper->files.lastF) : &(pHelper->files.dataF);
|
SFile *pFile = (pCompBlock->last) ? helperLastF(pHelper) : helperDataF(pHelper);
|
||||||
|
|
||||||
pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompBlock->len);
|
pHelper->pBuffer = trealloc(pHelper->pBuffer, pCompBlock->len);
|
||||||
if (pHelper->pBuffer == NULL) {
|
if (pHelper->pBuffer == NULL) {
|
||||||
|
@ -1362,7 +1379,7 @@ static int tsdbProcessAppendCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
||||||
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
|
ASSERT(rowsRead > 0 && rowsRead == pDataCols->numOfRows);
|
||||||
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
|
if (rowsRead + pCompBlock->numOfRows < pCfg->minRowsPerFileBlock &&
|
||||||
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
pCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && !TSDB_NLAST_FILE_OPENED(pHelper)) {
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
|
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
|
||||||
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
|
if (tsdbAddSubBlock(pHelper, &compBlock, pIdx->numOfBlocks - 1, rowsRead) < 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
if (tsdbLoadBlockData(pHelper, pCompBlock, NULL) < 0) return -1;
|
||||||
|
@ -1427,7 +1444,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
||||||
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
|
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, maxKey, rows1, pDataCols,
|
||||||
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
|
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
|
||||||
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
|
ASSERT(rowsRead == rows2 && rowsRead == pDataCols->numOfRows);
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.lastF), pDataCols, &compBlock, true, false) < 0) return -1;
|
if (tsdbWriteBlockToFile(pHelper, helperLastF(pHelper), pDataCols, &compBlock, true, false) < 0) return -1;
|
||||||
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
|
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
|
||||||
tblkIdx++;
|
tblkIdx++;
|
||||||
} else {
|
} else {
|
||||||
|
@ -1466,7 +1483,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
||||||
if (rowsRead == 0) break;
|
if (rowsRead == 0) break;
|
||||||
|
|
||||||
ASSERT(rowsRead == pDataCols->numOfRows);
|
ASSERT(rowsRead == pDataCols->numOfRows);
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0) return -1;
|
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0) return -1;
|
||||||
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
if (tsdbInsertSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||||
tblkIdx++;
|
tblkIdx++;
|
||||||
}
|
}
|
||||||
|
@ -1493,7 +1510,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
||||||
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
|
int rowsRead = tsdbLoadDataFromCache(pTable, pCommitIter->pIter, keyLimit, rows, pDataCols,
|
||||||
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
|
pDataCols0->cols[0].pData, pDataCols0->numOfRows);
|
||||||
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
|
ASSERT(rowsRead == rows && rowsRead == pDataCols->numOfRows);
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, false) < 0)
|
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, false) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
|
if (tsdbAddSubBlock(pHelper, &compBlock, tblkIdx, rowsRead) < 0) return -1;
|
||||||
tblkIdx++;
|
tblkIdx++;
|
||||||
|
@ -1506,7 +1523,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
||||||
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
|
tsdbLoadAndMergeFromCache(pDataCols0, &dIter, pCommitIter, pDataCols, keyLimit, defaultRowsInBlock);
|
||||||
if (rowsRead == 0) break;
|
if (rowsRead == 0) break;
|
||||||
|
|
||||||
if (tsdbWriteBlockToFile(pHelper, &(pHelper->files.dataF), pDataCols, &compBlock, false, true) < 0)
|
if (tsdbWriteBlockToFile(pHelper, helperDataF(pHelper), pDataCols, &compBlock, false, true) < 0)
|
||||||
return -1;
|
return -1;
|
||||||
if (round == 0) {
|
if (round == 0) {
|
||||||
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
if (tsdbUpdateSuperBlock(pHelper, &compBlock, tblkIdx) < 0) return -1;
|
||||||
|
@ -1577,10 +1594,10 @@ static int tsdbWriteBlockToProperFile(SRWHelper *pHelper, SDataCols *pDataCols,
|
||||||
ASSERT(pDataCols->numOfRows > 0);
|
ASSERT(pDataCols->numOfRows > 0);
|
||||||
|
|
||||||
if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
|
if (pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) {
|
||||||
pFile = &(pHelper->files.dataF);
|
pFile = helperDataF(pHelper);
|
||||||
} else {
|
} else {
|
||||||
isLast = true;
|
isLast = true;
|
||||||
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? &(pHelper->files.nLastF) : &(pHelper->files.lastF);
|
pFile = TSDB_NLAST_FILE_OPENED(pHelper) ? helperNewLastF(pHelper) : helperLastF(pHelper);
|
||||||
}
|
}
|
||||||
|
|
||||||
ASSERT(pFile->fd > 0);
|
ASSERT(pFile->fd > 0);
|
||||||
|
|
Loading…
Reference in New Issue