more code

This commit is contained in:
Hongze Cheng 2022-08-28 00:00:48 +08:00
parent d31459dc08
commit 92ca817b99
1 changed files with 132 additions and 50 deletions

View File

@ -559,18 +559,14 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitDataBlock(SCommitter *pCommitter, SBlock *pBlock) { static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData; SBlockData *pBlockData = &pCommitter->dWriter.bData;
SBlock block; SBlock block;
ASSERT(pBlockData->nRow > 0); ASSERT(pBlockData->nRow > 0);
if (pBlock) { tBlockReset(&block);
block = *pBlock; // as a subblock
} else {
tBlockReset(&block); // as a new block
}
// info // info
block.nRow += pBlockData->nRow; block.nRow += pBlockData->nRow;
@ -1547,14 +1543,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) {
} }
} }
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { if (pBlockData->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter, NULL); code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBlockData->nRow) { if (pBlockData->nRow) {
code = tsdbCommitDataBlock(pCommitter, NULL); code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
} }
@ -1616,8 +1612,8 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) {
ASSERT(0); ASSERT(0);
} }
if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter, NULL); code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
} }
} }
@ -1633,14 +1629,14 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) {
pRow = NULL; pRow = NULL;
} }
if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) { if (pBDataW->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter, NULL); code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
} }
} }
if (pBDataW->nRow) { if (pBDataW->nRow) {
code = tsdbCommitDataBlock(pCommitter, NULL); code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
} }
@ -1724,53 +1720,37 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) { SBlockData *pBData = &pCommitter->dWriter.bData;
pRowInfo = NULL; SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
}
if (pRowInfo == NULL) goto _err; if (pBDatal->suid || pBDatal->uid) {
if (pBDatal->suid != pBData->suid || pBDatal->suid == 0) {
#if 0 if (pBDatal->nRow) {
if (pBlockData->suid || pBlockData->uid) {
if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) {
if (pBlockData->nRow > 0) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
} }
tBlockDataReset(pBDatal);
tBlockDataReset(pBlockData);
} }
} }
if (!pBlockData->suid && !pBlockData->uid) { if (!pBDatal->suid && !pBDatal->uid) {
code = tBlockDataInit(pBlockData, pTbData->suid, 0, pCommitter->skmTable.pTSchema); ASSERT(pCommitter->skmTable.suid == pBData->suid);
ASSERT(pCommitter->skmTable.uid == pBData->uid);
code = tBlockDataInit(pBDatal, pBData->suid, 0, pCommitter->skmTable.pTSchema);
if (code) goto _err; if (code) goto _err;
} }
#endif
SBlockData *pBlockData = NULL; // TODO for (int32_t iRow = 0; iRow < pBData->nRow; iRow++) {
while (pRowInfo) { TSDBROW row = tsdbRowFromBlockData(pBData, iRow);
STSchema *pTSchema = NULL; code = tBlockDataAppendRow(pBDatal, &row, NULL, pBData->uid);
if (pRowInfo->row.type == 0) { if (code) goto _err;
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (pBDatal->nRow >= pCommitter->maxRow) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err; if (code) goto _err;
pTSchema = pCommitter->skmRow.pTSchema;
}
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pTSchema, id.uid);
if (code) goto _err;
code = tsdbNextCommitRow(pCommitter);
if (code) goto _err;
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
} }
} }
@ -1780,6 +1760,109 @@ _err:
return code; return code;
} }
static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) {
int32_t code = 0;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
if (pRowInfo == NULL) goto _exit;
if (pCommitter->toLastOnly) {
SBlockData *pBDatal = &pCommitter->dWriter.bDatal;
if (pBDatal->suid || pBDatal->uid) {
if (pBDatal->suid != id.suid || pBDatal->suid == 0) {
if (pBDatal->nRow) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
tBlockDataReset(pBDatal);
}
}
if (!pBDatal->suid && !pBDatal->uid) {
ASSERT(pCommitter->skmTable.suid == id.suid);
ASSERT(pCommitter->skmTable.uid == id.uid);
code = tBlockDataInit(pBDatal, id.suid, 0, pCommitter->skmTable.pTSchema);
if (code) goto _err;
}
while (pRowInfo) {
STSchema *pTSchema = NULL;
if (pRowInfo->row.type == 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err;
pTSchema = pCommitter->skmRow.pTSchema;
}
code = tBlockDataAppendRow(pBDatal, &pRowInfo->row, pTSchema, id.uid);
if (code) goto _err;
code = tsdbNextCommitRow(pCommitter);
if (code) goto _err;
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
if (pBDatal->nRow >= pCommitter->maxRow) {
code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err;
}
}
} else {
SBlockData *pBData = &pCommitter->dWriter.bData;
ASSERT(pBData->nRow == 0);
while (pRowInfo) {
STSchema *pTSchema = NULL;
if (pRowInfo->row.type == 0) {
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err;
pTSchema = pCommitter->skmRow.pTSchema;
}
code = tBlockDataAppendRow(pBData, &pRowInfo->row, pTSchema, id.uid);
if (code) goto _err;
code = tsdbNextCommitRow(pCommitter);
if (code) goto _err;
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo && (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid)) {
pRowInfo = NULL;
}
if (pBData->nRow >= pCommitter->maxRow) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
}
}
if (pBData->nRow) {
if (pBData->nRow > pCommitter->minRow) {
code = tsdbCommitDataBlock(pCommitter);
if (code) goto _err;
} else {
code = tsdbAppendLastBlock(pCommitter);
if (code) goto _err;
}
}
}
_exit:
return code;
_err:
tsdbError("vgId:%d tsdb commit table data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
int32_t code = 0; int32_t code = 0;
@ -1830,7 +1913,6 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
code = tsdbMoveCommitData(pCommitter, id); code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err; if (code) goto _err;
// TODO: here may have problem
if (pCommitter->dWriter.bDatal.nRow > 0) { if (pCommitter->dWriter.bDatal.nRow > 0) {
code = tsdbCommitLastBlock(pCommitter); code = tsdbCommitLastBlock(pCommitter);
if (code) goto _err; if (code) goto _err;