more code

This commit is contained in:
Hongze Cheng 2022-08-26 17:47:37 +08:00
parent daf5d42394
commit 712bc9a0ba
1 changed files with 145 additions and 36 deletions

View File

@ -656,7 +656,8 @@ _err:
}
static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey) {
int32_t code = 0;
int32_t code = 0;
#if 0
STbData *pTbData = pIter->pTbData;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
@ -699,6 +700,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
_err:
tsdbError("vgId:%d, tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
#endif
return code;
}
@ -1346,12 +1348,11 @@ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) {
return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row);
}
static SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
// TODO
return NULL;
static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) {
return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL;
}
static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) {
static int32_t tsdbNextCommitRow(SCommitter *pCommitter) {
int32_t code = 0;
if (pCommitter->pIter) {
@ -1429,25 +1430,139 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) {
}
}
if (pCommitter->pIter) {
*ppInfo = &pCommitter->pIter->r;
} else {
*ppInfo = NULL;
}
_exit:
return code;
}
static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) {
int32_t code = 0;
// TODO
int32_t code = 0;
SBlockData *pBlockData = &pCommitter->dWriter.bData;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
tBlockDataClear(pBlockData);
while (pRowInfo) {
ASSERT(pRowInfo->row.type == 0);
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err;
code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
if (code) goto _err;
code = tsdbNextCommitRow(pCommitter);
if (code) goto _err;
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo) {
if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
pRowInfo = NULL;
} else {
TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
if (tsdbKeyCmprFn(&tKey, &pBlock->minKey) >= 0) pRowInfo = NULL;
}
}
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
}
if (pBlockData->nRow) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
return code;
_err:
tsdbError("vgId:%d, tsdb commit ahead block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) {
int32_t code = 0;
// TODO
int32_t code = 0;
SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter);
TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid};
SBlockData *pBDataR = &pCommitter->dReader.bData;
SBlockData *pBDataW = &pCommitter->dWriter.bData;
code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBDataR);
if (code) goto _err;
tBlockDataClear(pBDataW);
int32_t iRow = 0;
TSDBROW row = tsdbRowFromBlockData(pBDataR, 0);
TSDBROW *pRow = &row;
while (pRow && pRowInfo) {
int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row);
if (c < 0) {
code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
if (code) goto _err;
iRow++;
if (iRow < pBDataR->nRow) {
row = tsdbRowFromBlockData(pBDataR, iRow);
} else {
pRow = NULL;
}
} else if (c > 0) {
ASSERT(pRowInfo->row.type == 0);
code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row));
if (code) goto _err;
code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid);
if (code) goto _err;
code = tsdbNextCommitRow(pCommitter);
if (code) goto _err;
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo) {
if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) {
pRowInfo = NULL;
} else {
TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row);
if (tsdbKeyCmprFn(&tKey, &pBlock->maxKey) > 0) pRowInfo = NULL;
}
}
} else {
ASSERT(0);
}
if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
}
while (pRow) {
code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid);
if (code) goto _err;
iRow++;
if (iRow < pBDataR->nRow) {
row = tsdbRowFromBlockData(pBDataR, iRow);
} else {
pRow = NULL;
}
if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
}
if (pBDataW->nRow) {
code = tsdbCommitDataBlock(pCommitter, NULL);
if (code) goto _err;
}
return code;
_err:
tsdbError("vgId:%d, tsdb commit merge block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
return code;
}
@ -1533,12 +1648,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
SRowInfo *pRowInfo = NULL;
TABLEID id = {0};
while (true) {
code = tsdbNextCommitRow(pCommitter, &pRowInfo);
if (code) goto _err;
pRowInfo = tsdbGetCommitRow(pCommitter);
if (pRowInfo == NULL) {
/* end current table data commit (todo) */
/* end remain table data commit*/
code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX});
if (code) goto _err;
@ -1551,23 +1662,21 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) {
break;
}
if (id.suid != pRowInfo->suid || id.uid != pRowInfo->uid) {
/* end current table data commit (todo) */
ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid);
/* start new table data commit */
id.suid = pRowInfo->suid;
id.uid = pRowInfo->uid;
// reader
code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err;
// writer
tMapDataReset(&pCommitter->dWriter.mBlock);
// other
code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmRow.pTSchema);
if (code) goto _err;
}
/* start new table data commit */
id.suid = pRowInfo->suid;
id.uid = pRowInfo->uid;
// reader
code = tsdbMoveCommitData(pCommitter, id);
if (code) goto _err;
// writer
tMapDataReset(&pCommitter->dWriter.mBlock);
// other
code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid);
if (code) goto _err;
code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmRow.pTSchema);
if (code) goto _err;
/* merge with data in .data file */
code = tsdbMergeTableData(pCommitter, id);