more code
This commit is contained in:
parent
4f0833e1a0
commit
9490bf2a5e
|
@ -39,8 +39,6 @@ target_sources(
|
|||
|
||||
# tsdb
|
||||
"src/tsdb/tsdbCommit.c"
|
||||
"src/tsdb/tsdbCommit2.c"
|
||||
"src/tsdb/tsdbMerge.c"
|
||||
"src/tsdb/tsdbFile.c"
|
||||
"src/tsdb/tsdbFS.c"
|
||||
"src/tsdb/tsdbOpen.c"
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
typedef struct {
|
||||
STsdb *pTsdb;
|
||||
// config
|
||||
int64_t commitID;
|
||||
int32_t minutes;
|
||||
int8_t precision;
|
||||
int32_t minRow;
|
||||
|
@ -27,11 +26,12 @@ typedef struct {
|
|||
int8_t sttTrigger;
|
||||
SArray *aTbDataP;
|
||||
// context
|
||||
TSKEY nextKey; // reset by each table commit
|
||||
TSKEY nextKey;
|
||||
int32_t fid;
|
||||
int32_t expLevel;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
int64_t cid; // commit id
|
||||
SSkmInfo skmTable;
|
||||
SSkmInfo skmRow;
|
||||
SBlockData bData;
|
||||
|
@ -48,13 +48,89 @@ static int32_t tsdbRowIsDeleted(SCommitter *pCommitter, TSDBROW *pRow) {
|
|||
|
||||
static int32_t tsdbCommitTimeSeriesData(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
int32_t lino;
|
||||
|
||||
// TODO
|
||||
SMemTable *pMem = pCommitter->pTsdb->imem;
|
||||
|
||||
if (pMem->nRow == 0) goto _exit;
|
||||
|
||||
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
|
||||
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
||||
|
||||
// TODO: prepare commit next table
|
||||
|
||||
STbDataIter iter;
|
||||
TSDBKEY from = {.ts = pCommitter->minKey, .version = VERSION_MIN};
|
||||
tsdbTbDataIterOpen(pTbData, &from, 0, &iter);
|
||||
|
||||
for (TSDBROW *pRow; (pRow = tsdbTbDataIterGet(&iter)) != NULL; tsdbTbDataIterNext(&iter)) {
|
||||
TSDBKEY rowKey = TSDBROW_KEY(pRow);
|
||||
|
||||
if (rowKey.ts > pCommitter->maxKey) {
|
||||
pCommitter->nextKey = TMIN(rowKey.ts, pCommitter->nextKey);
|
||||
break;
|
||||
}
|
||||
|
||||
if (pRow->type == TSDBROW_ROW_FMT) {
|
||||
// code = tsdbUpdateSkmInfo(&pCommitter->skmRow, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
code = tBlockDataAppendRow(&pCommitter->bData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (pCommitter->bData.nRow >= pCommitter->maxRow) {
|
||||
// code = tsdbWriteSttBlock(pCommitter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
tBlockDataClear(&pCommitter->bData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, fid:%d nRow:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
|
||||
pMem->nRow);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitTombstoneData(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino;
|
||||
|
||||
SMemTable *pMem = pCommitter->pTsdb->imem;
|
||||
|
||||
if (pMem->nDel == 0) goto _exit;
|
||||
|
||||
for (int32_t iTbData = 0; iTbData < taosArrayGetSize(pCommitter->aTbDataP); iTbData++) {
|
||||
STbData *pTbData = (STbData *)taosArrayGetP(pCommitter->aTbDataP, iTbData);
|
||||
|
||||
if (pTbData->pHead == NULL) continue;
|
||||
|
||||
for (SDelData *pDelData = pTbData->pHead; pDelData; pDelData = pDelData->pNext) {
|
||||
if (pDelData->sKey > pCommitter->maxKey || pDelData->eKey < pCommitter->minKey) continue;
|
||||
|
||||
// code = tsdbAppendDelData(pCommitter, pTbData->suid, pTbData->uid, TMAX(pDelData->sKey, pCommitter->minKey),
|
||||
// TMIN(pDelData->eKey, pCommitter->maxKey), pDelData->version);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
if (/* TODO */ 0 > pCommitter->maxRow) {
|
||||
// code = tsdbWriteDelBlock(pCommitter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
||||
} else {
|
||||
tsdbDebug("vgId:%d %s done, fid:%d nDel:%" PRId64, TD_VID(pCommitter->pTsdb->pVnode), __func__, pCommitter->fid,
|
||||
pMem->nDel);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -72,19 +148,62 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitNextFSet(SCommitter *pCommitter, int8_t *done) {
|
||||
static int32_t tsdbCommitFSetStart(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
pCommitter->fid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
|
||||
tsdbFidKeyRange(pCommitter->fid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
|
||||
&pCommitter->maxKey);
|
||||
pCommitter->expLevel = tsdbFidLevel(pCommitter->fid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
|
||||
#if 0
|
||||
// pCommitter->cid = tsdbFileSetNextCid(STsdb * pTsdb, pCommitter->fid);
|
||||
#else
|
||||
pCommitter->cid = 0;
|
||||
#endif
|
||||
|
||||
// TODO
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitFSetEnd(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
// TODO
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d failed at line %d since %s", TD_VID(pCommitter->pTsdb->pVnode), lino, tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbCommitNextFSet(SCommitter *pCommitter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
STsdb *pTsdb = pCommitter->pTsdb;
|
||||
|
||||
// fset commit start (TODO)
|
||||
// fset commit start
|
||||
code = tsdbCommitFSetStart(pCommitter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// commit fset
|
||||
code = tsdbCommitTimeSeriesData(pCommitter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// fset commit end (TODO)
|
||||
code = tsdbCommitTombstoneData(pCommitter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
// fset commit end
|
||||
code = tsdbCommitFSetEnd(pCommitter);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
|
@ -140,12 +259,11 @@ int32_t tsdbCommitBegin(STsdb *pTsdb, SCommitInfo *pInfo) {
|
|||
tsdbUnrefMemTable(pMem, NULL, true);
|
||||
} else {
|
||||
SCommitter committer;
|
||||
int8_t done = 0;
|
||||
|
||||
code = tsdbCommitterOpen(pTsdb, pInfo, &committer);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
while (!done && (code = tsdbCommitNextFSet(&committer, &done))) {
|
||||
while (committer.nextKey != TSKEY_MAX && (code = tsdbCommitNextFSet(&committer))) {
|
||||
}
|
||||
|
||||
code = tsdbCommitterClose(&committer, code);
|
|
@ -0,0 +1,63 @@
|
|||
#include "tsdb.h"
|
||||
|
||||
typedef struct SSttFWriter SSttFWriter;
|
||||
typedef struct SSttFReader SSttFReader;
|
||||
|
||||
extern int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD);
|
||||
extern void tsdbCloseFile(STsdbFD **ppFD);
|
||||
struct SSttFWriter {
|
||||
STsdb *pTsdb;
|
||||
STsdbFD *pFd;
|
||||
SSttFile file;
|
||||
};
|
||||
|
||||
int32_t tsdbSttFWriterOpen(STsdb *pTsdb, SSttFile *pSttFile, SSttFWriter **ppWritter) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
int32_t szPage = pTsdb->pVnode->config.tsdbPageSize;
|
||||
int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; // TODO
|
||||
|
||||
ppWritter[0] = taosMemoryCalloc(1, sizeof(SSttFWriter));
|
||||
if (ppWritter[0] == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
ppWritter[0]->pTsdb = pTsdb;
|
||||
ppWritter[0]->file = pSttFile[0];
|
||||
|
||||
code = tsdbOpenFile(NULL, szPage, flag, &ppWritter[0]->pFd);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbSttFWriterClose(SSttFWriter *pWritter) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteSttBlockData(SSttFWriter *pWritter, SBlockData *pBlockData, SSttBlk *pSttBlk) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteSttBlockIdx(SSttFWriter *pWriter, SArray *aSttBlk) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteSttDelData(SSttFWriter *pWriter) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbWriteSttDelIdx(SSttFWriter *pWriter) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
#include "tsdb.h"
|
||||
|
||||
// =============== PAGE-WISE FILE ===============
|
||||
static int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
|
||||
int32_t tsdbOpenFile(const char *path, int32_t szPage, int32_t flag, STsdbFD **ppFD) {
|
||||
int32_t code = 0;
|
||||
STsdbFD *pFD = NULL;
|
||||
|
||||
|
@ -68,7 +68,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static void tsdbCloseFile(STsdbFD **ppFD) {
|
||||
void tsdbCloseFile(STsdbFD **ppFD) {
|
||||
STsdbFD *pFD = *ppFD;
|
||||
if (pFD) {
|
||||
taosMemoryFree(pFD->pBuf);
|
||||
|
|
Loading…
Reference in New Issue