more code
This commit is contained in:
parent
9490bf2a5e
commit
71c967d603
|
@ -54,6 +54,10 @@ target_sources(
|
||||||
"src/tsdb/tsdbDiskData.c"
|
"src/tsdb/tsdbDiskData.c"
|
||||||
"src/tsdb/tsdbMergeTree.c"
|
"src/tsdb/tsdbMergeTree.c"
|
||||||
"src/tsdb/tsdbDataIter.c"
|
"src/tsdb/tsdbDataIter.c"
|
||||||
|
# # dev
|
||||||
|
"src/tsdb/dev/tsdbCommit2.c"
|
||||||
|
"src/tsdb/dev/tsdbMerge.c"
|
||||||
|
"src/tsdb/dev/tsdbReaderWriter2.c"
|
||||||
|
|
||||||
# tq
|
# tq
|
||||||
"src/tq/tq.c"
|
"src/tq/tq.c"
|
||||||
|
|
|
@ -15,6 +15,13 @@
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
|
// extern dependencies
|
||||||
|
typedef struct SSttFWriter SSttFWriter;
|
||||||
|
|
||||||
|
extern int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter);
|
||||||
|
extern int32_t tsdbSttFWriterClose(SSttFWriter *pWritter);
|
||||||
|
extern int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
// config
|
// config
|
||||||
|
@ -26,65 +33,81 @@ typedef struct {
|
||||||
int8_t sttTrigger;
|
int8_t sttTrigger;
|
||||||
SArray *aTbDataP;
|
SArray *aTbDataP;
|
||||||
// context
|
// context
|
||||||
TSKEY nextKey;
|
TSKEY nextKey;
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
int32_t expLevel;
|
int32_t expLevel;
|
||||||
TSKEY minKey;
|
TSKEY minKey;
|
||||||
TSKEY maxKey;
|
TSKEY maxKey;
|
||||||
int64_t cid; // commit id
|
// writer
|
||||||
SSkmInfo skmTable;
|
SSttFWriter *pWriter;
|
||||||
SSkmInfo skmRow;
|
|
||||||
SBlockData bData;
|
|
||||||
SColData aColData[4]; // <suid, uid, ts, version>
|
|
||||||
SArray *aSttBlk; // SArray<SSttBlk>
|
|
||||||
SArray *aDelBlk; // SArray<SDelBlk>
|
|
||||||
} SCommitter;
|
} SCommitter;
|
||||||
|
|
||||||
static int32_t tsdbRowIsDeleted(SCommitter *pCommitter, TSDBROW *pRow) {
|
static int32_t tsdbCommitOpenWriter(SCommitter *pCommitter) {
|
||||||
|
int32_t code = 0;
|
||||||
// TODO
|
// TODO
|
||||||
ASSERT(0);
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitWriteTSData(SCommitter *pCommitter, int64_t suid, int64_t uid, TSDBROW *pRow) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino;
|
||||||
|
|
||||||
|
if (pCommitter->pWriter == NULL) {
|
||||||
|
code = tsdbCommitOpenWriter(pCommitter);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
|
|
||||||
|
code = tsdbSttFWriteRow(pCommitter->pWriter, suid, uid, pRow);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (code) {
|
||||||
|
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
tsdbTrace("vgId:%d %s done, fid:%d suid:%" PRId64 " uid:%" PRId64 " ts:%" PRId64 " version:%" PRId64,
|
||||||
|
TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid, suid, uid, TSDBROW_KEY(pRow).ts,
|
||||||
|
TSDBROW_KEY(pRow).version);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbCommitWriteDelData(SCommitter *pCommitter, int64_t suid, int64_t uid, int64_t version, int64_t sKey,
|
||||||
|
int64_t eKey) {
|
||||||
|
int32_t code = 0;
|
||||||
|
// TODO
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitTimeSeriesData(SCommitter *pCommitter) {
|
static int32_t tsdbCommitTimeSeriesData(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
|
||||||
|
int64_t nRow = 0;
|
||||||
SMemTable *pMem = pCommitter->pTsdb->imem;
|
SMemTable *pMem = pCommitter->pTsdb->imem;
|
||||||
|
|
||||||
if (pMem->nRow == 0) goto _exit;
|
if (pMem->nRow == 0) { // no time-series data to commit
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN};
|
||||||
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
|
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
|
||||||
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
|
||||||
|
|
||||||
// TODO: prepare commit next table
|
|
||||||
|
|
||||||
STbDataIter iter;
|
STbDataIter iter;
|
||||||
TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN};
|
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
||||||
|
|
||||||
tsdbTbDataIterOpen(pTbData, &from, 0, &iter);
|
tsdbTbDataIterOpen(pTbData, &from, 0, &iter);
|
||||||
|
|
||||||
for (TSDBROW *pRow; (pRow = tsdbTbDataIterGet(&iter)) != NULL; tsdbTbDataIterNext(&iter)) {
|
for (TSDBROW *pRow; (pRow = tsdbTbDataIterGet(&iter)) != NULL; tsdbTbDataIterNext(&iter)) {
|
||||||
TSDBKEY rowKey = TSDBROW_KEY(pRow);
|
TSDBKEY rowKey = TSDBROW_KEY(pRow);
|
||||||
|
|
||||||
if (rowKey.ts > pCommitter->maxKey) {
|
if (rowKey.ts > pCommitter->maxKey) {
|
||||||
pCommitter->nextKey = TMIN(rowKey.ts, pCommitter->nextKey);
|
pCommitter->nextKey = TMIN(pCommitter->nextKey, rowKey.ts);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
nRow++;
|
||||||
// code = tsdbUpdateSkmInfo(&pCommitter->skmRow, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
|
||||||
|
|
||||||
code = tBlockDataAppendRow(&pCommitter->bData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
|
code = tsdbCommitWriteTSData(pCommitter, pTbData->suid, pTbData->uid, pRow);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
if (pCommitter->bData.nRow >= pCommitter->maxRow) {
|
|
||||||
// code = tsdbWriteSttBlock(pCommitter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
tBlockDataClear(&pCommitter->bData);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -93,35 +116,35 @@ _exit:
|
||||||
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
|
tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
|
||||||
pMem->nRow);
|
nRow);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitTombstoneData(SCommitter *pCommitter) {
|
static int32_t tsdbCommitDelData(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino;
|
int32_t lino;
|
||||||
|
|
||||||
|
int64_t nDel = 0;
|
||||||
SMemTable *pMem = pCommitter->pTsdb->imem;
|
SMemTable *pMem = pCommitter->pTsdb->imem;
|
||||||
|
|
||||||
if (pMem->nDel == 0) goto _exit;
|
if (pMem->nDel == 0) { // no del data
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
|
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
|
||||||
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
||||||
|
|
||||||
if (pTbData->pHead == NULL) continue;
|
|
||||||
|
|
||||||
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
|
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
|
||||||
if (pDelData->sKey > pCommitter->maxKey || pDelData->eKey < pCommitter->minKey) continue;
|
if (pDelData->eKey < pCommitter->minKey) continue;
|
||||||
|
if (pDelData->sKey > pCommitter->maxKey) {
|
||||||
// code = tsdbAppendDelData(pCommitter, pTbData->suid, pTbData->uid, TMAX(pDelData->sKey, pCommitter->minKey),
|
pCommitter->nextKey = TMIN(pCommitter->nextKey, pDelData->sKey);
|
||||||
// TMIN(pDelData->eKey, pCommitter->maxKey), pDelData->version);
|
continue;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
|
|
||||||
if (/* TODO */ 0 > pCommitter->maxRow) {
|
|
||||||
// code = tsdbWriteDelBlock(pCommitter);
|
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tsdbCommitWriteDelData(pCommitter, pTbData->suid, pTbData->uid, pDelData->version,
|
||||||
|
pDelData->sKey /* TODO */, pDelData->eKey /* TODO */);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,19 +158,6 @@ _exit:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitDelData(SCommitter *pCommitter) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t lino = 0;
|
|
||||||
|
|
||||||
// TODO
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (code) {
|
|
||||||
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) {
|
static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -156,11 +166,6 @@ static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) {
|
||||||
tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
|
tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
|
||||||
&pCommitter->maxKey);
|
&pCommitter->maxKey);
|
||||||
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
|
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
|
||||||
#if 0
|
|
||||||
// pCommitter->cid = tsdbFileSetNextCid(STsdb * pTsdb, pCommitter->fid);
|
|
||||||
#else
|
|
||||||
pCommitter->cid = 0;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
|
|
||||||
|
@ -188,8 +193,6 @@ static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
|
||||||
STsdb *pTsdb = pCommitter->pTsdb;
|
|
||||||
|
|
||||||
// fset commit start
|
// fset commit start
|
||||||
code = tsdbCommitFSetStart(pCommitter);
|
code = tsdbCommitFSetStart(pCommitter);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
@ -198,7 +201,7 @@ static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) {
|
||||||
code = tsdbCommitTimeSeriesData(pCommitter);
|
code = tsdbCommitTimeSeriesData(pCommitter);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
code = tsdbCommitTombstoneData(pCommitter);
|
code = tsdbCommitDelData(pCommitter);
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
|
||||||
// fset commit end
|
// fset commit end
|
||||||
|
@ -207,7 +210,8 @@ static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) {
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), __func__, lino,
|
||||||
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,9 +6,11 @@ typedef struct SSttFReader SSttFReader;
|
||||||
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
|
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
|
||||||
extern void tsdbCloseFile(STsdbFD **ppFD);
|
extern void tsdbCloseFile(STsdbFD **ppFD);
|
||||||
struct SSttFWriter {
|
struct SSttFWriter {
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
STsdbFD *pFd;
|
STsdbFD *pFd;
|
||||||
SSttFile file;
|
SSttFile file;
|
||||||
|
SBlockData bData;
|
||||||
|
SArray *aSttBlk;
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter) {
|
int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter) {
|
||||||
|
@ -42,22 +44,10 @@ int32_t tsdbSttFWriterClose(SSttFWriter *pWritter) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbWriteSttBlockData(SSttFWriter *pWritter, SBlockData *pBlockData, SSttBlk *pSttBlk) {
|
int32_t tsdbSttFWriteRow(SSttFWriter *pWritter, int64_t suid, int64_t uid, TSDBROW *pRow) {
|
||||||
// TODO
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
// TODO write row
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbWriteSttBlockIdx(SSttFWriter *pWriter, SArray *aSttBlk) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbWriteSttDelData(SSttFWriter *pWriter) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tsdbWriteSttDelIdx(SSttFWriter *pWriter) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
Loading…
Reference in New Issue