From 0359254bdfba6cd8aab7c1c4cc10d11d6b177997 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 20 Jun 2022 08:06:34 +0000 Subject: [PATCH] more work --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbCommit.c | 37 +++-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 149 +++++++++++++++--- source/dnode/vnode/src/tsdb/tsdbUtil.c | 30 +++- 4 files changed, 176 insertions(+), 41 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7953c7e272..34ded9f263 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -101,6 +101,7 @@ int32_t tPutBlock(uint8_t *p, void *ph); int32_t tGetBlock(uint8_t *p, void *ph); int32_t tBlockCmprFn(const void *p1, const void *p2); // SBlockIdx +#define tBlockIdxInit(SUID, UID) ((SBlockIdx){.suid = (SUID), .uid = (UID), .info = tKEYINFOInit()}) int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); // SColdata diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index b04fdaa251..38ee34b5b4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -463,7 +463,8 @@ _err: return code; } -static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock) { +static int32_t tsdbMergeCommit(SCommitter *pCommitter, SBlockIdx *pBlockIdx, STbDataIter *pIter, SBlock *pBlock, + int8_t isLastBlock) { int32_t code = 0; TSDBROW *pRow; SBlock block = tBlockInit(); @@ -523,12 +524,18 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl STbDataIter iter; STbDataIter *pIter = &iter; TSDBROW *pRow; - SBlockIdx blockIdx; // TODO + int64_t suid; + int64_t uid; + SBlockIdx blockIdx; // create iter if (pTbData) { + suid = pTbData->suid; + uid = pTbData->uid; tsdbTbDataIterOpen(pTbData, &(TSDBKEY){.ts = pCommitter->minKey, .version = 0}, 0, pIter); } else { + suid = pBlockIdx->suid; + uid = pBlockIdx->uid; pIter = NULL; } @@ -538,23 +545,27 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // start ================================ tMapDataReset(&pCommitter->oBlockMap); - tMapDataReset(&pCommitter->nBlockMap); + tBlockReset(&pCommitter->oBlock); + tBlockDataReset(&pCommitter->oBlockData); if (pBlockIdx) { code = tsdbReadBlock(pCommitter->pReader, pBlockIdx, &pCommitter->oBlockMap, NULL); if (code) goto _err; } + blockIdx = tBlockIdxInit(suid, uid); + tMapDataReset(&pCommitter->nBlockMap); + tBlockReset(&pCommitter->nBlock); + tBlockDataReset(&pCommitter->nBlockData); + // impl =============================== - SBlock block; - SBlock *pBlock = █ int32_t iBlock = 0; - int32_t nBlockMap = pCommitter->oBlockMap.nItem; + int32_t nBlock = pCommitter->oBlockMap.nItem; // merge pRow = tsdbTbDataIterGet(pIter); - while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlockMap) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); - code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, pBlock); + while (!ROW_END(pRow, pCommitter->maxKey) && iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); + code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, &pCommitter->oBlock, iBlock == (nBlock - 1)); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); @@ -564,17 +575,17 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl // mem pRow = tsdbTbDataIterGet(pIter); while (!ROW_END(pRow, pCommitter->maxKey)) { - code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL); + code = tsdbMergeCommit(pCommitter, &blockIdx, pIter, NULL, 0); if (code) goto _err; pRow = tsdbTbDataIterGet(pIter); } // disk - while (iBlock < nBlockMap) { - tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, pBlock, tGetBlock); + while (iBlock < nBlock) { + tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock, &pCommitter->oBlock, tGetBlock); - code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, pBlock); + code = tsdbMergeCommit(pCommitter, &blockIdx, NULL, &pCommitter->oBlock, 0); if (code) goto _err; iBlock++; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 49dcab45c4..75b0edd684 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -489,17 +489,17 @@ _err: return code; } -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlockIdx, uint8_t **ppBuf) { +int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock, uint8_t **ppBuf) { int32_t code = 0; int64_t offset = pBlockIdx->offset; int64_t size = pBlockIdx->size; int64_t n; uint32_t delimiter; - tb_uid_t suid; - tb_uid_t uid; + int64_t suid; + int64_t uid; // alloc - if (!ppBuf) ppBuf = &mBlockIdx->pBuf; + if (!ppBuf) ppBuf = &mBlock->pBuf; code = tsdbRealloc(ppBuf, size); if (code) goto _err; @@ -533,7 +533,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl ASSERT(suid == pBlockIdx->suid); n += tGetI64(*ppBuf + n, &uid); ASSERT(uid == pBlockIdx->uid); - n += tGetMapData(*ppBuf + n, mBlockIdx); + n += tGetMapData(*ppBuf + n, mBlock); ASSERT(n + sizeof(TSCKSUM) == size); return code; @@ -625,20 +625,112 @@ _err: } int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter, uint8_t **ppBuf) { - int32_t code = 0; - // TODO + int32_t code = 0; + int64_t size = TSDB_FHDR_SIZE; + int64_t n; + uint8_t *pBuf = NULL; + SHeadFile *pHeadFile = pWriter->pSet->pHeadFile; + SDataFile *pDataFile = pWriter->pSet->pDataFile; + SLastFile *pLastFile = pWriter->pSet->pLastFile; + SSmaFile *pSmaFile = pWriter->pSet->pSmaFile; + + // alloc + if (!ppBuf) ppBuf = &pBuf; + code = tsdbRealloc(ppBuf, size); + if (code) goto _err; + + // head ============== + // build + memset(*ppBuf, 0, size); + // tPutHeadFileHdr(*ppBuf, pHeadFile); + taosCalcChecksumAppend(0, *ppBuf, size); + + // seek + if (taosLSeekFile(pWriter->pHeadFD, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // write + n = taosWriteFile(pWriter->pHeadFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // data ============== + memset(*ppBuf, 0, size); + // tPutDataFileHdr(*ppBuf, pDataFile); + taosCalcChecksumAppend(0, *ppBuf, size); + + // seek + if (taosLSeekFile(pWriter->pDataFD, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // write + n = taosWriteFile(pWriter->pDataFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // last ============== + memset(*ppBuf, 0, size); + // tPutLastFileHdr(*ppBuf, pLastFile); + taosCalcChecksumAppend(0, *ppBuf, size); + + // seek + if (taosLSeekFile(pWriter->pLastFD, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // write + n = taosWriteFile(pWriter->pLastFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // sma ============== + memset(*ppBuf, 0, size); + // tPutSmaFileHdr(*ppBuf, pSmaFile); + taosCalcChecksumAppend(0, *ppBuf, size); + + // seek + if (taosLSeekFile(pWriter->pSmaFD, 0, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + // write + n = taosWriteFile(pWriter->pSmaFD, *ppBuf, size); + if (n < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto _err; + } + + tsdbFree(pBuf); + return code; + +_err: + tsdbFree(pBuf); + tsdbError("vgId:%d update DFileSet header failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pBlockIdxMap, uint8_t **ppBuf) { - int32_t code = 0; - int64_t size = 0; - int64_t n = 0; - uint8_t *pBuf = NULL; +int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *mBlockIdx, uint8_t **ppBuf) { + int32_t code = 0; + int64_t size = 0; + SHeadFile *pHeadFile = pWriter->pSet->pHeadFile; + int64_t n = 0; + uint8_t *pBuf = NULL; // prepare size += tPutU32(NULL, TSDB_FILE_DLMT); - size = size + tPutMapData(NULL, pBlockIdxMap) + sizeof(TSCKSUM); + size = size + tPutMapData(NULL, mBlockIdx) + sizeof(TSCKSUM); // alloc if (!ppBuf) ppBuf = &pBuf; @@ -647,7 +739,7 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pBlockIdxMap, uint8_t // build n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); - n += tPutMapData(*ppBuf, pBlockIdxMap); + n += tPutMapData(*ppBuf, mBlockIdx); taosCalcChecksumAppend(0, *ppBuf, size); ASSERT(n + sizeof(TSCKSUM) == size); @@ -659,7 +751,9 @@ int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SMapData *pBlockIdxMap, uint8_t goto _err; } - // update (todo) + // update + pHeadFile->offset = pHeadFile->size; + pHeadFile->size += size; tsdbFree(pBuf); return code; @@ -670,20 +764,21 @@ _err: return code; } -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pBlockMap, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { - int32_t code = 0; - uint8_t *pBuf = NULL; - int64_t size; - int64_t n; +int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, uint8_t **ppBuf, SBlockIdx *pBlockIdx) { + int32_t code = 0; + SHeadFile *pHeadFile = pWriter->pSet->pHeadFile; + uint8_t *pBuf = NULL; + int64_t size; + int64_t n; - ASSERT(pBlockMap->nItem > 0); + ASSERT(mBlock->nItem > 0); // prepare size = 0; size += tPutU32(NULL, TSDB_FILE_DLMT); size += tPutI64(NULL, pBlockIdx->suid); size += tPutI64(NULL, pBlockIdx->uid); - size = size + tPutMapData(NULL, pBlockMap) + sizeof(TSCKSUM); + size = size + tPutMapData(NULL, mBlock) + sizeof(TSCKSUM); // alloc if (!ppBuf) ppBuf = &pBuf; @@ -695,7 +790,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pBlockMap, uint8_t **ppB n += tPutU32(*ppBuf + n, TSDB_FILE_DLMT); n += tPutI64(*ppBuf + n, pBlockIdx->suid); n += tPutI64(*ppBuf + n, pBlockIdx->uid); - n += tPutMapData(*ppBuf + n, pBlockMap); + n += tPutMapData(*ppBuf + n, mBlock); taosCalcChecksumAppend(0, *ppBuf, size); ASSERT(n + sizeof(TSCKSUM) == size); @@ -707,16 +802,18 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pBlockMap, uint8_t **ppB goto _err; } - // update (todo) - // pBlockIdx->offset = -1; + // update + pBlockIdx->offset = pHeadFile->size; pBlockIdx->size = size; - // pWriter->pSet->pHeadF.offset + pHeadFile->size += size; + tsdbFree(pBuf); tsdbTrace("vgId:%d write block, offset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), pBlockIdx->offset, pBlockIdx->size); return code; _err: + tsdbFree(pBuf); tsdbError("vgId:%d write block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 4529a709e1..af5732e2c8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -298,14 +298,40 @@ void tBlockClear(SBlock *pBlock) { int32_t tPutBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph; - // TODO + + n += tPutKEYINFO(p ? p + n : p, &pBlock->info); + n += tPutI32v(p ? p + n : p, pBlock->nRow); + n += tPutI8(p ? p + n : p, pBlock->last); + n += tPutI8(p ? p + n : p, pBlock->hasDup); + n += tPutI8(p ? p + n : p, pBlock->cmprAlg); + n += tPutI8(p ? p + n : p, pBlock->nSubBlock); + for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].offset); + n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].ksize); + n += tPutI64v(p ? p + n : p, pBlock->aSubBlock[iSubBlock].bsize); + n += tPutMapData(p ? p + n : p, &pBlock->aSubBlock[iSubBlock].mBlockCol); + } + return n; } int32_t tGetBlock(uint8_t *p, void *ph) { int32_t n = 0; SBlock *pBlock = (SBlock *)ph; - // TODO + + n += tGetKEYINFO(p + n, &pBlock->info); + n += tGetI32v(p + n, &pBlock->nRow); + n += tGetI8(p + n, &pBlock->last); + n += tGetI8(p + n, &pBlock->hasDup); + n += tGetI8(p + n, &pBlock->cmprAlg); + n += tGetI8(p + n, &pBlock->nSubBlock); + for (int8_t iSubBlock = 0; iSubBlock < pBlock->nSubBlock; iSubBlock++) { + n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].offset); + n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].ksize); + n += tGetI64v(p + n, &pBlock->aSubBlock[iSubBlock].bsize); + n += tGetMapData(p + n, &pBlock->aSubBlock[iSubBlock].mBlockCol); + } + return n; }