diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c index b189312c37..dd7f349dac 100644 --- a/source/dnode/vnode/tsdb/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c @@ -70,9 +70,6 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fi // static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact); // static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); // static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); -// static void tsdbStartCommit(STsdbRepo *pRepo); -// static void tsdbEndCommit(STsdbRepo *pRepo, int eno); -// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); // static int tsdbCreateCommitIters(SCommitH *pCommith); // static void tsdbDestroyCommitIters(SCommitH *pCommith); // static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); @@ -80,11 +77,11 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fi // static void tsdbDestroyCommitH(SCommitH *pCommith); // static int tsdbGetFidLevel(int fid, SRtn *pRtn); // static int tsdbNextCommitFid(SCommitH *pCommith); -static int tsdbCommitToTable(SCommitH *pCommith, int tid); -static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); -static int tsdbComparKeyBlock(const void *arg1, const void *arg2); -// static int tsdbWriteBlockInfo(SCommitH *pCommih); -// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); +static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); +static int tsdbComparKeyBlock(const void *arg1, const void *arg2); +static int tsdbWriteBlockInfo(SCommitH *pCommih); +static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); static int tsdbMoveBlock(SCommitH *pCommith, int bidx); static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); @@ -95,6 +92,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; @@ -372,7 +370,6 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { return -1; } } -#if 0 if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < 0) { @@ -398,7 +395,6 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { return -1; } -#endif return 0; } @@ -582,107 +578,107 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid // extern int32_t tsTsdbMetaCompactRatio; -// int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, -// SBlockIdx *pIdx) { -// size_t nSupBlocks; -// size_t nSubBlocks; -// uint32_t tlen; -// SBlockInfo *pBlkInfo; -// int64_t offset; -// SBlock * pBlock; +int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, + SBlockIdx *pIdx) { + size_t nSupBlocks; + size_t nSubBlocks; + uint32_t tlen; + SBlockInfo *pBlkInfo; + int64_t offset; + SBlock * pBlock; -// memset(pIdx, 0, sizeof(*pIdx)); + memset(pIdx, 0, sizeof(*pIdx)); -// nSupBlocks = taosArrayGetSize(pSupA); -// nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); + nSupBlocks = taosArrayGetSize(pSupA); + nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); -// if (nSupBlocks <= 0) { -// // No data (data all deleted) -// return 0; -// } + if (nSupBlocks <= 0) { + // No data (data all deleted) + return 0; + } -// tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); -// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; -// pBlkInfo = *ppBuf; + tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); + if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; + pBlkInfo = *ppBuf; -// pBlkInfo->delimiter = TSDB_FILE_DELIMITER; -// pBlkInfo->tid = TABLE_TID(pTable); -// pBlkInfo->uid = TABLE_UID(pTable); + pBlkInfo->delimiter = TSDB_FILE_DELIMITER; + pBlkInfo->tid = TABLE_TID(pTable); + pBlkInfo->uid = TABLE_UID(pTable); -// memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); -// if (nSubBlocks > 0) { -// memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); + memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); + if (nSubBlocks > 0) { + memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); -// for (int i = 0; i < nSupBlocks; i++) { -// pBlock = pBlkInfo->blocks + i; + for (int i = 0; i < nSupBlocks; i++) { + pBlock = pBlkInfo->blocks + i; -// if (pBlock->numOfSubBlocks > 1) { -// pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); -// } -// } -// } + if (pBlock->numOfSubBlocks > 1) { + pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); + } + } + } -// taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); + taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); -// if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { -// return -1; -// } + if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { + return -1; + } -// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); -// // Set pIdx -// pBlock = taosArrayGetLast(pSupA); + // Set pIdx + pBlock = taosArrayGetLast(pSupA); -// pIdx->tid = TABLE_TID(pTable); -// pIdx->uid = TABLE_UID(pTable); -// pIdx->hasLast = pBlock->last ? 1 : 0; -// pIdx->maxKey = pBlock->keyLast; -// pIdx->numOfBlocks = (uint32_t)nSupBlocks; -// pIdx->len = tlen; -// pIdx->offset = (uint32_t)offset; + pIdx->tid = TABLE_TID(pTable); + pIdx->uid = TABLE_UID(pTable); + pIdx->hasLast = pBlock->last ? 1 : 0; + pIdx->maxKey = pBlock->keyLast; + pIdx->numOfBlocks = (uint32_t)nSupBlocks; + pIdx->len = tlen; + pIdx->offset = (uint32_t)offset; -// return 0; -// } + return 0; +} -// int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { -// SBlockIdx *pBlkIdx; -// size_t nidx = taosArrayGetSize(pIdxA); -// int tlen = 0, size; -// int64_t offset; +int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { + SBlockIdx *pBlkIdx; + size_t nidx = taosArrayGetSize(pIdxA); + int tlen = 0, size; + int64_t offset; -// if (nidx <= 0) { -// // All data are deleted -// pHeadf->info.offset = 0; -// pHeadf->info.len = 0; -// return 0; -// } + if (nidx <= 0) { + // All data are deleted + pHeadf->info.offset = 0; + pHeadf->info.len = 0; + return 0; + } -// for (size_t i = 0; i < nidx; i++) { -// pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); + for (size_t i = 0; i < nidx; i++) { + pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); -// size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); -// if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; + size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); + if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; -// void *ptr = POINTER_SHIFT(*ppBuf, tlen); -// tsdbEncodeSBlockIdx(&ptr, pBlkIdx); + void *ptr = POINTER_SHIFT(*ppBuf, tlen); + tsdbEncodeSBlockIdx(&ptr, pBlkIdx); -// tlen += size; -// } + tlen += size; + } -// tlen += sizeof(TSCKSUM); -// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; -// taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); + tlen += sizeof(TSCKSUM); + if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; + taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); -// if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { -// return -1; -// } + if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { + return -1; + } -// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); -// pHeadf->info.offset = (uint32_t)offset; -// pHeadf->info.len = tlen; + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); + pHeadf->info.offset = (uint32_t)offset; + pHeadf->info.len = tlen; -// return 0; -// } + return 0; +} // // =================== Commit Meta Data // static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { @@ -1078,27 +1074,25 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) { } nextKey = tsdbNextIterKey(pIter->pIter); } else { - // // Only commit memory data - // if (pBlock == NULL) { - // if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { - // TSDB_RUNLOCK_TABLE(pIter->pTable); - // return -1; - // } - // } else { - // if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { - // TSDB_RUNLOCK_TABLE(pIter->pTable); - // return -1; - // } - // } - // nextKey = tsdbNextIterKey(pIter->pIter); + // Only commit memory data + if (pBlock == NULL) { + if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { + return -1; + } + } else { + if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { + return -1; + } + } + nextKey = tsdbNextIterKey(pIter->pIter); } } - // if (tsdbWriteBlockInfo(pCommith) < 0) { - // tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), - // TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); - // return -1; - // } + if (tsdbWriteBlockInfo(pCommith) < 0) { + tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + return -1; + } return 0; } @@ -1274,61 +1268,60 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); } -// static int tsdbWriteBlockInfo(SCommitH *pCommih) { -// SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); -// SBlockIdx blkIdx; -// STable * pTable = TSDB_COMMIT_TABLE(pCommih); +static int tsdbWriteBlockInfo(SCommitH *pCommih) { + SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); + SBlockIdx blkIdx; + STable * pTable = TSDB_COMMIT_TABLE(pCommih); -// if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void -// **)(&(TSDB_COMMIT_BUF(pCommih))), -// &blkIdx) < 0) { -// return -1; -// } + if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), + &blkIdx) < 0) { + return -1; + } -// if (blkIdx.numOfBlocks == 0) { -// return 0; -// } + if (blkIdx.numOfBlocks == 0) { + return 0; + } -// if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { -// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; -// return -1; -// } + if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } -// return 0; -// } + return 0; +} -// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { -// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); -// STsdbCfg * pCfg = REPO_CFG(pRepo); -// SMergeInfo mInfo; -// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); -// SDFile * pDFile; -// bool isLast; -// SBlock block; +static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg * pCfg = REPO_CFG(pRepo); + SMergeInfo mInfo; + int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); + SDFile * pDFile; + bool isLast; + SBlock block; -// while (true) { -// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, -// pCfg->update, &mInfo); + while (true) { + tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, + pCfg->update, &mInfo); -// if (pCommith->pDataCols->numOfRows <= 0) break; + if (pCommith->pDataCols->numOfRows <= 0) break; -// if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { -// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); -// isLast = false; -// } else { -// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); -// isLast = true; -// } + if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isLast = false; + } else { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isLast = true; + } -// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; + if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; -// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { -// return -1; -// } -// } + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { + return -1; + } + } -// return 0; -// } + return 0; +} static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { STsdb * pRepo = TSDB_COMMIT_REPO(pCommith);