more work
This commit is contained in:
parent
1758110087
commit
f689d8cc72
|
@ -241,7 +241,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "No table d
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "File already exists")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "Need to reconfigure table")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO, 0, 0x0612, "Invalid information to create table")
|
||||
TAOS_DEFINE_ERROR(TSDB_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_AVAIL_DISK, 0, 0x0613, "No available disk")
|
||||
|
||||
// query
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "Invalid handle")
|
||||
|
|
|
@ -207,6 +207,22 @@ static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) {
|
|||
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM));
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tsdbCreateAndOpenDFile(SDFile* pDFile) {
|
||||
if (tsdbOpenDFile(pDFile, O_WRONLY | O_CREAT | O_EXCL) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pDFile->info.size += TSDB_FILE_HEAD_SIZE;
|
||||
|
||||
if (tsdbUpdaeDFileHeader(pDFile) < 0) {
|
||||
tsdbCloseDFile(pDFile);
|
||||
remove(TSDB_FILE_FULL_NAME(pDFile));
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// =============== SDFileSet
|
||||
typedef struct {
|
||||
int fid;
|
||||
|
|
|
@ -26,14 +26,16 @@ typedef struct {
|
|||
} SRtn;
|
||||
|
||||
typedef struct {
|
||||
int version;
|
||||
uint32_t version;
|
||||
SRtn rtn; // retention snapshot
|
||||
bool isRFileSet;
|
||||
SReadH readh;
|
||||
SFSIter fsIter; // tsdb file iterator
|
||||
int niters; // memory iterators
|
||||
SCommitIter *iters;
|
||||
SDFileSet wSet; // commit file
|
||||
bool isRFileSet; // read and commit FSET
|
||||
SReadH readh;
|
||||
SDFileSet wSet;
|
||||
bool isDFileSame;
|
||||
bool isLFileSame;
|
||||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
SArray * aBlkIdx; // SBlockIdx array
|
||||
|
@ -138,7 +140,7 @@ _err:
|
|||
static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
||||
SMemTable *pMem = pRepo->imem;
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
SCommitH ch = {0};
|
||||
SCommitH commith = {0};
|
||||
SDFileSet *pSet = NULL;
|
||||
SDFileSet nSet;
|
||||
int fid;
|
||||
|
@ -146,19 +148,19 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
|||
if (pMem->numOfRows <= 0) return 0;
|
||||
|
||||
// Resource initialization
|
||||
if (tsdbInitCommitH(pRepo, &ch) < 0) {
|
||||
if (tsdbInitCommitH(&commith, pRepo) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Skip expired memory data and expired FSET
|
||||
tsdbSeekCommitIter(&ch, ch.rtn.minKey);
|
||||
tsdbSeekCommitIter(&commith, commith.rtn.minKey);
|
||||
while (true) {
|
||||
pSet = tsdbFSIterNext(&(ch.fsIter));
|
||||
if (pSet == NULL || pSet->fid >= ch.rtn.minFid) break;
|
||||
pSet = tsdbFSIterNext(&(commith.fsIter));
|
||||
if (pSet == NULL || pSet->fid >= commith.rtn.minFid) break;
|
||||
}
|
||||
|
||||
// Loop to commit to each file
|
||||
fid = tsdbNextCommitFid(&(ch));
|
||||
fid = tsdbNextCommitFid(&(commith));
|
||||
while (true) {
|
||||
// Loop over both on disk and memory
|
||||
if (pSet == NULL && fid == TSDB_IVLD_FID) break;
|
||||
|
@ -168,31 +170,32 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
|||
// existing FSET, only check if file in correct retention
|
||||
int level, id;
|
||||
|
||||
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(ch.rtn)), &level, &id);
|
||||
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(commith.rtn)), &level, &id);
|
||||
if (level == TFS_UNDECIDED_LEVEL) {
|
||||
terrno = TSDB_TDB_NO_AVAIL_DISK;
|
||||
tsdbDestroyCommitH(&ch);
|
||||
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
||||
tsdbDestroyCommitH(&commith);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (level > TSDB_FSET_LEVEL(pSet)) {
|
||||
// Need to move the FSET to higher level
|
||||
if (tsdbCopyDFileSet(*pSet, level, id, &nSet) < 0) {
|
||||
tsdbDestroyCommitH(&ch);
|
||||
tsdbDestroyCommitH(&commith);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbUpdateDFileSet(pRepo, &nSet) < 0) {
|
||||
tsdbDestroyCommitH(&ch);
|
||||
tsdbDestroyCommitH(&commith);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (tsdbUpdateDFileSet(pRepo, pSet) < 0) {
|
||||
tsdbDestroyCommitH(&ch);
|
||||
tsdbDestroyCommitH(&commith);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
pSet = tsdbFSIterNext(&(ch.fsIter));
|
||||
pSet = tsdbFSIterNext(&(commith.fsIter));
|
||||
} else {
|
||||
// Has memory data to commit
|
||||
SDFileSet *pCSet;
|
||||
|
@ -206,18 +209,18 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
|||
// Commit to an existing FSET
|
||||
pCSet = pSet;
|
||||
cfid = pSet->fid;
|
||||
pSet = tsdbFSIterNext(&(ch.fsIter));
|
||||
pSet = tsdbFSIterNext(&(commith.fsIter));
|
||||
}
|
||||
fid = tsdbNextCommitFid(&ch);
|
||||
fid = tsdbNextCommitFid(&commith);
|
||||
|
||||
if (tsdbCommitToFile(pCSet, &ch, cfid) < 0) {
|
||||
tsdbDestroyCommitH(&ch);
|
||||
if (tsdbCommitToFile(pCSet, &commith, cfid) < 0) {
|
||||
tsdbDestroyCommitH(&commith);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tsdbDestroyCommitH(&ch);
|
||||
tsdbDestroyCommitH(&commith);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -260,7 +263,6 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
|
|||
}
|
||||
|
||||
static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||
int level, id;
|
||||
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
|
||||
|
@ -269,45 +271,13 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
|||
tsdbResetCommitFile(pCommith);
|
||||
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey));
|
||||
|
||||
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id);
|
||||
if (level == TFS_UNDECIDED_LEVEL) {
|
||||
terrno = TSDB_TDB_NO_AVAIL_DISK;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Set commit file
|
||||
if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) {
|
||||
tsdbInitDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), REPO_ID(pRepo), fid, pCommith->version, level, id);
|
||||
} else {
|
||||
level = TSDB_FSET_LEVEL(pSet);
|
||||
id = TSDB_FSET_ID(pSet);
|
||||
|
||||
// TSDB_FILE_HEAD
|
||||
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
||||
tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD);
|
||||
|
||||
// TSDB_FILE_DATA
|
||||
SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
|
||||
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
tsdbInitDFileWithOld(pWDataf, pRDataf);
|
||||
|
||||
// TSDB_FILE_LAST
|
||||
SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
|
||||
SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
|
||||
if (pRLastf->info.size < 32 * 1024) {
|
||||
tsdbInitDFileWithOld(pWLastf, pRLastf);
|
||||
} else {
|
||||
tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST);
|
||||
}
|
||||
}
|
||||
|
||||
// Open commit file
|
||||
if (tsdbOpenCommitFile(pCommith, pSet) < 0) {
|
||||
// Set and open files
|
||||
if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Loop to commit each table data
|
||||
for (int tid = 0; tid < pCommith->niters; tid++) {
|
||||
for (int tid = 1; tid < pCommith->niters; tid++) {
|
||||
SCommitIter *pIter = pCommith->iters + tid;
|
||||
|
||||
if (pIter->pTable == NULL) continue;
|
||||
|
@ -354,7 +324,8 @@ static SCommitIter *tsdbCreateCommitIters(SCommitH *pCommith) {
|
|||
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||
|
||||
for (int i = 0; i < pMem->maxTables; i++) {
|
||||
if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) {
|
||||
if ((pCommith->iters[i].pTable != NULL) && (pMem->tData[i] != NULL) &&
|
||||
(TABLE_UID(pCommith->iters[i].pTable) == pMem->tData[i]->uid)) {
|
||||
if ((pCommith->iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -379,20 +350,20 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) {
|
|||
|
||||
free(pCommith->iters);
|
||||
pCommith->iters = NULL;
|
||||
pCommith->niters = 0;
|
||||
}
|
||||
|
||||
// Skip all keys until key (not included)
|
||||
static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) {
|
||||
for (int i = 0; i < pCommith->niters; i++) {
|
||||
SCommitIter *pIter = pCommith->iters + i;
|
||||
if (pIter->pTable == NULL) continue;
|
||||
if (pIter->pIter == NULL) continue;
|
||||
if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
|
||||
|
||||
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key - 1, INT32_MAX, NULL, NULL, 0, true, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pCommith) {
|
||||
static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) {
|
||||
STsdbCfg *pCfg = REPO_CFG(pRepo);
|
||||
|
||||
memset(pCommith, 0, sizeof(*pCommith));
|
||||
|
@ -404,6 +375,11 @@ static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pCommith) {
|
|||
|
||||
tsdbGetRtnSnap(pRepo, &(pCommith->rtn));
|
||||
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
SDFile *pDFile = TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(pCommith), ftype);
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
}
|
||||
|
||||
// Init read handle
|
||||
if (tsdbInitReadH(&(pCommith->readh), pRepo) < 0) {
|
||||
return -1;
|
||||
|
@ -489,13 +465,12 @@ static int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
|||
}
|
||||
|
||||
static int tsdbNextCommitFid(SCommitH *pCommith) {
|
||||
SCommitIter *pIter;
|
||||
STsdbRepo * pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
int fid = TSDB_IVLD_FID;
|
||||
|
||||
for (int i = 0; i < pCommith->niters; i++) {
|
||||
pIter = pCommith->iters + i;
|
||||
SCommitIter *pIter = pCommith->iters + i;
|
||||
if (pIter->pTable == NULL || pIter->pIter == NULL) continue;
|
||||
|
||||
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
|
||||
|
@ -514,7 +489,7 @@ static int tsdbNextCommitFid(SCommitH *pCommith) {
|
|||
|
||||
static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
||||
SCommitIter *pIter = pCommith->iters + tid;
|
||||
if (pIter->pTable == NULL) return 0;
|
||||
TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
|
||||
|
||||
TSDB_RLOCK_TABLE(pIter->pTable);
|
||||
|
||||
|
@ -525,6 +500,82 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) {
|
||||
// No disk data and no memory data
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return 0;
|
||||
} else {
|
||||
// Must has disk data, maybe has memory data
|
||||
int nBlocks;
|
||||
int bidx = 0;
|
||||
SBlock *pBlock;
|
||||
|
||||
if (pCommith->readh.pBlkIdx) {
|
||||
if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) {
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return -1;
|
||||
}
|
||||
|
||||
nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
|
||||
} else {
|
||||
nBlocks = 0;
|
||||
}
|
||||
|
||||
if (bidx < nBlocks) {
|
||||
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
||||
} else {
|
||||
pBlock = NULL;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break;
|
||||
|
||||
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) ||
|
||||
(pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
|
||||
if (tsdbMoveBlock(pCommith, bidx) < 0) {
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return -1;
|
||||
}
|
||||
|
||||
bidx++;
|
||||
if (bidx < nBlocks) {
|
||||
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
||||
} else {
|
||||
pBlock = NULL;
|
||||
}
|
||||
} else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
|
||||
// merge pBlock data and memory data
|
||||
if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) {
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return -1;
|
||||
}
|
||||
|
||||
bidx++;
|
||||
if (bidx < nBlocks) {
|
||||
pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
||||
} else {
|
||||
pBlock = NULL;
|
||||
}
|
||||
nextKey = tsdbNextIterKey(pIter->pIter);
|
||||
} else {
|
||||
// Only commit memory data
|
||||
if (pBlock == NULL) {
|
||||
if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) {
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
nextKey = tsdbNextIterKey(pIter->pIter);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
if (!pCommith->isRFileSet) {
|
||||
if (pIter->pIter == NULL) {
|
||||
// No memory data
|
||||
|
@ -607,7 +658,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
|||
}
|
||||
nextKey = tsdbNextIterKey(pIter->pIter);
|
||||
} else {
|
||||
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst-1, true) < 0) {
|
||||
if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) {
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return -1;
|
||||
}
|
||||
|
@ -615,6 +666,7 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid) {
|
|||
}
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
|
||||
|
@ -635,6 +687,8 @@ static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) {
|
|||
if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) {
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
pCommith->readh.pBlkIdx = NULL;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -973,7 +1027,7 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
|||
// Ignore the block
|
||||
ASSERT(0);
|
||||
*(pIter->pIter) = titer;
|
||||
} else if (tsdbCanAddSubBlock()) {
|
||||
} else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) {
|
||||
// Add a sub-block
|
||||
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols,
|
||||
pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
|
||||
|
@ -1011,12 +1065,12 @@ static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
|||
}
|
||||
|
||||
static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
|
||||
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks+bidx;
|
||||
SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
||||
SDFile *pCommitF = (pBlock->last) ? TSDB_COMMIT_LAST_FILE(pCommith) : TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
SDFile *pReadF = (pBlock->last) ? TSDB_READ_LAST_FILE(&(pCommith->readh)) : TSDB_READ_DATA_FILE(&(pCommith->readh));
|
||||
SBlock block;
|
||||
|
||||
if (tfsIsSameFile(&(pCommitF->f), &(pReadF->f))) {
|
||||
if ((pBlock->last && pCommith->isLFileSame) || ((!pBlock->last) && pCommith->isDFileSame)) {
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1;
|
||||
} else {
|
||||
|
@ -1158,7 +1212,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
|
|||
}
|
||||
|
||||
static void tsdbResetCommitFile(SCommitH *pCommith) {
|
||||
tsdbResetCommitTable(pCommith);
|
||||
pCommith->isRFileSet = false;
|
||||
pCommith->isDFileSame = false;
|
||||
pCommith->isLFileSame = false;
|
||||
taosArrayClear(pCommith->aBlkIdx);
|
||||
}
|
||||
|
||||
|
@ -1168,19 +1224,111 @@ static void tsdbResetCommitTable(SCommitH *pCommith) {
|
|||
taosArrayClear(pCommith->aSupBlk);
|
||||
}
|
||||
|
||||
static int tsdbOpenCommitFile(SCommitH *pCommith, SDFileSet *pRSet) {
|
||||
if (pRSet == NULL) {
|
||||
pCommith->isRFileSet = false;
|
||||
} else {
|
||||
static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
|
||||
int level, id;
|
||||
SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith);
|
||||
|
||||
tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &level, &id);
|
||||
if (level == TFS_UNDECIDED_LEVEL) {
|
||||
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Open read FSET
|
||||
if (pSet) {
|
||||
if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pSet) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
pCommith->isRFileSet = true;
|
||||
if (tsdbSetAndOpenReadFSet(&(pCommith->readh), pRSet) < 0) {
|
||||
|
||||
if (tsdbLoadBlockIdx(&(pCommith->readh)) < 0) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
}
|
||||
return -1;
|
||||
} else {
|
||||
pCommith->isRFileSet = false;
|
||||
}
|
||||
|
||||
// Set and open commit FSET
|
||||
if (pSet == NULL || level > TSDB_FSET_LEVEL(pSet)) {
|
||||
// Create new FSET
|
||||
tsdbInitDFileSet(pWSet, TSDB_COMMIT_REPO_ID(pCommith), fid, pCommith->version, level, id);
|
||||
|
||||
if (tsdbOpenDFileSet(pWSet, O_WRONLY | O_CREAT) < 0) {
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
remove(TSDB_FILE_FULL_NAME(pWSet, ftype));
|
||||
}
|
||||
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbUpdateDFileSetHeader(pWSet) < 0) {
|
||||
tsdbCloseDFileSet(pWSet);
|
||||
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
remove(TSDB_FILE_FULL_NAME(pWSet, ftype));
|
||||
}
|
||||
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
// TODO: update file info;
|
||||
} else {
|
||||
level = TSDB_FSET_LEVEL(pSet);
|
||||
id = TSDB_FSET_ID(pSet);
|
||||
|
||||
// TSDB_FILE_HEAD
|
||||
SDFile *pWHeadf = TSDB_COMMIT_HEAD_FILE(pCommith);
|
||||
tsdbInitDFile(pWHeadf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_HEAD);
|
||||
if (tsdbCreateAndOpenDFile(pWHeadf) < 0) {
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbOpenDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith), O_WRONLY | O_CREAT) < 0) {
|
||||
// TSDB_FILE_DATA
|
||||
SDFile *pRDataf = TSDB_READ_DATA_FILE(&(pCommith->readh));
|
||||
SDFile *pWDataf = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
tsdbInitDFileWithOld(pWHeadf, pRDataf);
|
||||
if (tsdbOpenDFile(pWDataf, O_WRONLY) < 0) {
|
||||
tsdbCloseDFile(pWHeadf);
|
||||
remove(TSDB_FILE_FULL_NAME(pWHeadf));
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
pCommith->isDFileSame = true;
|
||||
|
||||
// TSDB_FILE_LAST
|
||||
SDFile *pRLastf = TSDB_READ_LAST_FILE(&(pCommith->readh));
|
||||
SDFile *pWLastf = TSDB_COMMIT_LAST_FILE(pCommith);
|
||||
if (pRLastf->info.size < 32 * 1024) {
|
||||
tsdbInitDFileWithOld(pWLastf, pRLastf);
|
||||
pCommith->isLFileSame = true;
|
||||
} else {
|
||||
tsdbInitDFile(pWLastf, REPO_ID(pRepo), fid, pCommith->version, level, id, NULL, TSDB_FILE_LAST);
|
||||
pCommith->isLFileSame = false;
|
||||
}
|
||||
if (tsdbOpenDFile(pWLastf, O_WRONLY) < 0) {
|
||||
tsdbCloseDFile(pWDataf);
|
||||
tsdbCloseDFile(pWHeadf);
|
||||
remove(TSDB_FILE_FULL_NAME(pWHeadf));
|
||||
if (pCommith->isRFileSet) {
|
||||
tsdbCloseAndUnsetFSet(&(pCommith->readh));
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -1198,3 +1346,21 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) {
|
|||
}
|
||||
tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith));
|
||||
}
|
||||
|
||||
static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) {
|
||||
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed;
|
||||
|
||||
ASSERT(mergeRows > 0);
|
||||
|
||||
if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) {
|
||||
if (pBlock->last) {
|
||||
if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true;
|
||||
} else {
|
||||
if (mergeRows < pCfg->maxRowsPerFileBlock) return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
|
@ -82,7 +82,8 @@ static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) {
|
|||
}
|
||||
|
||||
// ============== Operations on SDFile
|
||||
void tsdbInitDFile(SDFile *pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo *pInfo, TSDB_FILE_T ftype) {
|
||||
void tsdbInitDFile(SDFile *pDFile, int vid, int fid, uint32_t ver, int level, int id, const SDFInfo *pInfo,
|
||||
TSDB_FILE_T ftype) {
|
||||
char fname[TSDB_FILENAME_LEN];
|
||||
|
||||
TSDB_FILE_SET_CLOSED(pDFile);
|
||||
|
@ -158,7 +159,7 @@ static void *tsdbDecodeDFInfo(void *buf, SDFInfo *pInfo) {
|
|||
}
|
||||
|
||||
// ============== Operations on SDFileSet
|
||||
void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, int ver, int level, int id) {
|
||||
void tsdbInitDFileSet(SDFileSet *pSet, int vid, int fid, uint32_t ver, int level, int id) {
|
||||
pSet->fid = fid;
|
||||
pSet->state = 0;
|
||||
|
||||
|
@ -201,7 +202,7 @@ int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet *pDest) {
|
|||
ASSERT(tolevel > TSDB_FSET_LEVEL(&src));
|
||||
|
||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(&src, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
|
||||
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(&src, ftype), tolevel, toid, TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
|
||||
while (ftype >= 0) {
|
||||
remove(TSDB_FILE_FULL_NAME(TSDB_DFILE_IN_SET(pDest, ftype)));
|
||||
ftype--;
|
||||
|
@ -214,20 +215,20 @@ int tsdbCopyDFileSet(SDFileSet src, int tolevel, int toid, SDFileSet *pDest) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbGetFilename(int vid, int fid, int64_t ver, TSDB_FILE_T ftype, char *fname) {
|
||||
static void tsdbGetFilename(int vid, int fid, uint32_t ver, TSDB_FILE_T ftype, char *fname) {
|
||||
ASSERT(ftype != TSDB_FILE_MAX);
|
||||
|
||||
if (ftype < TSDB_FILE_MAX) {
|
||||
if (ver == 0) {
|
||||
snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s", vid, vid, fid, TSDB_FNAME_SUFFIX[ftype]);
|
||||
} else {
|
||||
snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRId64, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver);
|
||||
snprintf(fname, "vnode/vnode%d/tsdb/data/v%df%d.%s-%012" PRIu32, vid, vid, fid, TSDB_FNAME_SUFFIX[ftype], ver);
|
||||
}
|
||||
} else {
|
||||
if (ver == 0) {
|
||||
snprintf(fname, "vnode/vnode%d/tsdb/%s", vid, TSDB_FNAME_SUFFIX[ftype]);
|
||||
} else {
|
||||
snprintf(fname, "vnode/vnode%d/tsdb/%s-%012" PRId64, vid, TSDB_FNAME_SUFFIX[ftype], ver);
|
||||
snprintf(fname, "vnode/vnode%d/tsdb/%s-%012" PRIu32, vid, TSDB_FNAME_SUFFIX[ftype], ver);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue