more code
This commit is contained in:
parent
f43bae0df0
commit
b977626a78
|
@ -778,46 +778,18 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
|
static int32_t tsdbCommitLastFile(SCommitter *pCommitter, STbDataIter *pIter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
#if 0
|
|
||||||
STbData *pTbData = pIter->pTbData;
|
STbData *pTbData = pIter->pTbData;
|
||||||
int32_t nRow = tsdbGetNumOfRowsLessThan(pIter, (TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN});
|
SBlockData *pBlockData = &pCommitter->dWriter.bDatal;
|
||||||
|
|
||||||
if (pCommitter->dReader.pRowInfo && tTABLEIDCmprFn(pTbData, pCommitter->dReader.pRowInfo) == 0) {
|
|
||||||
if (pCommitter->dReader.pRowInfo->suid) { // super table
|
|
||||||
for (int32_t iRow = pCommitter->dReader.iRow; iRow < pCommitter->dReader.bDatal.nRow; iRow++) {
|
|
||||||
if (pTbData->uid != pCommitter->dReader.bDatal.aUid[iRow]) break;
|
|
||||||
nRow++;
|
|
||||||
}
|
|
||||||
} else { // normal table
|
|
||||||
ASSERT(pCommitter->dReader.iRow == 0);
|
|
||||||
nRow += pCommitter->dReader.bDatal.nRow;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nRow == 0) goto _exit;
|
|
||||||
|
|
||||||
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
|
TSDBROW *pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
|
||||||
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
|
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
|
||||||
pRow = NULL;
|
pRow = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRowInfo *pRowInfo = pCommitter->dReader.pRowInfo;
|
if (pRow == NULL) goto _exit;
|
||||||
if (pRowInfo && pRowInfo->uid != pTbData->uid) {
|
|
||||||
pRowInfo = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
while (nRow) {
|
|
||||||
SBlockData *pBlockData;
|
|
||||||
int8_t toData;
|
|
||||||
|
|
||||||
if (nRow < pCommitter->minRow) { // to .last
|
|
||||||
toData = 0;
|
|
||||||
pBlockData = &pCommitter->dWriter.bDatal;
|
|
||||||
|
|
||||||
// commit and reset block data schema if need
|
|
||||||
// QUESTION: Is there a case that pBlockData->nRow == 0 but need to change schema ?
|
|
||||||
if (pBlockData->suid || pBlockData->uid) {
|
if (pBlockData->suid || pBlockData->uid) {
|
||||||
if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) {
|
if (pBlockData->suid != pTbData->suid || pBlockData->suid == 0) {
|
||||||
if (pBlockData->nRow > 0) {
|
if (pBlockData->nRow > 0) {
|
||||||
|
@ -829,62 +801,11 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// set block data schema if need
|
if (!pBlockData->suid && !pBlockData->uid) {
|
||||||
if (pBlockData->suid == 0 && pBlockData->uid == 0) {
|
code = tBlockDataInit(pBlockData, pTbData->suid, pTbData->uid, pCommitter->skmTable.pTSchema);
|
||||||
code =
|
|
||||||
tBlockDataInit(pBlockData, pTbData->suid, pTbData->suid ? 0 : pTbData->uid, pCommitter->skmTable.pTSchema);
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlockData->nRow + nRow > pCommitter->maxRow) {
|
|
||||||
code = tsdbCommitLastBlock(pCommitter);
|
|
||||||
if (code) goto _err;
|
|
||||||
}
|
|
||||||
} else { // to .data
|
|
||||||
toData = 1;
|
|
||||||
pBlockData = &pCommitter->dWriter.bData;
|
|
||||||
ASSERT(pBlockData->nRow == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
while (pRow && pRowInfo) {
|
|
||||||
int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
|
|
||||||
if (c < 0) {
|
|
||||||
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->skmRow.pTSchema, pTbData->uid);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
tsdbTbDataIterNext(pIter);
|
|
||||||
pRow = tsdbTbDataIterGet(pIter);
|
|
||||||
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) {
|
|
||||||
pRow = NULL;
|
|
||||||
}
|
|
||||||
} else if (c > 0) {
|
|
||||||
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pTbData->uid);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
code = tsdbCommitterNextLastRow(pCommitter);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
pRowInfo = pCommitter->dReader.pRowInfo;
|
|
||||||
if (pRowInfo && pRowInfo->uid != pTbData->uid) {
|
|
||||||
pRowInfo = NULL;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
nRow--;
|
|
||||||
if (toData) {
|
|
||||||
if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
|
|
||||||
code = tsdbCommitDataBlock(pCommitter, NULL);
|
|
||||||
if (code) goto _err;
|
|
||||||
goto _outer_break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
while (pRow) {
|
while (pRow) {
|
||||||
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
|
code = tsdbCommitterUpdateRowSchema(pCommitter, pTbData->suid, pTbData->uid, TSDBROW_SVERSION(pRow));
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
@ -898,48 +819,17 @@ static int32_t tsdbMergeCommitLast(SCommitter *pCommitter, STbDataIter *pIter) {
|
||||||
pRow = NULL;
|
pRow = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
nRow--;
|
if (pBlockData->nRow >= pCommitter->maxRow) {
|
||||||
if (toData) {
|
code = tsdbCommitLastBlock(pCommitter);
|
||||||
if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
|
|
||||||
code = tsdbCommitDataBlock(pCommitter, NULL);
|
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
goto _outer_break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
while (pRowInfo) {
|
|
||||||
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pTbData->uid);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
code = tsdbCommitterNextLastRow(pCommitter);
|
|
||||||
if (code) goto _err;
|
|
||||||
|
|
||||||
pRowInfo = pCommitter->dReader.pRowInfo;
|
|
||||||
if (pRowInfo && pRowInfo->uid != pTbData->uid) {
|
|
||||||
pRowInfo = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
nRow--;
|
|
||||||
if (toData) {
|
|
||||||
if (nRow == 0 || pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
|
|
||||||
code = tsdbCommitDataBlock(pCommitter, NULL);
|
|
||||||
if (code) goto _err;
|
|
||||||
goto _outer_break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
_outer_break:
|
|
||||||
ASSERT(nRow >= 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return code;
|
return code;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
tsdbError("vgId:%d tsdb merge commit last failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
#endif
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1061,7 +951,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData) {
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
// commit last
|
// commit last
|
||||||
code = tsdbMergeCommitLast(pCommitter, &iter);
|
code = tsdbCommitLastFile(pCommitter, &iter);
|
||||||
if (code) goto _err;
|
if (code) goto _err;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
|
Loading…
Reference in New Issue