From a4ca770449dbdde9201d58eabf5fb10dc1932da1 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 20 Jun 2023 09:33:22 +0800 Subject: [PATCH] refact code --- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 198 ---------------------- source/dnode/vnode/src/tsdb/tsdbMerge.c | 191 --------------------- 2 files changed, 389 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index 7e5db55eba..5e4b3ad468 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -104,204 +104,6 @@ static int32_t tsdbCommitCloseWriter(SCommitter2 *committer) { return tsdbFSetWriterClose(&committer->writer, 0, committer->fopArray); } -#if 0 -static int32_t tsdbCommitTSDataToDataTableBegin(SCommitter2 *committer, const TABLEID *tbid) { - int32_t code = 0; - int32_t lino = 0; - - committer->ctx->tbid->suid = tbid->suid; - committer->ctx->tbid->uid = tbid->uid; - - code = tsdbUpdateSkmTb(committer->tsdb, committer->ctx->tbid, committer->skmTb); - TSDB_CHECK_CODE(code, lino, _exit); - - committer->blockDataIdx = 0; - for (int32_t i = 0; i < ARRAY_SIZE(committer->blockData); i++) { - code = tBlockDataInit(&committer->blockData[i], committer->ctx->tbid, committer->skmTb->pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbCommitTSDataToDataTableEnd(SCommitter2 *committer) { - if (committer->ctx->tbid->uid == 0) return 0; - - int32_t code = 0; - int32_t lino = 0; - - int32_t cidx = committer->blockDataIdx; - int32_t pidx = ((cidx + 1) & 1); - int32_t numRow = (committer->blockData[cidx].nRow + committer->blockData[pidx].nRow) / 2; - - if (committer->blockData[pidx].nRow > 0 && numRow >= committer->minRow) { - ASSERT(committer->blockData[pidx].nRow == committer->maxRow); - - SRowInfo row[1] = {{ - .suid = committer->ctx->tbid->suid, - .uid = committer->ctx->tbid->uid, - .row = tsdbRowFromBlockData(committer->blockData + pidx, 0), - }}; - - for (int32_t i = 0; i < numRow; i++) { - row->row.iRow = i; - - code = tsdbDataFileWriteRow(committer->dataWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbDataFileFlush(committer->dataWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t i = numRow; i < committer->blockData[pidx].nRow; i++) { - row->row.iRow = i; - code = tsdbDataFileWriteRow(committer->dataWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - } - - row->row = tsdbRowFromBlockData(committer->blockData + cidx, 0); - for (int32_t i = 0; i < committer->blockData[cidx].nRow; i++) { - row->row.iRow = i; - code = tsdbDataFileWriteRow(committer->dataWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - if (committer->blockData[pidx].nRow > 0) { - code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + cidx); - TSDB_CHECK_CODE(code, lino, _exit); - } - if (committer->blockData[cidx].nRow < committer->minRow) { - code = tsdbSttFileWriteBlockData(committer->sttWriter, committer->blockData + cidx); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + cidx); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - for (int32_t i = 0; i < ARRAY_SIZE(committer->blockData); i++) { - tBlockDataReset(&committer->blockData[i]); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbCommitTSDataToData(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - - SMetaInfo info; - for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) { - if (row->uid != committer->ctx->tbid->uid) { - // end last table write - code = tsdbCommitTSDataToDataTableEnd(committer); - TSDB_CHECK_CODE(code, lino, _exit); - - // Ignore table of obsolescence - if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { - code = tsdbIterMergerSkipTableData(committer->dataIterMerger, (TABLEID *)row); - TSDB_CHECK_CODE(code, lino, _exit); - continue; - } - - code = tsdbCommitTSDataToDataTableBegin(committer, (TABLEID *)row); - TSDB_CHECK_CODE(code, lino, _exit); - } - - if (row->row.type == TSDBROW_ROW_FMT) { - code = tsdbUpdateSkmRow(committer->tsdb, committer->ctx->tbid, TSDBROW_SVERSION(&row->row), committer->skmRow); - TSDB_CHECK_CODE(code, lino, _exit); - } - - TSDBKEY key = TSDBROW_KEY(&row->row); - if (key.version <= committer->compactVersion // - && committer->blockData[committer->blockDataIdx].nRow > 0 // - && key.ts == committer->blockData[committer->blockDataIdx] - .aTSKEY[committer->blockData[committer->blockDataIdx].nRow - 1]) { - code = - tBlockDataUpdateRow(committer->blockData + committer->blockDataIdx, &row->row, committer->skmRow->pTSchema); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - if (committer->blockData[committer->blockDataIdx].nRow >= committer->maxRow) { - int32_t idx = ((committer->blockDataIdx + 1) & 1); - if (committer->blockData[idx].nRow >= committer->maxRow) { - code = tsdbDataFileWriteBlockData(committer->dataWriter, committer->blockData + idx); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataClear(committer->blockData + idx); - } - committer->blockDataIdx = idx; - } - - code = tBlockDataAppendRow(&committer->blockData[committer->blockDataIdx], &row->row, committer->skmRow->pTSchema, - row->uid); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbIterMergerNext(committer->dataIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbCommitTSDataToDataTableEnd(committer); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbCommitTSDataToStt(SCommitter2 *committer) { - int32_t code = 0; - int32_t lino = 0; - - ASSERT(committer->sttReader == NULL); - - SMetaInfo info; - for (SRowInfo *row; (row = tsdbIterMergerGetData(committer->dataIterMerger)) != NULL;) { - if (row->uid != committer->ctx->tbid->uid) { - committer->ctx->tbid->suid = row->suid; - committer->ctx->tbid->uid = row->uid; - - // Ignore table of obsolescence - if (metaGetInfo(committer->tsdb->pVnode->pMeta, row->uid, &info, NULL) != 0) { - code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); - TSDB_CHECK_CODE(code, lino, _exit); - continue; - } - } - - TSKEY ts = TSDBROW_TS(&row->row); - if (ts > committer->ctx->maxKey) { - committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts); - code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbSttFileWriteRow(committer->sttWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(committer->dataIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(committer->tsdb->pVnode), lino, code); - } - return code; -} -#endif - static int32_t tsdbCommitTSData(SCommitter2 *committer) { int32_t code = 0; int32_t lino = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbMerge.c b/source/dnode/vnode/src/tsdb/tsdbMerge.c index 97229714e8..41efe345b5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMerge.c +++ b/source/dnode/vnode/src/tsdb/tsdbMerge.c @@ -97,197 +97,6 @@ _exit: return code; } -#if 0 -static int32_t tsdbMergeToDataTableEnd(SMerger *merger) { - if (merger->ctx->blockData[0].nRow + merger->ctx->blockData[1].nRow == 0) return 0; - - int32_t code = 0; - int32_t lino = 0; - int32_t cidx = merger->ctx->blockDataIdx; - int32_t pidx = (cidx + 1) % 2; - int32_t numRow = (merger->ctx->blockData[pidx].nRow + merger->ctx->blockData[cidx].nRow) / 2; - - if (merger->ctx->blockData[pidx].nRow > 0 && numRow >= merger->minRow) { - ASSERT(merger->ctx->blockData[pidx].nRow == merger->maxRow); - - SRowInfo row[1] = {{ - .suid = merger->ctx->tbid->suid, - .uid = merger->ctx->tbid->uid, - .row = tsdbRowFromBlockData(merger->ctx->blockData + pidx, 0), - }}; - - for (int32_t i = 0; i < numRow; i++) { - row->row.iRow = i; - - code = tsdbDataFileWriteRow(merger->dataWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbDataFileFlush(merger->dataWriter); - TSDB_CHECK_CODE(code, lino, _exit); - - for (int32_t i = numRow; i < merger->ctx->blockData[pidx].nRow; i++) { - row->row.iRow = i; - code = tsdbDataFileWriteRow(merger->dataWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - } - - row->row = tsdbRowFromBlockData(merger->ctx->blockData + cidx, 0); - for (int32_t i = 0; i < merger->ctx->blockData[cidx].nRow; i++) { - row->row.iRow = i; - code = tsdbDataFileWriteRow(merger->dataWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - if (merger->ctx->blockData[pidx].nRow > 0) { - code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + cidx); - TSDB_CHECK_CODE(code, lino, _exit); - } - if (merger->ctx->blockData[cidx].nRow < merger->minRow) { - code = tsdbSttFileWriteBlockData(merger->sttWriter, merger->ctx->blockData + cidx); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + cidx); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - - for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) { - tBlockDataReset(merger->ctx->blockData + i); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbMergeToDataTableBegin(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - - code = tsdbUpdateSkmTb(merger->tsdb, merger->ctx->tbid, merger->skmTb); - TSDB_CHECK_CODE(code, lino, _exit); - - merger->ctx->blockDataIdx = 0; - for (int32_t i = 0; i < ARRAY_SIZE(merger->ctx->blockData); i++) { - code = tBlockDataInit(merger->ctx->blockData + i, merger->ctx->tbid, merger->skmTb->pTSchema, NULL, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbMergeToDataLevel(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - - // data - for (SRowInfo *row; (row = tsdbIterMergerGetData(merger->dataIterMerger)) != NULL;) { - if (row->uid != merger->ctx->tbid->uid) { - code = tsdbMergeToDataTableEnd(merger); - TSDB_CHECK_CODE(code, lino, _exit); - - merger->ctx->tbid->suid = row->suid; - merger->ctx->tbid->uid = row->uid; - - code = tsdbMergeToDataTableBegin(merger); - TSDB_CHECK_CODE(code, lino, _exit); - } - - TSDBKEY key[1] = {TSDBROW_KEY(&row->row)}; - - if (key->version <= merger->compactVersion // - && merger->ctx->blockData[merger->ctx->blockDataIdx].nRow > 0 // - && merger->ctx->blockData[merger->ctx->blockDataIdx] - .aTSKEY[merger->ctx->blockData[merger->ctx->blockDataIdx].nRow - 1] == key->ts) { - // update - code = tBlockDataUpdateRow(merger->ctx->blockData + merger->ctx->blockDataIdx, &row->row, NULL); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - if (merger->ctx->blockData[merger->ctx->blockDataIdx].nRow >= merger->maxRow) { - int32_t idx = (merger->ctx->blockDataIdx + 1) % 2; - - code = tsdbDataFileWriteBlockData(merger->dataWriter, merger->ctx->blockData + idx); - TSDB_CHECK_CODE(code, lino, _exit); - - tBlockDataClear(merger->ctx->blockData + idx); - - // switch to next bData - merger->ctx->blockDataIdx = idx; - } - - code = tBlockDataAppendRow(merger->ctx->blockData + merger->ctx->blockDataIdx, &row->row, NULL, row->uid); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbIterMergerNext(merger->dataIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbMergeToDataTableEnd(merger); - TSDB_CHECK_CODE(code, lino, _exit); - - // tomb - STombRecord *record; - while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) { - if (tsdbSttFileWriterIsOpened(merger->sttWriter)) { - code = tsdbSttFileWriteTombRecord(merger->sttWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); - } else { - code = tsdbDataFileWriteTombRecord(merger->dataWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); - } - - code = tsdbIterMergerNext(merger->tombIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - TSDB_ERROR_LOG(TD_VID(merger->tsdb->pVnode), lino, code); - } - return code; -} - -static int32_t tsdbMergeToUpperLevel(SMerger *merger) { - int32_t code = 0; - int32_t lino = 0; - int32_t vid = TD_VID(merger->tsdb->pVnode); - - // data - SRowInfo *row; - while ((row = tsdbIterMergerGetData(merger->dataIterMerger))) { - code = tsdbSttFileWriteRow(merger->sttWriter, row); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(merger->dataIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - - // tomb - STombRecord *record; - while ((record = tsdbIterMergerGetTombRecord(merger->tombIterMerger))) { - code = tsdbSttFileWriteTombRecord(merger->sttWriter, record); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbIterMergerNext(merger->tombIterMerger); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - if (code) { - tsdbError("vid:%d %s failed at line %d since %s", vid, __func__, lino, tstrerror(code)); - } - return code; -} -#endif - static int32_t tsdbMergeFileSetBeginOpenReader(SMerger *merger) { int32_t code = 0; int32_t lino = 0;