more work
This commit is contained in:
parent
3efb2c7c53
commit
709e9fd78b
|
@ -37,6 +37,14 @@ typedef struct STSRowBuilder STSRowBuilder;
|
|||
typedef struct STagVal STagVal;
|
||||
typedef struct STag STag;
|
||||
|
||||
// bitmap
|
||||
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
|
||||
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
|
||||
#define SET_BIT1(p, i, v) ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8)))
|
||||
#define SET_BIT2(p, i, v) ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4)))
|
||||
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
|
||||
#define GET_BIT2(p, i) (((p)[(i) / 4] >> ((i) % 4)) & ((uint8_t)3))
|
||||
|
||||
// STSchema
|
||||
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
|
||||
void tTSchemaDestroy(STSchema *pTSchema);
|
||||
|
|
|
@ -29,12 +29,6 @@ typedef struct {
|
|||
#pragma pack(pop)
|
||||
|
||||
#define TSROW_IS_KV_ROW(r) ((r)->flags & TSROW_KV_ROW)
|
||||
#define BIT1_SIZE(n) (((n)-1) / 8 + 1)
|
||||
#define BIT2_SIZE(n) (((n)-1) / 4 + 1)
|
||||
#define SET_BIT1(p, i, v) ((p)[(i) / 8] = (p)[(i) / 8] & (~(((uint8_t)1) << ((i) % 8))) | ((v) << ((i) % 8)))
|
||||
#define SET_BIT2(p, i, v) ((p)[(i) / 4] = (p)[(i) / 4] & (~(((uint8_t)3) << ((i) % 4))) | ((v) << ((i) % 4)))
|
||||
#define GET_BIT1(p, i) (((p)[(i) / 8] >> ((i) % 8)) & ((uint8_t)1))
|
||||
#define GET_BIT2(p, i) (((p)[(i) / 4] >> ((i) % 4)) & ((uint8_t)3))
|
||||
|
||||
// SValue
|
||||
int32_t tPutValue(uint8_t *p, SValue *pValue, int8_t type) {
|
||||
|
|
|
@ -79,6 +79,10 @@ typedef struct STsdbSmaFile STsdbSmaFile;
|
|||
typedef struct STsdbSmalFile STsdbSmalFile;
|
||||
typedef struct SDFileSet SDFileSet;
|
||||
|
||||
// SDelFile
|
||||
#define tsdbDelFileCreate() ((SDelFile){.info = KEYINFO_INIT_VAL, .size = 0, .offset = 0})
|
||||
char *tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile);
|
||||
|
||||
// tsdbFS.c ==============================================================================================
|
||||
typedef struct STsdbFS STsdbFS;
|
||||
|
||||
|
@ -131,15 +135,21 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
|
|||
|
||||
// tsdbUtil.c ==============================================================================================
|
||||
// TSDBROW
|
||||
#define tsdbRowFromTSRow(VERSION, TSROW) ((TSDBROW){.type = 0, .version = (VERSION), .pTSRow = (TSROW)});
|
||||
#define tsdbRowFromBlockData(BLOCKDATA, IROW) ((TSDBROW){.type = 1, .pBlockData = (BLOCKDATA), .pTSRow = (IROW)});
|
||||
TSDBKEY tsdbRowKey(TSDBROW *pRow);
|
||||
|
||||
int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
|
||||
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey);
|
||||
|
||||
// memory
|
||||
int32_t tsdbRealloc(uint8_t **ppBuf, int64_t size);
|
||||
void tsdbFree(uint8_t *pBuf);
|
||||
|
||||
// TABLEID
|
||||
int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
|
||||
|
||||
// TSDBKEY
|
||||
int32_t tsdbKeyCmprFn(const void *p1, const void *p2);
|
||||
|
||||
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
|
||||
|
@ -275,6 +285,7 @@ int tsdbLockRepo(STsdb *pTsdb);
|
|||
int tsdbUnlockRepo(STsdb *pTsdb);
|
||||
|
||||
struct TSDBROW {
|
||||
int8_t type; // 0 for row from tsRow, 1 for row from block data
|
||||
union {
|
||||
struct {
|
||||
int64_t version;
|
||||
|
@ -332,7 +343,8 @@ struct SColData {
|
|||
struct SBlockData {
|
||||
int32_t maxRow;
|
||||
int32_t nRow;
|
||||
TSDBKEY *aKey;
|
||||
int64_t *aVersion;
|
||||
TSKEY *aTSKEY;
|
||||
int32_t maxCol;
|
||||
int32_t nCol;
|
||||
SColData *aColData;
|
||||
|
|
|
@ -47,570 +47,13 @@ struct SDFileSet {
|
|||
STsdbSmaFile *pSmaF;
|
||||
};
|
||||
|
||||
#if 0
|
||||
static const char *TSDB_FNAME_SUFFIX[] = {
|
||||
"head", // TSDB_FILE_HEAD
|
||||
"data", // TSDB_FILE_DATA
|
||||
"last", // TSDB_FILE_LAST
|
||||
"smad", // TSDB_FILE_SMAD
|
||||
"smal", // TSDB_FILE_SMAL
|
||||
"", // TSDB_FILE_MAX
|
||||
"meta", // TSDB_FILE_META
|
||||
};
|
||||
// SDelFile ===============================================
|
||||
char *tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile) {
|
||||
char *pName = NULL;
|
||||
int32_t size;
|
||||
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname);
|
||||
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo);
|
||||
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo);
|
||||
static int tsdbRollBackDFile(SDFile *pDFile);
|
||||
// TODO
|
||||
// sprintf(pName, "", pTsdb->path, );
|
||||
|
||||
// ============== Operations on SDFile
|
||||
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype) {
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_OK);
|
||||
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
|
||||
memset(&(pDFile->info), 0, sizeof(pDFile->info));
|
||||
pDFile->info.magic = TSDB_FILE_INIT_MAGIC;
|
||||
pDFile->info.fver = tsdbGetDFSVersion(ftype);
|
||||
|
||||
tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, pRepo->dir, fname);
|
||||
tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname);
|
||||
}
|
||||
|
||||
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile) {
|
||||
*pDFile = *pODFile;
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
}
|
||||
|
||||
int tsdbEncodeSDFile(void **buf, SDFile *pDFile) {
|
||||
int tlen = 0;
|
||||
|
||||
tlen += tsdbEncodeDFInfo(buf, &(pDFile->info));
|
||||
tlen += tfsEncodeFile(buf, &(pDFile->f));
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile) {
|
||||
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
|
||||
buf = tfsDecodeFile(REPO_TFS(pRepo), buf, &(pDFile->f));
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static int tsdbEncodeSDFileEx(void **buf, SDFile *pDFile) {
|
||||
int tlen = 0;
|
||||
|
||||
tlen += tsdbEncodeDFInfo(buf, &(pDFile->info));
|
||||
tlen += taosEncodeString(buf, TSDB_FILE_FULL_NAME(pDFile));
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static void *tsdbDecodeSDFileEx(void *buf, SDFile *pDFile) {
|
||||
char *aname = NULL;
|
||||
|
||||
buf = tsdbDecodeDFInfo(buf, &(pDFile->info));
|
||||
buf = taosDecodeString(buf, &aname);
|
||||
strncpy(TSDB_FILE_FULL_NAME(pDFile), aname, TSDB_FILENAME_LEN);
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
taosMemoryFreeClear(aname);
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType) {
|
||||
ASSERT(pDFile->info.size == 0 && pDFile->info.magic == TSDB_FILE_INIT_MAGIC);
|
||||
|
||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pDFile->pFile == NULL) {
|
||||
if (errno == ENOENT) {
|
||||
// Try to create directory recursively
|
||||
char *s = strdup(TSDB_FILE_REL_NAME(pDFile));
|
||||
if (tfsMkdirRecurAt(REPO_TFS(pRepo), taosDirName(s), TSDB_FILE_DID(pDFile)) < 0) {
|
||||
taosMemoryFreeClear(s);
|
||||
return -1;
|
||||
}
|
||||
taosMemoryFreeClear(s);
|
||||
|
||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pDFile->pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (!updateHeader) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
pDFile->info.size += TSDB_FILE_HEAD_SIZE;
|
||||
pDFile->info.fver = tsdbGetDFSVersion(fType);
|
||||
|
||||
if (tsdbUpdateDFileHeader(pDFile) < 0) {
|
||||
tsdbCloseDFile(pDFile);
|
||||
tsdbRemoveDFile(pDFile);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbUpdateDFileHeader(SDFile *pDFile) {
|
||||
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||
|
||||
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *ptr = buf;
|
||||
// taosEncodeFixedU32(&ptr, 0); // fver moved to SDFInfo and saved to current
|
||||
tsdbEncodeDFInfo(&ptr, &(pDFile->info));
|
||||
|
||||
taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE);
|
||||
if (tsdbWriteDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) {
|
||||
char buf[TSDB_FILE_HEAD_SIZE] = "\0";
|
||||
uint32_t _version;
|
||||
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
if (tsdbSeekDFile(pDFile, 0, SEEK_SET) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbReadDFile(pDFile, buf, TSDB_FILE_HEAD_SIZE) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) {
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *pBuf = buf;
|
||||
// pBuf = taosDecodeFixedU32(pBuf, &_version);
|
||||
pBuf = tsdbDecodeDFInfo(pBuf, pInfo);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) {
|
||||
SDFile df;
|
||||
|
||||
tsdbInitDFileEx(&df, pDFile);
|
||||
|
||||
if (!taosCheckExistFile(TSDB_FILE_FULL_NAME(pDFile))) {
|
||||
tsdbError("vgId:%d, data file %s not exit, report to upper layer to fix it", REPO_ID(pRepo),
|
||||
TSDB_FILE_FULL_NAME(pDFile));
|
||||
// pRepo->state |= TSDB_STATE_BAD_DATA;
|
||||
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
|
||||
return 0;
|
||||
}
|
||||
int64_t file_size = 0;
|
||||
if (taosStatFile(TSDB_FILE_FULL_NAME(&df), &file_size, NULL) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pDFile->info.size < file_size) {
|
||||
// if (tsdbOpenDFile(&df, O_WRONLY) < 0) {
|
||||
if (tsdbOpenDFile(&df, TD_FILE_WRITE) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosFtruncateFile(df.pFile, df.info.size) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tsdbCloseDFile(&df);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbUpdateDFileHeader(&df) < 0) {
|
||||
tsdbCloseDFile(&df);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbCloseDFile(&df);
|
||||
tsdbInfo("vgId:%d, file %s is truncated from %" PRId64 " to %" PRId64, REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile),
|
||||
file_size, pDFile->info.size);
|
||||
} else if (pDFile->info.size > file_size) {
|
||||
tsdbError("vgId:%d, data file %s has wrong size %" PRId64 " expected %" PRId64 ", report to upper layer to fix it",
|
||||
REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile), file_size, pDFile->info.size);
|
||||
// pRepo->state |= TSDB_STATE_BAD_DATA;
|
||||
TSDB_FILE_SET_STATE(pDFile, TSDB_FILE_STATE_BAD);
|
||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||
return 0;
|
||||
} else {
|
||||
tsdbDebug("vgId:%d, file %s passes the scan", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pDFile));
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) {
|
||||
int tlen = 0;
|
||||
|
||||
tlen += taosEncodeFixedU32(buf, pInfo->magic);
|
||||
tlen += taosEncodeFixedU32(buf, pInfo->fver);
|
||||
tlen += taosEncodeFixedU32(buf, pInfo->len);
|
||||
tlen += taosEncodeFixedU32(buf, pInfo->totalBlocks);
|
||||
tlen += taosEncodeFixedU32(buf, pInfo->totalSubBlocks);
|
||||
tlen += taosEncodeFixedU32(buf, pInfo->offset);
|
||||
tlen += taosEncodeFixedU64(buf, pInfo->size);
|
||||
tlen += taosEncodeFixedU64(buf, pInfo->tombSize);
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
|
||||
buf = taosDecodeFixedU32(buf, &(pInfo->magic));
|
||||
buf = taosDecodeFixedU32(buf, &(pInfo->fver));
|
||||
buf = taosDecodeFixedU32(buf, &(pInfo->len));
|
||||
buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks));
|
||||
buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks));
|
||||
buf = taosDecodeFixedU32(buf, &(pInfo->offset));
|
||||
buf = taosDecodeFixedU64(buf, &(pInfo->size));
|
||||
buf = taosDecodeFixedU64(buf, &(pInfo->tombSize));
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
||||
static int tsdbApplyDFileChange(SDFile *from, SDFile *to) {
|
||||
ASSERT(from != NULL || to != NULL);
|
||||
|
||||
if (from != NULL) {
|
||||
if (to == NULL) {
|
||||
tsdbRemoveDFile(from);
|
||||
} else {
|
||||
if (tfsIsSameFile(TSDB_FILE_F(from), TSDB_FILE_F(to))) {
|
||||
if (from->info.size > to->info.size) {
|
||||
tsdbRollBackDFile(to);
|
||||
}
|
||||
} else {
|
||||
(void)tsdbRemoveDFile(from);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbRollBackDFile(SDFile *pDFile) {
|
||||
SDFile df = *pDFile;
|
||||
|
||||
// if (tsdbOpenDFile(&df, O_WRONLY) < 0) {
|
||||
if (tsdbOpenDFile(&df, TD_FILE_WRITE) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosFtruncateFile(TSDB_FILE_PFILE(&df), pDFile->info.size) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tsdbCloseDFile(&df);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbUpdateDFileHeader(&df) < 0) {
|
||||
tsdbCloseDFile(&df);
|
||||
return -1;
|
||||
}
|
||||
|
||||
TSDB_FILE_FSYNC(&df);
|
||||
|
||||
tsdbCloseDFile(&df);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ============== Operations on SDFileSet
|
||||
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver) {
|
||||
TSDB_FSET_FID(pSet) = fid;
|
||||
TSDB_FSET_VER(pSet) = TSDB_LATEST_FSET_VER;
|
||||
TSDB_FSET_STATE(pSet) = 0;
|
||||
pSet->reserve = 0;
|
||||
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
SDFile *pDFile = TSDB_DFILE_IN_SET(pSet, ftype);
|
||||
tsdbInitDFile(pRepo, pDFile, did, fid, ver, ftype);
|
||||
}
|
||||
}
|
||||
|
||||
void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet) {
|
||||
TSDB_FSET_FID(pSet) = TSDB_FSET_FID(pOSet);
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
tsdbInitDFileEx(TSDB_DFILE_IN_SET(pSet, ftype), TSDB_DFILE_IN_SET(pOSet, ftype));
|
||||
}
|
||||
}
|
||||
|
||||
int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet) {
|
||||
int tlen = 0;
|
||||
|
||||
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
|
||||
// state not included
|
||||
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
|
||||
tlen += taosEncodeFixedU16(buf, pSet->reserve);
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
tlen += tsdbEncodeSDFile(buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
}
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet) {
|
||||
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
|
||||
TSDB_FSET_STATE(pSet) = 0;
|
||||
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
|
||||
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
|
||||
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
buf = tsdbDecodeSDFile(pRepo, buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet) {
|
||||
int tlen = 0;
|
||||
|
||||
tlen += taosEncodeFixedI32(buf, TSDB_FSET_FID(pSet));
|
||||
tlen += taosEncodeFixedU8(buf, TSDB_FSET_VER(pSet));
|
||||
tlen += taosEncodeFixedU16(buf, pSet->reserve);
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
tlen += tsdbEncodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
}
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet) {
|
||||
buf = taosDecodeFixedI32(buf, &(TSDB_FSET_FID(pSet)));
|
||||
buf = taosDecodeFixedU8(buf, &(TSDB_FSET_VER(pSet)));
|
||||
buf = taosDecodeFixedU16(buf, &(pSet->reserve));
|
||||
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
buf = tsdbDecodeSDFileEx(buf, TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
SDFile *pDFileFrom = (from) ? TSDB_DFILE_IN_SET(from, ftype) : NULL;
|
||||
SDFile *pDFileTo = (to) ? TSDB_DFILE_IN_SET(to, ftype) : NULL;
|
||||
if (tsdbApplyDFileChange(pDFileFrom, pDFileTo) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
if (tsdbCreateDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype), updateHeader, ftype) < 0) {
|
||||
tsdbCloseDFileSet(pSet);
|
||||
tsdbRemoveDFileSet(pSet);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbUpdateDFileSetHeader(SDFileSet *pSet) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
if (tsdbUpdateDFileHeader(TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *_version) {
|
||||
char *p = NULL;
|
||||
*_version = 0;
|
||||
*ftype = TSDB_FILE_MAX;
|
||||
|
||||
sscanf(fname, "v%df%d.%m[a-z]-ver%" PRIu32, vid, fid, &p, _version);
|
||||
for (TSDB_FILE_T i = 0; i < TSDB_FILE_MAX; i++) {
|
||||
if (strcmp(p, TSDB_FNAME_SUFFIX[i]) == 0) {
|
||||
*ftype = i;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(p);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, const char *dname, char *fname) {
|
||||
ASSERT(ftype != TSDB_FILE_MAX);
|
||||
|
||||
if (ftype < TSDB_FILE_MAX) {
|
||||
if (ver == 0) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s", vid, dname, vid, fid,
|
||||
TSDB_FNAME_SUFFIX[ftype]);
|
||||
} else {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/%s/data/v%df%d.%s-ver%" PRIu32, vid, dname, vid, fid,
|
||||
TSDB_FNAME_SUFFIX[ftype], ver);
|
||||
}
|
||||
} else {
|
||||
if (ver == 0) {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]);
|
||||
} else {
|
||||
snprintf(fname, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/%s-ver%" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int tsdbOpenDFile(SDFile *pDFile, int flags) {
|
||||
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
|
||||
if (pDFile->pFile == NULL) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbCloseDFile(SDFile *pDFile) {
|
||||
if (TSDB_FILE_OPENED(pDFile)) {
|
||||
taosCloseFile(&pDFile->pFile);
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
}
|
||||
}
|
||||
|
||||
int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
|
||||
// ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
|
||||
if (loffset < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return loffset;
|
||||
}
|
||||
|
||||
int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
|
||||
if (nwrite < nbyte) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return nwrite;
|
||||
}
|
||||
|
||||
void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
|
||||
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
|
||||
}
|
||||
|
||||
int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
int64_t toffset;
|
||||
|
||||
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pDFile->info.size == toffset);
|
||||
|
||||
if (offset) {
|
||||
*offset = toffset;
|
||||
}
|
||||
|
||||
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pDFile->info.size += nbyte;
|
||||
|
||||
return (int)nbyte;
|
||||
}
|
||||
|
||||
int tsdbRemoveDFile(SDFile *pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
|
||||
|
||||
int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
|
||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||
|
||||
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
|
||||
if (nread < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return nread;
|
||||
}
|
||||
|
||||
int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest) {
|
||||
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pDest->info = pSrc->info;
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbCloseDFileSet(SDFileSet *pSet) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
}
|
||||
}
|
||||
|
||||
int tsdbOpenDFileSet(SDFileSet *pSet, int flags) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
|
||||
tsdbCloseDFileSet(pSet);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbRemoveDFileSet(SDFileSet *pSet) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
||||
}
|
||||
}
|
||||
|
||||
int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
|
||||
tsdbRemoveDFileSet(pDest);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) {
|
||||
*minKey = fid * days * tsTickPerMin[precision];
|
||||
*maxKey = *minKey + days * tsTickPerMin[precision] - 1;
|
||||
}
|
||||
#endif
|
||||
return pName;
|
||||
}
|
|
@ -544,6 +544,18 @@ int32_t tGetKEYINFO(uint8_t *p, KEYINFO *pKeyInfo) {
|
|||
}
|
||||
|
||||
// SBlockData ======================================================
|
||||
static int32_t tsdbBlockDataAppendRow0(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbBlockDataAppendRow1(SBlockData *pBlockData, TSDBROW *pRow) {
|
||||
int32_t code = 0;
|
||||
// TODO
|
||||
return code;
|
||||
}
|
||||
|
||||
void tsdbBlockDataClear(SBlockData *pBlockData) {
|
||||
pBlockData->nRow = 0;
|
||||
for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) {
|
||||
|
@ -571,10 +583,10 @@ int32_t tsdbBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *
|
|||
}
|
||||
ASSERT(pBlockData->maxRow > pBlockData->nRow);
|
||||
|
||||
code = tsdbRealloc((uint8_t **)&pBlockData->aKey, sizeof(TSDBKEY) * pBlockData->maxRow);
|
||||
// code = tsdbRealloc((uint8_t **)&pBlockData->aKey, sizeof(TSDBKEY) * pBlockData->maxRow);
|
||||
if (code) goto _err;
|
||||
}
|
||||
pBlockData->aKey[nRow] = key;
|
||||
// pBlockData->aKey[nRow] = key;
|
||||
|
||||
// other cols
|
||||
int16_t iColData = 0;
|
||||
|
@ -615,7 +627,8 @@ _err:
|
|||
}
|
||||
|
||||
void tsdbBlockDataDestroy(SBlockData *pBlockData) {
|
||||
tsdbFree((uint8_t *)pBlockData->aKey);
|
||||
tsdbFree((uint8_t *)pBlockData->aVersion);
|
||||
tsdbFree((uint8_t *)pBlockData->aTSKEY);
|
||||
for (int32_t iCol = 0; iCol < pBlockData->nCol; iCol++) {
|
||||
tsdbFree(pBlockData->aColData[iCol].pBitMap);
|
||||
tsdbFree(pBlockData->aColData[iCol].pData);
|
||||
|
|
Loading…
Reference in New Issue