partial work
This commit is contained in:
parent
98cc29197a
commit
a46e84dce7
|
@ -31,6 +31,8 @@ typedef struct {
|
||||||
#define TFS_UNDECIDED_ID -1
|
#define TFS_UNDECIDED_ID -1
|
||||||
#define TFS_PRIMARY_LEVEL 0
|
#define TFS_PRIMARY_LEVEL 0
|
||||||
#define TFS_PRIMARY_ID 0
|
#define TFS_PRIMARY_ID 0
|
||||||
|
#define TFS_MIN_LEVEL 0
|
||||||
|
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
|
||||||
|
|
||||||
// FS APIs ====================================
|
// FS APIs ====================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -29,10 +29,17 @@ typedef struct {
|
||||||
int64_t size;
|
int64_t size;
|
||||||
} SKVRecord;
|
} SKVRecord;
|
||||||
|
|
||||||
|
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
|
||||||
|
|
||||||
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
|
void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn);
|
||||||
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
|
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
|
||||||
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
|
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
|
||||||
void *tsdbCommitData(STsdbRepo *pRepo);
|
void *tsdbCommitData(STsdbRepo *pRepo);
|
||||||
|
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
||||||
|
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
|
||||||
|
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
||||||
|
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
|
||||||
|
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
||||||
if (fid >= pRtn->maxFid) {
|
if (fid >= pRtn->maxFid) {
|
||||||
|
|
|
@ -45,7 +45,7 @@ typedef struct {
|
||||||
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
|
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
|
||||||
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
|
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
|
||||||
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
|
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
|
||||||
#define TSDB_COMMIT_DEFAULT_ROWS(ch) (TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5)
|
#define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock)
|
||||||
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
|
||||||
|
|
||||||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||||
|
@ -66,7 +66,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid);
|
||||||
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
|
static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable);
|
||||||
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
|
static int tsdbComparKeyBlock(const void *arg1, const void *arg2);
|
||||||
static int tsdbWriteBlockInfo(SCommitH *pCommih);
|
static int tsdbWriteBlockInfo(SCommitH *pCommih);
|
||||||
static int tsdbWriteBlockIdx(SCommitH *pCommih);
|
|
||||||
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
|
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData);
|
||||||
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
|
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx);
|
||||||
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
|
static int tsdbMoveBlock(SCommitH *pCommith, int bidx);
|
||||||
|
@ -81,7 +80,6 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p
|
||||||
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
||||||
TSKEY maxKey, int maxRows, int8_t update);
|
TSKEY maxKey, int maxRows, int8_t update);
|
||||||
static int tsdbApplyRtn(STsdbRepo *pRepo);
|
static int tsdbApplyRtn(STsdbRepo *pRepo);
|
||||||
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
|
||||||
|
|
||||||
void *tsdbCommitData(STsdbRepo *pRepo) {
|
void *tsdbCommitData(STsdbRepo *pRepo) {
|
||||||
tsdbStartCommit(pRepo);
|
tsdbStartCommit(pRepo);
|
||||||
|
@ -109,6 +107,151 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
||||||
|
SDiskID did;
|
||||||
|
SDFileSet nSet;
|
||||||
|
STsdbFS * pfs = REPO_FS(pRepo);
|
||||||
|
int level;
|
||||||
|
|
||||||
|
ASSERT(pSet->fid >= pRtn->minFid);
|
||||||
|
|
||||||
|
level = tsdbGetFidLevel(pSet->fid, pRtn);
|
||||||
|
|
||||||
|
tfsAllocDisk(level, &(did.level), &(did.id));
|
||||||
|
if (did.level == TFS_UNDECIDED_LEVEL) {
|
||||||
|
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (did.level > TSDB_FSET_LEVEL(pSet)) {
|
||||||
|
// Need to move the FSET to higher level
|
||||||
|
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
|
||||||
|
|
||||||
|
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
|
||||||
|
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
|
||||||
|
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
|
||||||
|
} else {
|
||||||
|
// On a correct level
|
||||||
|
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf,
|
||||||
|
SBlockIdx *pIdx) {
|
||||||
|
size_t nSupBlocks;
|
||||||
|
size_t nSubBlocks;
|
||||||
|
uint32_t tlen;
|
||||||
|
SBlockInfo *pBlkInfo;
|
||||||
|
int64_t offset;
|
||||||
|
SBlock * pBlock;
|
||||||
|
|
||||||
|
memset(pIdx, 0, sizeof(*pIdx));
|
||||||
|
|
||||||
|
nSupBlocks = taosArrayGetSize(pSupA);
|
||||||
|
nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA);
|
||||||
|
|
||||||
|
if (nSupBlocks <= 0) {
|
||||||
|
// No data (data all deleted)
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
|
||||||
|
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
|
||||||
|
pBlkInfo = *ppBuf;
|
||||||
|
|
||||||
|
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
|
||||||
|
pBlkInfo->tid = TABLE_TID(pTable);
|
||||||
|
pBlkInfo->uid = TABLE_UID(pTable);
|
||||||
|
|
||||||
|
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
|
||||||
|
if (nSubBlocks > 0) {
|
||||||
|
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock));
|
||||||
|
|
||||||
|
for (int i = 0; i < nSupBlocks; i++) {
|
||||||
|
pBlock = pBlkInfo->blocks + i;
|
||||||
|
|
||||||
|
if (pBlock->numOfSubBlocks > 1) {
|
||||||
|
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
|
||||||
|
|
||||||
|
if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
|
||||||
|
|
||||||
|
// Set pIdx
|
||||||
|
pBlock = taosArrayGetLast(pSupA);
|
||||||
|
|
||||||
|
pIdx->tid = TABLE_TID(pTable);
|
||||||
|
pIdx->uid = TABLE_UID(pTable);
|
||||||
|
pIdx->hasLast = pBlock->last ? 1 : 0;
|
||||||
|
pIdx->maxKey = pBlock->keyLast;
|
||||||
|
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
|
||||||
|
pIdx->len = tlen;
|
||||||
|
pIdx->offset = (uint32_t)offset;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
|
||||||
|
SBlockIdx *pBlkIdx;
|
||||||
|
size_t nidx = taosArrayGetSize(pIdxA);
|
||||||
|
int tlen = 0, size;
|
||||||
|
int64_t offset;
|
||||||
|
|
||||||
|
if (nidx <= 0) {
|
||||||
|
// All data are deleted
|
||||||
|
pHeadf->info.offset = 0;
|
||||||
|
pHeadf->info.len = 0;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (size_t i = 0; i < nidx; i++) {
|
||||||
|
pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i);
|
||||||
|
|
||||||
|
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
|
||||||
|
if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1;
|
||||||
|
|
||||||
|
void *ptr = POINTER_SHIFT(*ppBuf, tlen);
|
||||||
|
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
|
||||||
|
|
||||||
|
tlen += size;
|
||||||
|
}
|
||||||
|
|
||||||
|
tlen += sizeof(TSCKSUM);
|
||||||
|
if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1;
|
||||||
|
taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen);
|
||||||
|
|
||||||
|
if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM)));
|
||||||
|
pHeadf->info.offset = (uint32_t)offset;
|
||||||
|
pHeadf->info.len = tlen;
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// =================== Commit Meta Data
|
// =================== Commit Meta Data
|
||||||
static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||||
STsdbFS * pfs = REPO_FS(pRepo);
|
STsdbFS * pfs = REPO_FS(pRepo);
|
||||||
|
@ -438,7 +581,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsdbWriteBlockIdx(pCommith) < 0) {
|
if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) <
|
||||||
|
0) {
|
||||||
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
tsdbCloseCommitFile(pCommith, true);
|
tsdbCloseCommitFile(pCommith, true);
|
||||||
// revert the file change
|
// revert the file change
|
||||||
|
@ -738,23 +882,21 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
|
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
|
||||||
bool isSuper) {
|
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) {
|
||||||
STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith);
|
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
SBlockData *pBlockData;
|
SBlockData *pBlockData;
|
||||||
int64_t offset = 0;
|
int64_t offset = 0;
|
||||||
STable * pTable = TSDB_COMMIT_TABLE(pCommith);
|
|
||||||
int rowsToWrite = pDataCols->numOfRows;
|
int rowsToWrite = pDataCols->numOfRows;
|
||||||
|
|
||||||
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock);
|
||||||
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
|
ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock);
|
||||||
|
|
||||||
// Make buffer space
|
// Make buffer space
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
|
if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith);
|
pBlockData = (SBlockData *)(*ppBuf);
|
||||||
|
|
||||||
// Get # of cols not all NULL(not including key column)
|
// Get # of cols not all NULL(not including key column)
|
||||||
int nColsNotAllNull = 0;
|
int nColsNotAllNull = 0;
|
||||||
|
@ -800,23 +942,23 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
|
||||||
void * tptr;
|
void * tptr;
|
||||||
|
|
||||||
// Make room
|
// Make room
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommith)), lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
|
if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pBlockData = (SBlockData *)TSDB_COMMIT_BUF(pCommith);
|
pBlockData = (SBlockData *)(*ppBuf);
|
||||||
pBlockCol = pBlockData->cols + tcol;
|
pBlockCol = pBlockData->cols + tcol;
|
||||||
tptr = POINTER_SHIFT(pBlockData, lsize);
|
tptr = POINTER_SHIFT(pBlockData, lsize);
|
||||||
|
|
||||||
if (pCfg->compression == TWO_STAGE_COMP &&
|
if (pCfg->compression == TWO_STAGE_COMP &&
|
||||||
tsdbMakeRoom((void **)(&TSDB_COMMIT_COMP_BUF(pCommith)), tlen + COMP_OVERFLOW_BYTES) < 0) {
|
tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compress or just copy
|
// Compress or just copy
|
||||||
if (pCfg->compression) {
|
if (pCfg->compression) {
|
||||||
flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr,
|
||||||
tlen + COMP_OVERFLOW_BYTES, pCfg->compression,
|
tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf,
|
||||||
TSDB_COMMIT_COMP_BUF(pCommith), tlen + COMP_OVERFLOW_BYTES);
|
tlen + COMP_OVERFLOW_BYTES);
|
||||||
} else {
|
} else {
|
||||||
flen = tlen;
|
flen = tlen;
|
||||||
memcpy(tptr, pDataCol->pData, flen);
|
memcpy(tptr, pDataCol->pData, flen);
|
||||||
|
@ -872,68 +1014,27 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast,
|
||||||
|
bool isSuper) {
|
||||||
|
return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast,
|
||||||
|
isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))),
|
||||||
|
(void **)(&(TSDB_COMMIT_COMP_BUF(pCommith))));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
||||||
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
|
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
|
||||||
SBlockIdx blkIdx;
|
SBlockIdx blkIdx;
|
||||||
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
|
STable * pTable = TSDB_COMMIT_TABLE(pCommih);
|
||||||
SBlock * pBlock;
|
|
||||||
size_t nSupBlocks;
|
|
||||||
size_t nSubBlocks;
|
|
||||||
uint32_t tlen;
|
|
||||||
SBlockInfo *pBlkInfo;
|
|
||||||
int64_t offset;
|
|
||||||
|
|
||||||
nSupBlocks = taosArrayGetSize(pCommih->aSupBlk);
|
if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))),
|
||||||
nSubBlocks = taosArrayGetSize(pCommih->aSubBlk);
|
&blkIdx) < 0) {
|
||||||
|
|
||||||
if (nSupBlocks <= 0) {
|
|
||||||
// No data (data all deleted)
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM));
|
|
||||||
|
|
||||||
// Write SBlockInfo part
|
|
||||||
if (tsdbMakeRoom((void **)(&(TSDB_COMMIT_BUF(pCommih))), tlen) < 0) return -1;
|
|
||||||
pBlkInfo = TSDB_COMMIT_BUF(pCommih);
|
|
||||||
|
|
||||||
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
|
|
||||||
pBlkInfo->tid = TABLE_TID(pTable);
|
|
||||||
pBlkInfo->uid = TABLE_UID(pTable);
|
|
||||||
|
|
||||||
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pCommih->aSupBlk, 0), nSupBlocks * sizeof(SBlock));
|
|
||||||
if (nSubBlocks > 0) {
|
|
||||||
memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pCommih->aSubBlk, 0), nSubBlocks * sizeof(SBlock));
|
|
||||||
|
|
||||||
for (int i = 0; i < nSupBlocks; i++) {
|
|
||||||
pBlock = pBlkInfo->blocks + i;
|
|
||||||
|
|
||||||
if (pBlock->numOfSubBlocks > 1) {
|
|
||||||
pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen);
|
|
||||||
|
|
||||||
if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM)));
|
if (blkIdx.numOfBlocks == 0) {
|
||||||
|
return 0;
|
||||||
// Set blkIdx
|
}
|
||||||
pBlock = taosArrayGet(pCommih->aSupBlk, nSupBlocks - 1);
|
|
||||||
|
|
||||||
blkIdx.tid = TABLE_TID(pTable);
|
|
||||||
blkIdx.uid = TABLE_UID(pTable);
|
|
||||||
blkIdx.hasLast = pBlock->last ? 1 : 0;
|
|
||||||
blkIdx.maxKey = pBlock->keyLast;
|
|
||||||
blkIdx.numOfBlocks = (uint32_t)nSupBlocks;
|
|
||||||
blkIdx.len = tlen;
|
|
||||||
blkIdx.offset = (uint32_t)offset;
|
|
||||||
|
|
||||||
ASSERT(blkIdx.numOfBlocks > 0);
|
|
||||||
|
|
||||||
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
|
if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
@ -943,49 +1044,6 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbWriteBlockIdx(SCommitH *pCommih) {
|
|
||||||
SBlockIdx *pBlkIdx;
|
|
||||||
SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih);
|
|
||||||
size_t nidx = taosArrayGetSize(pCommih->aBlkIdx);
|
|
||||||
int tlen = 0, size;
|
|
||||||
int64_t offset;
|
|
||||||
|
|
||||||
if (nidx <= 0) {
|
|
||||||
// All data are deleted
|
|
||||||
pHeadf->info.offset = 0;
|
|
||||||
pHeadf->info.len = 0;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t i = 0; i < nidx; i++) {
|
|
||||||
pBlkIdx = (SBlockIdx *)taosArrayGet(pCommih->aBlkIdx, i);
|
|
||||||
|
|
||||||
size = tsdbEncodeSBlockIdx(NULL, pBlkIdx);
|
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen + size) < 0) return -1;
|
|
||||||
|
|
||||||
void *ptr = POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen);
|
|
||||||
tsdbEncodeSBlockIdx(&ptr, pBlkIdx);
|
|
||||||
|
|
||||||
tlen += size;
|
|
||||||
}
|
|
||||||
|
|
||||||
tlen += sizeof(TSCKSUM);
|
|
||||||
if (tsdbMakeRoom((void **)(&TSDB_COMMIT_BUF(pCommih)), tlen) < 0) return -1;
|
|
||||||
taosCalcChecksumAppend(0, (uint8_t *)TSDB_COMMIT_BUF(pCommih), tlen);
|
|
||||||
|
|
||||||
if (tsdbAppendDFile(pHeadf, TSDB_COMMIT_BUF(pCommih), tlen, &offset) < tlen) {
|
|
||||||
tsdbError("vgId:%d failed to write block index part to file %s since %s", TSDB_COMMIT_REPO_ID(pCommih),
|
|
||||||
TSDB_FILE_FULL_NAME(pHeadf), tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(TSDB_COMMIT_BUF(pCommih), tlen - sizeof(TSCKSUM)));
|
|
||||||
pHeadf->info.offset = (uint32_t)offset;
|
|
||||||
pHeadf->info.len = tlen;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
|
static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) {
|
||||||
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||||
|
@ -1438,45 +1496,3 @@ static int tsdbApplyRtn(STsdbRepo *pRepo) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) {
|
|
||||||
SDiskID did;
|
|
||||||
SDFileSet nSet;
|
|
||||||
STsdbFS * pfs = REPO_FS(pRepo);
|
|
||||||
int level;
|
|
||||||
|
|
||||||
ASSERT(pSet->fid >= pRtn->minFid);
|
|
||||||
|
|
||||||
level = tsdbGetFidLevel(pSet->fid, pRtn);
|
|
||||||
|
|
||||||
tfsAllocDisk(level, &(did.level), &(did.id));
|
|
||||||
if (did.level == TFS_UNDECIDED_LEVEL) {
|
|
||||||
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (did.level > TSDB_FSET_LEVEL(pSet)) {
|
|
||||||
// Need to move the FSET to higher level
|
|
||||||
tsdbInitDFileSet(&nSet, did, REPO_ID(pRepo), pSet->fid, FS_TXN_VERSION(pfs));
|
|
||||||
|
|
||||||
if (tsdbCopyDFileSet(pSet, &nSet) < 0) {
|
|
||||||
tsdbError("vgId:%d failed to copy FSET %d from level %d to level %d since %s", REPO_ID(pRepo), pSet->fid,
|
|
||||||
TSDB_FSET_LEVEL(pSet), did.level, tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbUpdateDFileSet(pfs, &nSet) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbInfo("vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d", REPO_ID(pRepo), pSet->fid,
|
|
||||||
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet), did.level, did.id);
|
|
||||||
} else {
|
|
||||||
// On a correct level
|
|
||||||
if (tsdbUpdateDFileSet(pfs, pSet) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
Loading…
Reference in New Issue