more code

This commit is contained in:
Hongze Cheng 2022-12-27 15:19:35 +08:00
parent f0dc9a82e6
commit 3b3b8d4f9a
1 changed files with 54 additions and 22 deletions

View File

@ -54,8 +54,10 @@ typedef struct STsdbDataIter {
typedef struct { typedef struct {
STsdb *pTsdb; STsdb *pTsdb;
STsdbFS fs;
int64_t cid; int64_t cid;
int32_t maxRows;
int32_t minRows;
STsdbFS fs;
int32_t fid; int32_t fid;
SDFileSet *pDFileSet; SDFileSet *pDFileSet;
SDataFReader *pReader; SDataFReader *pReader;
@ -250,12 +252,18 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) {
int32_t lino = 0; int32_t lino = 0;
pCompactor->pTsdb = pTsdb; pCompactor->pTsdb = pTsdb;
// pCompactor->cid = 0; (TODO)
pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows;
pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows;
code = tsdbFSCopy(pTsdb, &pCompactor->fs); code = tsdbFSCopy(pTsdb, &pCompactor->fs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
pCompactor->fid = INT32_MIN; pCompactor->fid = INT32_MIN;
code = tBlockDataCreate(&pCompactor->bData);
TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
@ -360,7 +368,7 @@ _exit:
return code; return code;
} }
static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) { static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo, STSchema **ppTSchema) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -370,9 +378,11 @@ static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) {
} }
if (pCompactor->pIter) { if (pCompactor->pIter) {
*ppRow = &pCompactor->pIter->rowInfo.row; *ppRowInfo = &pCompactor->pIter->rowInfo;
*ppTSchema = NULL; // TODO
} else { } else {
*ppRow = NULL; *ppRowInfo = NULL;
*ppTSchema = NULL;
} }
_exit: _exit:
@ -421,6 +431,7 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) {
} }
} }
pCompactor->pIter = NULL; pCompactor->pIter = NULL;
tBlockDataReset(&pCompactor->bData);
_exit: _exit:
if (code) { if (code) {
@ -454,49 +465,70 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) {
// Check if can do compact (TODO) // Check if can do compact (TODO)
// Do compact // Do compact
STsdbCompactor compactor = {0}; STsdbCompactor *pCompactor = &(STsdbCompactor){0};
code = tsdbBeginCompact(pTsdb, &compactor); code = tsdbBeginCompact(pTsdb, pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
while (true) { while (true) {
code = tsdbOpenCompactor(&compactor); code = tsdbOpenCompactor(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (compactor.pDFileSet == NULL) break; if (pCompactor->pDFileSet == NULL) break;
// loop to merge row by row // loop to merge row by row
TSDBROW *pRow = NULL; SRowInfo *pRowInfo = NULL;
STSchema *pTSchema = NULL;
int64_t nRow = 0; int64_t nRow = 0;
for (;;) { for (;;) {
code = tsdbCompactGetRow(&compactor, &pRow); code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
if (pRow == NULL) break; if (pRowInfo == NULL) break;
nRow++; nRow++;
code = tsdbCompactNextRow(&compactor); // write block data if schema changed
TSDB_CHECK_CODE(code, lino, _exit); if ((pCompactor->bData.suid || pCompactor->bData.uid) &&
!TABLE_SAME_SCHEMA(pCompactor->bData.suid, pCompactor->bData.uid, pRowInfo->suid, pRowInfo->uid)) {
// TODO: write block data
ASSERT(0);
// code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0); // set block data not initialized
// TSDB_CHECK_CODE(code, lino, _exit); tBlockDataReset(&pCompactor->bData);
// if (compactor.bData.nRows >= TSDB_MAX_ROWS_PER_BLOCK) {
// code = tsdbFlushBlock(&compactor);
// TSDB_CHECK_CODE(code, lino, _exit);
// }
} }
tsdbCloseCompactor(&compactor); // init the block data if not initialized yet
if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) {
code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema,
NULL, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
// append row to block data
code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid);
TSDB_CHECK_CODE(code, lino, _exit);
// check if block data is full
if (pCompactor->bData.nRow >= pCompactor->maxRows) {
// TODO: write block data
ASSERT(0);
}
// iterate to next row
code = tsdbCompactNextRow(pCompactor);
TSDB_CHECK_CODE(code, lino, _exit);
}
tsdbCloseCompactor(pCompactor);
} }
_exit: _exit:
if (code) { if (code) {
tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
tsdbAbortCompact(&compactor); tsdbAbortCompact(pCompactor);
} else { } else {
tsdbCommitCompact(&compactor); tsdbCommitCompact(pCompactor);
} }
return code; return code;
} }