Merge branch 'enh/tsdb_optimize' of github.com:taosdata/tdengine into enh/tsdb_optimize
This commit is contained in:
commit
f49f96049e
|
@ -370,12 +370,12 @@ struct STsdb {
|
||||||
TdThreadRwlock rwLock;
|
TdThreadRwlock rwLock;
|
||||||
SMemTable *mem;
|
SMemTable *mem;
|
||||||
SMemTable *imem;
|
SMemTable *imem;
|
||||||
STsdbFS fs;
|
STsdbFS fs; // old
|
||||||
SLRUCache *lruCache;
|
SLRUCache *lruCache;
|
||||||
TdThreadMutex lruMutex;
|
TdThreadMutex lruMutex;
|
||||||
SLRUCache *biCache;
|
SLRUCache *biCache;
|
||||||
TdThreadMutex biMutex;
|
TdThreadMutex biMutex;
|
||||||
struct STFileSystem *pFS;
|
struct STFileSystem *pFS; // new
|
||||||
SRocksCache rCache;
|
SRocksCache rCache;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -122,6 +122,14 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
||||||
committer->ctx->tbid->uid = row->uid;
|
committer->ctx->tbid->uid = row->uid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t ts = TSDBROW_TS(&row->row);
|
||||||
|
if (ts > committer->ctx->maxKey) {
|
||||||
|
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
|
||||||
|
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, (TABLEID *)row);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
code = tsdbFSetWriteRow(committer->writer, row);
|
code = tsdbFSetWriteRow(committer->writer, row);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ extern "C" {
|
||||||
|
|
||||||
typedef TARRAY2(SSttBlk) TSttBlkArray;
|
typedef TARRAY2(SSttBlk) TSttBlkArray;
|
||||||
typedef TARRAY2(SStatisBlk) TStatisBlkArray;
|
typedef TARRAY2(SStatisBlk) TStatisBlkArray;
|
||||||
typedef TARRAY2(STombBlk) TTombBlkArray;
|
|
||||||
|
|
||||||
// SSttFileReader ==========================================
|
// SSttFileReader ==========================================
|
||||||
typedef struct SSttFileReader SSttFileReader;
|
typedef struct SSttFileReader SSttFileReader;
|
||||||
|
|
|
@ -123,13 +123,130 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *aDelIdx, STFileSet *fset) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
SArray *aDelData = NULL;
|
||||||
|
int64_t minKey, maxKey;
|
||||||
|
STombBlock tombBlock[1] = {0};
|
||||||
|
TTombBlkArray tombBlkArray[1] = {0};
|
||||||
|
STsdbFD *fd = NULL;
|
||||||
|
|
||||||
|
tsdbFidKeyRange(fset->fid, tsdb->keepCfg.days, tsdb->keepCfg.precision, &minKey, &maxKey);
|
||||||
|
|
||||||
|
if ((aDelData = taosArrayInit(0, sizeof(SDelData))) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(aDelIdx); ++i) {
|
||||||
|
SDelIdx *pDelIdx = taosArrayGet(aDelIdx, i);
|
||||||
|
|
||||||
|
code = tsdbReadDelData(reader, pDelIdx, aDelData);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < taosArrayGetSize(aDelData); ++j) {
|
||||||
|
SDelData *pDelData = taosArrayGet(aDelData, j);
|
||||||
|
|
||||||
|
if (pDelData->sKey > maxKey || pDelData->eKey < minKey) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
STombRecord record = {
|
||||||
|
.suid = pDelIdx->suid,
|
||||||
|
.uid = pDelIdx->uid,
|
||||||
|
.version = pDelData->version,
|
||||||
|
.skey = pDelData->sKey,
|
||||||
|
.ekey = pDelData->eKey,
|
||||||
|
};
|
||||||
|
|
||||||
|
code = tTombBlockPut(tombBlock, &record);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (TOMB_BLOCK_SIZE(tombBlock) >= tsdb->pVnode->config.tsdbCfg.maxRows) {
|
||||||
|
if (fd == NULL) {
|
||||||
|
STFile file = {
|
||||||
|
.type = TSDB_FTYPE_TOMB,
|
||||||
|
.did = {0}, // TODO
|
||||||
|
.fid = fset->fid,
|
||||||
|
.cid = 0, // TODO
|
||||||
|
};
|
||||||
|
|
||||||
|
code = tsdbTFileObjInit(tsdb, &file, &fset->farr[TSDB_FTYPE_TOMB]);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tsdbOpenFile(fset->farr[TSDB_FTYPE_TOMB]->fname, tsdb->pVnode->config.tsdbPageSize,
|
||||||
|
TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC, &fd);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
|
||||||
|
code = tsdbWriteFile(fd, 0, hdr, TSDB_FHDR_SIZE);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
fset->farr[TSDB_FTYPE_TOMB]->f->size += sizeof(hdr);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
tTombBlockClear(tombBlock);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TOMB_BLOCK_SIZE(tombBlock) > 0) {
|
||||||
|
// TODO
|
||||||
|
tTombBlockClear(tombBlock);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TARRAY2_SIZE(tombBlkArray) > 0) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fd) {
|
||||||
|
code = tsdbFsyncFile(fd);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
tsdbCloseFile(&fd);
|
||||||
|
}
|
||||||
|
TARRAY2_DESTROY(tombBlkArray, NULL);
|
||||||
|
tTombBlockDestroy(tombBlock);
|
||||||
|
taosArrayDestroy(aDelData);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) {
|
static int32_t tsdbUpgradeTombFile(STsdb *tsdb, SDelFile *pDelFile, TFileSetArray *fileSetArray) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
// TODO
|
SDelFReader *reader = NULL;
|
||||||
|
SArray *aDelIdx = NULL;
|
||||||
|
|
||||||
|
if ((aDelIdx = taosArrayInit(0, sizeof(SDelIdx))) == NULL) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbDelFReaderOpen(&reader, pDelFile, tsdb);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
code = tsdbReadDelIdx(reader, aDelIdx);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
if (taosArrayGetSize(aDelIdx) > 0) {
|
||||||
|
STFileSet *fset;
|
||||||
|
TARRAY2_FOREACH(fileSetArray, fset) {
|
||||||
|
code = tsdbDumpTombDataToFSet(tsdb, reader, aDelIdx, fset);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbDelFReaderClose(&reader);
|
||||||
|
taosArrayDestroy(aDelIdx);
|
||||||
|
|
||||||
ASSERT(0);
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
@ -166,9 +283,11 @@ static int32_t tsdbDoUpgradeFileSystem(STsdb *tsdb, int8_t rollback) {
|
||||||
char fname[TSDB_FILENAME_LEN];
|
char fname[TSDB_FILENAME_LEN];
|
||||||
current_fname(tsdb, fname, TSDB_FCURRENT);
|
current_fname(tsdb, fname, TSDB_FCURRENT);
|
||||||
|
|
||||||
code = save_fs(fileSetArray, NULL);
|
code = save_fs(fileSetArray, fname);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
TARRAY2_DESTROY(fileSetArray, tsdbTFileSetClear);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
|
|
@ -58,6 +58,8 @@ typedef struct {
|
||||||
int8_t rsvd[7];
|
int8_t rsvd[7];
|
||||||
} STombBlk;
|
} STombBlk;
|
||||||
|
|
||||||
|
typedef TARRAY2(STombBlk) TTombBlkArray;
|
||||||
|
|
||||||
#define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
|
#define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
|
||||||
|
|
||||||
int32_t tTombBlockInit(STombBlock *tombBlock);
|
int32_t tTombBlockInit(STombBlock *tombBlock);
|
||||||
|
|
Loading…
Reference in New Issue