diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 489d2a4a10..e82bb580ec 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -26,6 +26,7 @@ #include "tsdb.h" #include "tskiplist.h" #include "tutil.h" +#include "tchecksum.h" #include "tfs.h" #ifdef __cplusplus @@ -355,9 +356,11 @@ int tsdbOpenDFile(SDFile* pDFile, int flags); void tsdbCloseDFile(SDFile* pDFile); int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence); int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte); +int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset); int64_t tsdbTellDFile(SDFile* pDFile); int tsdbEncodeDFile(void** buf, SDFile* pDFile); void* tsdbDecodeDFile(void* buf, SDFile* pDFile); +void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm); typedef struct { int fid; diff --git a/src/tsdb/inc/tsdbReadImpl.h b/src/tsdb/inc/tsdbReadImpl.h index 5b099e27e6..df113f8b93 100644 --- a/src/tsdb/inc/tsdbReadImpl.h +++ b/src/tsdb/inc/tsdbReadImpl.h @@ -93,11 +93,12 @@ struct SReadH { #define TSDB_READ_REPO(rh) ((rh)->pRepo) #define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh)) #define TSDB_READ_FSET(rh) &((rh)->rSet) +#define TSDB_READ_TABLE(ch) ((rh)->pTable) #define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD) #define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA) #define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST) -#define TSDB_READ_BUF(rh) (rh)->pBuf -#define TSDB_READ_COMP_BUF(rh) (rh)->pCBuf +#define TSDB_READ_BUF(rh) ((rh)->pBuf) +#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf) #define TSDB_BLOCK_STATIS_SIZE(ncols) (sizeof(SBlockData) + sizeof(SBlockCol) * (ncols) + sizeof(TSCKSUM)) diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index f6b9bd5a99..114cc2e3fb 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -38,10 +38,14 @@ typedef struct { } SCommitH; #define TSDB_COMMIT_REPO(ch) TSDB_READ_REPO(&(ch->readh)) +#define TSDB_COMMIT_REPO_ID(ch) REPO_ID(TSDB_READ_REPO(&(ch->readh))) #define TSDB_COMMIT_WRITE_FSET(ch) ((ch)->pWSet) +#define TSDB_COMMIT_TABLE(ch) TSDB_READ_TABLE(&(ch->readh)) #define TSDB_COMMIT_HEAD_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_HEAD) #define TSDB_COMMIT_DATA_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_DATA) #define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST) +#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&(ch->readh)) +#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&(ch->readh)) void *tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pRepo) < 0) { @@ -659,7 +663,114 @@ _err: static int tsdbWriteBlockInfo(SCommitH *pCommih) { SDFile *pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); - // TODO + SBlockIdx blkIdx; + STable * pTable = TSDB_COMMIT_TABLE(pCommih); + SBlock * pBlock; + size_t nSupBlocks; + size_t nSubBlocks; + uint32_t tlen; + SBlockInfo *pBlkInfo; + int64_t offset; + + nSupBlocks = taosArrayGetSize(pCommih->aSupBlk); + nSubBlocks = taosArrayGetSize(pCommih->aSubBlk); + tlen = sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM); + + ASSERT(nSupBlocks > 0); + + // Write SBlockInfo part + if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1; + pBlkInfo = TSDB_COMMIT_BUF(pCommih); + + pBlkInfo->delimiter = TSDB_FILE_DELIMITER; + pBlkInfo->tid = TABLE_TID(pTable); + pBlkInfo->uid = TABLE_UID(pTable); + + memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pCommih->aSupBlk, 0), nSupBlocks * sizeof(SBlock)); + if (nSubBlocks > 0) { + memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pCommih->aSubBlk, 0), nSubBlocks * sizeof(SBlock)); + + for (int i = 0; i < nSupBlocks; i++) { + pBlock = pBlkInfo->blocks + i; + + if (pBlock->numOfSubBlocks > 1) { + pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); + } + } + } + + taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); + + offset = tsdbSeekDFile(pHeadf, 0, SEEK_END); + if (offset < 0) { + tsdbError("vgId:%d failed to write block info part to file %s while seek since %s", TSDB_COMMIT_REPO_ID(pCommih), + TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno)); + return -1; + } + + if (tsdbWriteDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen) < tlen) { + tsdbError("vgId:%d failed to write block info part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih), + TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno)); + return -1; + } + + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); + + // Set blkIdx + pBlock = taosArrayGet(pCommih->aSupBlk, nSupBlocks - 1); + + blkIdx.tid = TABLE_TID(pTable); + blkIdx.uid = TABLE_UID(pTable); + blkIdx.hasLast = pBlock->last ? 1 : 0; + blkIdx.maxKey = pBlock->keyLast; + blkIdx.numOfBlocks = nSupBlocks; + blkIdx.len = tlen; + blkIdx.offset = (uint32_t)offset; + + ASSERT(blkIdx.numOfBlocks > 0); + + if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static int tsdbWriteBlockIdx(SCommitH *pCommih) { + SBlockIdx *pBlkIdx; + SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); + size_t nidx = taosArrayGetSize(pCommih->aBlkIdx); + int tlen = 0, size; + int64_t offset; + + ASSERT(nidx > 0); + + for (size_t i = 0; i < nidx; i++) { + pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i); + + size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); + if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen + size) < 0) return -1; + + void *ptr = POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen); + tsdbEncodeSBlockIdx(&ptr, pBlkIdx); + + tlen += size; + } + + tlen += sizeof(TSCKSUM); + if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen) < 0) return -1; + taosCalcChecksumAppend(0, (uint8_t *)TSDB_COMMIT_BUF(pCommih), tlen); + + if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < tlen) { + tsdbError("vgId:%d failed to write block index part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih), + TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno)); + return -1; + } + + tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen - sizeof(TSCKSUM))); + pHeadf->info.offset = offset; + pHeadf->info.len = tlen; return 0; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index c5330852f5..2823a2517e 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -173,6 +173,19 @@ int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) { return -1; } + return nwrite; +} + +int64_t tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) { + ASSERT(TSDB_FILE_OPENED(pDFile)); + int64_t nwrite; + + *offset = tsdbSeekDFile(pDFile, 0, SEEK_SET); + if (*offset < 0) return -1; + + nwrite = tsdbWriteDFile(pDFile, buf, nbyte); + if (nwrite < 0) return nwrite; + pDFile->info.size += nbyte; return nwrite; } @@ -207,6 +220,10 @@ void *tsdbDecodeDFile(void *buf, SDFile *pDFile) { return buf; } +void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) { + pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM)); +} + static int tsdbEncodeDFInfo(void **buf, SDFInfo *pInfo) { int tlen = 0;