From f318fdfd6aeaf3bf06f5ed90561548cd6af4e73b Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sat, 8 Jan 2022 06:55:38 +0000 Subject: [PATCH] more --- source/dnode/vnode/tsdb/src/tsdbCommit.c | 356 +++++++++++------------ 1 file changed, 178 insertions(+), 178 deletions(-) diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c index 81128ea9f3..b189312c37 100644 --- a/source/dnode/vnode/tsdb/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c @@ -41,6 +41,8 @@ typedef struct { SDataCols * pDataCols; } SCommitH; +#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) + #define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) #define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) #define TSDB_COMMIT_WRITE_FSET(ch) (&((ch)->wSet)) @@ -83,16 +85,16 @@ static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); static int tsdbComparKeyBlock(const void *arg1, const void *arg2); // static int tsdbWriteBlockInfo(SCommitH *pCommih); // static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); -// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); static int tsdbMoveBlock(SCommitH *pCommith, int bidx); static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock); static void tsdbResetCommitTable(SCommitH *pCommith); static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); -// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); -// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, -// TSKEY maxKey, int maxRows, int8_t update); +static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows, int8_t update); int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; @@ -1063,19 +1065,18 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { pBlock = NULL; } } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - // // merge pBlock data and memory data - // if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { - // TSDB_RUNLOCK_TABLE(pIter->pTable); - // return -1; - // } + // merge pBlock data and memory data + if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { + return -1; + } - // bidx++; - // if (bidx < nBlocks) { - // pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - // } else { - // pBlock = NULL; - // } - // nextKey = tsdbNextIterKey(pIter->pIter); + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + nextKey = tsdbNextIterKey(pIter->pIter); } else { // // Only commit memory data // if (pBlock == NULL) { @@ -1329,77 +1330,76 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo // return 0; // } -// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { -// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); -// STsdbCfg * pCfg = REPO_CFG(pRepo); -// int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; -// SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; -// TSKEY keyLimit; -// int16_t colId = 0; -// SMergeInfo mInfo; -// SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; -// SBlock block, supBlock; -// SDFile * pDFile; +static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; + SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + TSKEY keyLimit; + int16_t colId = 0; + SMergeInfo mInfo; + SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; + SBlock block, supBlock; + SDFile * pDFile; -// if (bidx == nBlocks - 1) { -// keyLimit = pCommith->maxKey; -// } else { -// keyLimit = pBlock[1].keyFirst - 1; -// } + if (bidx == nBlocks - 1) { + keyLimit = pCommith->maxKey; + } else { + keyLimit = pBlock[1].keyFirst - 1; + } -// SSkipListIterator titer = *(pIter->pIter); -// if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; + SSkipListIterator titer = *(pIter->pIter); + if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; -// tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, -// pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); + tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, + pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); -// if (mInfo.nOperations == 0) { -// // no new data to insert (all updates denied) -// if (tsdbMoveBlock(pCommith, bidx) < 0) { -// return -1; -// } -// *(pIter->pIter) = titer; -// } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { -// // Ignore the block -// ASSERT(0); -// *(pIter->pIter) = titer; -// } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { -// // Add a sub-block -// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, -// pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, -// pCfg->update, &mInfo); -// if (pBlock->last) { -// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); -// } else { -// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); -// } + if (mInfo.nOperations == 0) { + // no new data to insert (all updates denied) + if (tsdbMoveBlock(pCommith, bidx) < 0) { + return -1; + } + *(pIter->pIter) = titer; + } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { + // Ignore the block + ASSERT(0); + *(pIter->pIter) = titer; + } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { + // Add a sub-block + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, + pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, + &mInfo); + if (pBlock->last) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + } -// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; -// if (pBlock->numOfSubBlocks == 1) { -// subBlocks[0] = *pBlock; -// subBlocks[0].numOfSubBlocks = 0; -// } else { -// memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), -// sizeof(SBlock) * pBlock->numOfSubBlocks); -// } -// subBlocks[pBlock->numOfSubBlocks] = block; -// supBlock = *pBlock; -// supBlock.keyFirst = mInfo.keyFirst; -// supBlock.keyLast = mInfo.keyLast; -// supBlock.numOfSubBlocks++; -// supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; -// supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); + if (pBlock->numOfSubBlocks == 1) { + subBlocks[0] = *pBlock; + subBlocks[0].numOfSubBlocks = 0; + } else { + memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), + sizeof(SBlock) * pBlock->numOfSubBlocks); + } + subBlocks[pBlock->numOfSubBlocks] = block; + supBlock = *pBlock; + supBlock.keyFirst = mInfo.keyFirst; + supBlock.keyLast = mInfo.keyLast; + supBlock.numOfSubBlocks++; + supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; + supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); -// if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; -// } else { -// if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; -// if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -// -1; -// } + if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; + } else { + if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; + if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; + } -// return 0; -// } + return 0; +} static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; @@ -1454,107 +1454,107 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const return 0; } -// static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool -// isLastOneBlock) { -// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); -// STsdbCfg * pCfg = REPO_CFG(pRepo); -// SBlock block; -// SDFile * pDFile; -// bool isLast; -// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock) { + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + SBlock block; + SDFile * pDFile; + bool isLast; + int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); -// int biter = 0; -// while (true) { -// tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, -// pCfg->update); + int biter = 0; + while (true) { + tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, + pCfg->update); -// if (pCommith->pDataCols->numOfRows == 0) break; + if (pCommith->pDataCols->numOfRows == 0) break; -// if (isLastOneBlock) { -// if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { -// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); -// isLast = true; -// } else { -// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); -// isLast = false; -// } -// } else { -// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); -// isLast = false; -// } + if (isLastOneBlock) { + if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isLast = true; + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } -// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; -// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; -// } + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; + } -// return 0; -// } + return 0; +} -// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, -// TSKEY maxKey, int maxRows, int8_t update) { -// TSKEY key1 = INT64_MAX; -// TSKEY key2 = INT64_MAX; -// STSchema *pSchema = NULL; +static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, + TSKEY maxKey, int maxRows, int8_t update) { + TSKEY key1 = INT64_MAX; + TSKEY key2 = INT64_MAX; + STSchema *pSchema = NULL; -// ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); -// tdResetDataCols(pTarget); + ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); + tdResetDataCols(pTarget); -// while (true) { -// key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); -// SMemRow row = tsdbNextIterRow(pCommitIter->pIter); -// if (row == NULL || memRowKey(row) > maxKey) { -// key2 = INT64_MAX; -// } else { -// key2 = memRowKey(row); -// } + while (true) { + key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); + SMemRow row = tsdbNextIterRow(pCommitIter->pIter); + if (row == NULL || memRowKey(row) > maxKey) { + key2 = INT64_MAX; + } else { + key2 = memRowKey(row); + } -// if (key1 == INT64_MAX && key2 == INT64_MAX) break; + if (key1 == INT64_MAX && key2 == INT64_MAX) break; -// if (key1 < key2) { -// for (int i = 0; i < pDataCols->numOfCols; i++) { -// //TODO: dataColAppendVal may fail -// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, -// pTarget->maxPoints); -// } + if (key1 < key2) { + for (int i = 0; i < pDataCols->numOfCols; i++) { + // TODO: dataColAppendVal may fail + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } -// pTarget->numOfRows++; -// (*iter)++; -// } else if (key1 > key2) { -// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { -// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); -// ASSERT(pSchema != NULL); -// } + pTarget->numOfRows++; + (*iter)++; + } else if (key1 > key2) { + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); + } -// tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + tdAppendMemRowToDataCol(row, pSchema, pTarget, true); -// tSkipListIterNext(pCommitIter->pIter); -// } else { -// if (update != TD_ROW_OVERWRITE_UPDATE) { -// //copy disk data -// for (int i = 0; i < pDataCols->numOfCols; i++) { -// //TODO: dataColAppendVal may fail -// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, -// pTarget->maxPoints); -// } + tSkipListIterNext(pCommitIter->pIter); + } else { + if (update != TD_ROW_OVERWRITE_UPDATE) { + // copy disk data + for (int i = 0; i < pDataCols->numOfCols; i++) { + // TODO: dataColAppendVal may fail + dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, + pTarget->maxPoints); + } -// if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; -// } -// if (update != TD_ROW_DISCARD_UPDATE) { -// //copy mem data -// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { -// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); -// ASSERT(pSchema != NULL); -// } + if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; + } + if (update != TD_ROW_DISCARD_UPDATE) { + // copy mem data + if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { + pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); + ASSERT(pSchema != NULL); + } -// tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); -// } -// (*iter)++; -// tSkipListIterNext(pCommitIter->pIter); -// } + tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); + } + (*iter)++; + tSkipListIterNext(pCommitIter->pIter); + } -// if (pTarget->numOfRows >= maxRows) break; -// } -// } + if (pTarget->numOfRows >= maxRows) break; + } +} static void tsdbResetCommitTable(SCommitH *pCommith) { taosArrayClear(pCommith->aSubBlk); @@ -1573,23 +1573,23 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { -// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); -// STsdbCfg * pCfg = REPO_CFG(pRepo); -// int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; +static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); + int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; -// ASSERT(mergeRows > 0); + ASSERT(mergeRows > 0); -// if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { -// if (pBlock->last) { -// if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; -// } else { -// if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; -// } -// } + if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { + if (pBlock->last) { + if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; + } else { + if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; + } + } -// return false; -// } + return false; +} // int tsdbApplyRtn(STsdbRepo *pRepo) { // SRtn rtn;