diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 9af25338e5..d0b1ca2bd9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -656,7 +656,8 @@ _err: } static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter, TSDBKEY toKey) { - int32_t code = 0; + int32_t code = 0; +#if 0 STbData *pTbData = pIter->pTbData; SBlockData *pBlockData = &pCommitter->dWriter.bData; @@ -699,6 +700,7 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter _err: tsdbError("vgId:%d, tsdb commit table mem data failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); +#endif return code; } @@ -1346,12 +1348,11 @@ static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { return tsdbRowCmprFn(&pInfo1->row, &pInfo2->row); } -static SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { - // TODO - return NULL; +static FORCE_INLINE SRowInfo *tsdbGetCommitRow(SCommitter *pCommitter) { + return (pCommitter->pIter) ? &pCommitter->pIter->r : NULL; } -static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) { +static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { int32_t code = 0; if (pCommitter->pIter) { @@ -1429,25 +1430,139 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter, SRowInfo **ppInfo) { } } - if (pCommitter->pIter) { - *ppInfo = &pCommitter->pIter->r; - } else { - *ppInfo = NULL; - } - _exit: return code; } static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SBlock *pBlock) { - int32_t code = 0; - // TODO + int32_t code = 0; + SBlockData *pBlockData = &pCommitter->dWriter.bData; + SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); + TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; + + tBlockDataClear(pBlockData); + while (pRowInfo) { + ASSERT(pRowInfo->row.type == 0); + code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); + if (code) goto _err; + + code = tsdbNextCommitRow(pCommitter); + if (code) goto _err; + + pRowInfo = tsdbGetCommitRow(pCommitter); + if (pRowInfo) { + if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) { + pRowInfo = NULL; + } else { + TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); + if (tsdbKeyCmprFn(&tKey, &pBlock->minKey) >= 0) pRowInfo = NULL; + } + } + + if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter, NULL); + if (code) goto _err; + } + } + + if (pBlockData->nRow) { + code = tsdbCommitDataBlock(pCommitter, NULL); + if (code) goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d, tsdb commit ahead block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SBlock *pBlock) { - int32_t code = 0; - // TODO + int32_t code = 0; + SRowInfo *pRowInfo = tsdbGetCommitRow(pCommitter); + TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; + SBlockData *pBDataR = &pCommitter->dReader.bData; + SBlockData *pBDataW = &pCommitter->dWriter.bData; + + code = tsdbReadDataBlock(pCommitter->dReader.pReader, pBlock, pBDataR); + if (code) goto _err; + + tBlockDataClear(pBDataW); + int32_t iRow = 0; + TSDBROW row = tsdbRowFromBlockData(pBDataR, 0); + TSDBROW *pRow = &row; + + while (pRow && pRowInfo) { + int32_t c = tsdbRowCmprFn(pRow, &pRowInfo->row); + if (c < 0) { + code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); + if (code) goto _err; + + iRow++; + if (iRow < pBDataR->nRow) { + row = tsdbRowFromBlockData(pBDataR, iRow); + } else { + pRow = NULL; + } + } else if (c > 0) { + ASSERT(pRowInfo->row.type == 0); + code = tsdbCommitterUpdateRowSchema(pCommitter, id.suid, id.uid, TSDBROW_SVERSION(&pRowInfo->row)); + if (code) goto _err; + + code = tBlockDataAppendRow(pBDataW, &pRowInfo->row, pCommitter->skmRow.pTSchema, id.uid); + if (code) goto _err; + + code = tsdbNextCommitRow(pCommitter); + if (code) goto _err; + + pRowInfo = tsdbGetCommitRow(pCommitter); + if (pRowInfo) { + if (pRowInfo->suid != id.suid || pRowInfo->uid != id.uid) { + pRowInfo = NULL; + } else { + TSDBKEY tKey = TSDBROW_KEY(&pRowInfo->row); + if (tsdbKeyCmprFn(&tKey, &pBlock->maxKey) > 0) pRowInfo = NULL; + } + } + } else { + ASSERT(0); + } + + if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter, NULL); + if (code) goto _err; + } + } + + while (pRow) { + code = tBlockDataAppendRow(pBDataW, pRow, NULL, id.uid); + if (code) goto _err; + + iRow++; + if (iRow < pBDataR->nRow) { + row = tsdbRowFromBlockData(pBDataR, iRow); + } else { + pRow = NULL; + } + + if (pBDataW->nRow >= pCommitter->maxRow * 4 / 5) { + code = tsdbCommitDataBlock(pCommitter, NULL); + if (code) goto _err; + } + } + + if (pBDataW->nRow) { + code = tsdbCommitDataBlock(pCommitter, NULL); + if (code) goto _err; + } + + return code; + +_err: + tsdbError("vgId:%d, tsdb commit merge block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -1533,12 +1648,8 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { SRowInfo *pRowInfo = NULL; TABLEID id = {0}; while (true) { - code = tsdbNextCommitRow(pCommitter, &pRowInfo); - if (code) goto _err; - + pRowInfo = tsdbGetCommitRow(pCommitter); if (pRowInfo == NULL) { - /* end current table data commit (todo) */ - /* end remain table data commit*/ code = tsdbMoveCommitData(pCommitter, (TABLEID){.suid = INT64_MAX, .uid = INT64_MAX}); if (code) goto _err; @@ -1551,23 +1662,21 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { break; } - if (id.suid != pRowInfo->suid || id.uid != pRowInfo->uid) { - /* end current table data commit (todo) */ + ASSERT(pRowInfo->suid != id.suid || pRowInfo->uid != id.uid); - /* start new table data commit */ - id.suid = pRowInfo->suid; - id.uid = pRowInfo->uid; - // reader - code = tsdbMoveCommitData(pCommitter, id); - if (code) goto _err; - // writer - tMapDataReset(&pCommitter->dWriter.mBlock); - // other - code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); - if (code) goto _err; - code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmRow.pTSchema); - if (code) goto _err; - } + /* start new table data commit */ + id.suid = pRowInfo->suid; + id.uid = pRowInfo->uid; + // reader + code = tsdbMoveCommitData(pCommitter, id); + if (code) goto _err; + // writer + tMapDataReset(&pCommitter->dWriter.mBlock); + // other + code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); + if (code) goto _err; + code = tBlockDataInit(&pCommitter->dWriter.bData, id.suid, id.uid, pCommitter->skmRow.pTSchema); + if (code) goto _err; /* merge with data in .data file */ code = tsdbMergeTableData(pCommitter, id);