diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index 9f3d84d80e..55592a59c4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -54,8 +54,10 @@ typedef struct STsdbDataIter { typedef struct { STsdb *pTsdb; - STsdbFS fs; int64_t cid; + int32_t maxRows; + int32_t minRows; + STsdbFS fs; int32_t fid; SDFileSet *pDFileSet; SDataFReader *pReader; @@ -250,12 +252,18 @@ static int32_t tsdbBeginCompact(STsdb *pTsdb, STsdbCompactor *pCompactor) { int32_t lino = 0; pCompactor->pTsdb = pTsdb; + // pCompactor->cid = 0; (TODO) + pCompactor->maxRows = pTsdb->pVnode->config.tsdbCfg.maxRows; + pCompactor->minRows = pTsdb->pVnode->config.tsdbCfg.minRows; code = tsdbFSCopy(pTsdb, &pCompactor->fs); TSDB_CHECK_CODE(code, lino, _exit); pCompactor->fid = INT32_MIN; + code = tBlockDataCreate(&pCompactor->bData); + TSDB_CHECK_CODE(code, lino, _exit); + _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); @@ -360,7 +368,7 @@ _exit: return code; } -static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) { +static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, SRowInfo **ppRowInfo, STSchema **ppTSchema) { int32_t code = 0; int32_t lino = 0; @@ -370,9 +378,11 @@ static int32_t tsdbCompactGetRow(STsdbCompactor *pCompactor, TSDBROW **ppRow) { } if (pCompactor->pIter) { - *ppRow = &pCompactor->pIter->rowInfo.row; + *ppRowInfo = &pCompactor->pIter->rowInfo; + *ppTSchema = NULL; // TODO } else { - *ppRow = NULL; + *ppRowInfo = NULL; + *ppTSchema = NULL; } _exit: @@ -421,6 +431,7 @@ static int32_t tsdbOpenCompactor(STsdbCompactor *pCompactor) { } } pCompactor->pIter = NULL; + tBlockDataReset(&pCompactor->bData); _exit: if (code) { @@ -454,49 +465,70 @@ int32_t tsdbCompact(STsdb *pTsdb, int32_t flag) { // Check if can do compact (TODO) // Do compact - STsdbCompactor compactor = {0}; + STsdbCompactor *pCompactor = &(STsdbCompactor){0}; - code = tsdbBeginCompact(pTsdb, &compactor); + code = tsdbBeginCompact(pTsdb, pCompactor); TSDB_CHECK_CODE(code, lino, _exit); while (true) { - code = tsdbOpenCompactor(&compactor); + code = tsdbOpenCompactor(pCompactor); TSDB_CHECK_CODE(code, lino, _exit); - if (compactor.pDFileSet == NULL) break; + if (pCompactor->pDFileSet == NULL) break; // loop to merge row by row - TSDBROW *pRow = NULL; - int64_t nRow = 0; + SRowInfo *pRowInfo = NULL; + STSchema *pTSchema = NULL; + int64_t nRow = 0; for (;;) { - code = tsdbCompactGetRow(&compactor, &pRow); + code = tsdbCompactGetRow(pCompactor, &pRowInfo, &pTSchema); TSDB_CHECK_CODE(code, lino, _exit); - if (pRow == NULL) break; + if (pRowInfo == NULL) break; nRow++; - code = tsdbCompactNextRow(&compactor); + // write block data if schema changed + if ((pCompactor->bData.suid || pCompactor->bData.uid) && + !TABLE_SAME_SCHEMA(pCompactor->bData.suid, pCompactor->bData.uid, pRowInfo->suid, pRowInfo->uid)) { + // TODO: write block data + ASSERT(0); + + // set block data not initialized + tBlockDataReset(&pCompactor->bData); + } + + // init the block data if not initialized yet + if (pCompactor->bData.suid == 0 && pCompactor->bData.uid == 0) { + code = tBlockDataInit(&pCompactor->bData, &(TABLEID){.suid = pRowInfo->suid, .uid = pRowInfo->uid}, pTSchema, + NULL, 0); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // append row to block data + code = tBlockDataAppendRow(&pCompactor->bData, &pRowInfo->row, pTSchema, pRowInfo->uid); TSDB_CHECK_CODE(code, lino, _exit); - // code = tBlockDataAppendRow(&compactor.bData, pRow, pRow, NULL, 0); - // TSDB_CHECK_CODE(code, lino, _exit); + // check if block data is full + if (pCompactor->bData.nRow >= pCompactor->maxRows) { + // TODO: write block data + ASSERT(0); + } - // if (compactor.bData.nRows >= TSDB_MAX_ROWS_PER_BLOCK) { - // code = tsdbFlushBlock(&compactor); - // TSDB_CHECK_CODE(code, lino, _exit); - // } + // iterate to next row + code = tsdbCompactNextRow(pCompactor); + TSDB_CHECK_CODE(code, lino, _exit); } - tsdbCloseCompactor(&compactor); + tsdbCloseCompactor(pCompactor); } _exit: if (code) { tsdbError("vgId:%d %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code)); - tsdbAbortCompact(&compactor); + tsdbAbortCompact(pCompactor); } else { - tsdbCommitCompact(&compactor); + tsdbCommitCompact(pCompactor); } return code; }