more work

This commit is contained in:
Hongze Cheng 2022-06-13 13:13:37 +00:00
parent 6211f28102
commit b2c147be72
2 changed files with 34 additions and 8 deletions

View File

@ -27,6 +27,8 @@ struct SCommitter {
/* commit data */
int32_t minutes;
int8_t precision;
int32_t minRow;
int32_t maxRow;
TSKEY nextCommitKey;
// commit file data
int32_t commitFid;
@ -580,18 +582,14 @@ static int32_t tsdbCommitTableDataStart(SCommitter *pCommitter) {
int32_t code = 0;
// old
pCommitter->oBlock.flag = 0;
pCommitter->oBlock.nItem = 0;
pCommitter->oBlock.nData = 0;
tMapDataReset(&pCommitter->oBlock);
if (pCommitter->pBlockIdx) {
code = tsdbReadBlock(pCommitter->pReader, &pCommitter->oBlock, NULL);
if (code) goto _err;
}
// new
pCommitter->nBlock.flag = 0;
pCommitter->nBlock.nItem = 0;
pCommitter->nBlock.nData = 0;
tMapDataReset(&pCommitter->nBlock);
_err:
return code;
@ -599,6 +597,7 @@ _err:
static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) {
int32_t code = 0;
STsdb *pTsdb = pCommitter->pTsdb;
STbDataIter *pIter = NULL;
int32_t iBlock = 0;
int32_t nBlock = pCommitter->nBlock.nItem;
@ -606,19 +605,42 @@ static int32_t tsdbCommitTableDataImpl(SCommitter *pCommitter) {
SBlock block;
TSDBROW *pRow;
TSDBROW row;
int32_t iRow = 0;
STSchema *pTSchema = NULL;
if (pCommitter->pTbData) {
code = tsdbTbDataIterCreate(pCommitter->pTbData, NULL, 0, &pIter);
code = tsdbTbDataIterCreate(pCommitter->pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, &pIter);
if (code) goto _err;
}
// merge loop
for (;;) {
/* code */
tsdbTbDataIterGet(pIter, pRow);
code = tsdbColDataBlockAppend(&pCommitter->nColDataBlock, pRow, pTSchema);
if (code) goto _err;
if (pCommitter->nColDataBlock.nRow >= pCommitter->maxRow) {
code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL);
if (code) goto _err;
tsdbColDataBlockReset(&pCommitter->nColDataBlock);
}
if (!tsdbTbDataIterNext(pIter)) break;
}
if (pCommitter->nColDataBlock.nRow) {
code = tsdbWriteColDataBlock(pCommitter->pWriter, &pCommitter->nColDataBlock, NULL);
if (code) goto _err;
}
tsdbTbDataIterDestroy(pIter);
return code;
_err:
tsdbError("vgId:%d commit table data impl failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
tsdbTbDataIterDestroy(pIter);
return code;
}

View File

@ -732,6 +732,10 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal *
}
// SColDataBlock ======================================================
void tsdbColDataBlockReset(SColDataBlock *pColDataBlock) {
// TODO
}
int32_t tsdbColDataBlockAppend(SColDataBlock *pColDataBlock, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0;
int32_t nRow = pColDataBlock->nRow;