diff --git a/include/dnode/vnode/tsdb/tsdb.h b/include/dnode/vnode/tsdb/tsdb.h index c19152de44..6751091f24 100644 --- a/include/dnode/vnode/tsdb/tsdb.h +++ b/include/dnode/vnode/tsdb/tsdb.h @@ -54,6 +54,7 @@ typedef struct STsdbCfg { int32_t keep1; int32_t keep2; int8_t update; + int8_t compression; } STsdbCfg; // STsdb diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c index c16d10414e..81128ea9f3 100644 --- a/source/dnode/vnode/tsdb/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c @@ -62,6 +62,37 @@ static void tsdbDestroyCommitH(SCommitH *pCommith); static int tsdbCreateCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +static void tsdbResetCommitFile(SCommitH *pCommith); +static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +// static int tsdbCommitMeta(STsdbRepo *pRepo); +// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact); +// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); +// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); +// static void tsdbStartCommit(STsdbRepo *pRepo); +// static void tsdbEndCommit(STsdbRepo *pRepo, int eno); +// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); +// static int tsdbCreateCommitIters(SCommitH *pCommith); +// static void tsdbDestroyCommitIters(SCommitH *pCommith); +// static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); +// static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); +// static void tsdbDestroyCommitH(SCommitH *pCommith); +// static int tsdbGetFidLevel(int fid, SRtn *pRtn); +// static int tsdbNextCommitFid(SCommitH *pCommith); +static int tsdbCommitToTable(SCommitH *pCommith, int tid); +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); +static int tsdbComparKeyBlock(const void *arg1, const void *arg2); +// static int tsdbWriteBlockInfo(SCommitH *pCommih); +// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); +// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); +static int tsdbMoveBlock(SCommitH *pCommith, int bidx); +static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); +static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, + bool isLastOneBlock); +static void tsdbResetCommitTable(SCommitH *pCommith); +static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); +// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); +// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +// TSKEY maxKey, int maxRows, int8_t update); int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; @@ -141,7 +172,6 @@ int tsdbCommit(STsdb *pRepo) { // Loop to commit to each file fid = tsdbNextCommitFid(&(commith)); -#if 0 while (true) { // Loop over both on disk and memory if (pSet == NULL && fid == TSDB_IVLD_FID) break; @@ -179,7 +209,6 @@ int tsdbCommit(STsdb *pRepo) { fid = tsdbNextCommitFid(&commith); } } -#endif tsdbDestroyCommitH(&commith); tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS); @@ -314,61 +343,62 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -// static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { -// STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); -// STsdbCfg *pCfg = REPO_CFG(pRepo); +static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); + STsdbCfg *pCfg = REPO_CFG(pRepo); -// ASSERT(pSet == NULL || pSet->fid == fid); + ASSERT(pSet == NULL || pSet->fid == fid); -// tsdbResetCommitFile(pCommith); -// tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); + tsdbResetCommitFile(pCommith); + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &(pCommith->minKey), &(pCommith->maxKey)); -// // Set and open files -// if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { -// return -1; -// } + // Set and open files + if (tsdbSetAndOpenCommitFile(pCommith, pSet, fid) < 0) { + return -1; + } -// // Loop to commit each table data -// for (int tid = 1; tid < pCommith->niters; tid++) { -// SCommitIter *pIter = pCommith->iters + tid; + // Loop to commit each table data + for (int tid = 1; tid < pCommith->niters; tid++) { + SCommitIter *pIter = pCommith->iters + tid; -// if (pIter->pTable == NULL) continue; + if (pIter->pTable == NULL) continue; -// if (tsdbCommitToTable(pCommith, tid) < 0) { -// tsdbCloseCommitFile(pCommith, true); -// // revert the file change -// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); -// return -1; -// } -// } + if (tsdbCommitToTable(pCommith, tid) < 0) { + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } + } +#if 0 -// if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) -// < -// 0) { -// tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); -// tsdbCloseCommitFile(pCommith, true); -// // revert the file change -// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); -// return -1; -// } + if (tsdbWriteBlockIdx(TSDB_COMMIT_HEAD_FILE(pCommith), pCommith->aBlkIdx, (void **)(&(TSDB_COMMIT_BUF(pCommith)))) < + 0) { + tsdbError("vgId:%d failed to write SBlockIdx part to FSET %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } -// if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { -// tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); -// tsdbCloseCommitFile(pCommith, true); -// // revert the file change -// tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); -// return -1; -// } + if (tsdbUpdateDFileSetHeader(&(pCommith->wSet)) < 0) { + tsdbError("vgId:%d failed to update FSET %d header since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); + tsdbCloseCommitFile(pCommith, true); + // revert the file change + tsdbApplyDFileSetChange(TSDB_COMMIT_WRITE_FSET(pCommith), pSet); + return -1; + } -// // Close commit file -// tsdbCloseCommitFile(pCommith, false); + // Close commit file + tsdbCloseCommitFile(pCommith, false); -// if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { -// return -1; -// } + if (tsdbUpdateDFileSet(REPO_FS(pRepo), &(pCommith->wSet)) < 0) { + return -1; + } -// return 0; -// } +#endif + return 0; +} static int tsdbCreateCommitIters(SCommitH *pCommith) { STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); @@ -416,1033 +446,6 @@ static void tsdbDestroyCommitIters(SCommitH *pCommith) { pCommith->niters = 0; } -#if 0 -#include "tsdbint.h" - -extern int32_t tsTsdbMetaCompactRatio; - - -static int tsdbCommitMeta(STsdbRepo *pRepo); -static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact); -static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); -static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); -static void tsdbStartCommit(STsdbRepo *pRepo); -static void tsdbEndCommit(STsdbRepo *pRepo, int eno); -static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static int tsdbCreateCommitIters(SCommitH *pCommith); -static void tsdbDestroyCommitIters(SCommitH *pCommith); -static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); -static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); -static void tsdbDestroyCommitH(SCommitH *pCommith); -static int tsdbGetFidLevel(int fid, SRtn *pRtn); -static int tsdbNextCommitFid(SCommitH *pCommith); -static int tsdbCommitToTable(SCommitH *pCommith, int tid); -static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable); -static int tsdbComparKeyBlock(const void *arg1, const void *arg2); -static int tsdbWriteBlockInfo(SCommitH *pCommih); -static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData); -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx); -static int tsdbMoveBlock(SCommitH *pCommith, int bidx); -static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks); -static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, - bool isLastOneBlock); -static void tsdbResetCommitFile(SCommitH *pCommith); -static void tsdbResetCommitTable(SCommitH *pCommith); -static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid); -static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError); -static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo); -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows, int8_t update); - - -int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, - SBlockIdx *pIdx) { - size_t nSupBlocks; - size_t nSubBlocks; - uint32_t tlen; - SBlockInfo *pBlkInfo; - int64_t offset; - SBlock * pBlock; - - memset(pIdx, 0, sizeof(*pIdx)); - - nSupBlocks = taosArrayGetSize(pSupA); - nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); - - if (nSupBlocks <= 0) { - // No data (data all deleted) - return 0; - } - - tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); - if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; - pBlkInfo = *ppBuf; - - pBlkInfo->delimiter = TSDB_FILE_DELIMITER; - pBlkInfo->tid = TABLE_TID(pTable); - pBlkInfo->uid = TABLE_UID(pTable); - - memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); - if (nSubBlocks > 0) { - memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); - - for (int i = 0; i < nSupBlocks; i++) { - pBlock = pBlkInfo->blocks + i; - - if (pBlock->numOfSubBlocks > 1) { - pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); - } - } - } - - taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); - - if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { - return -1; - } - - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); - - // Set pIdx - pBlock = taosArrayGetLast(pSupA); - - pIdx->tid = TABLE_TID(pTable); - pIdx->uid = TABLE_UID(pTable); - pIdx->hasLast = pBlock->last ? 1 : 0; - pIdx->maxKey = pBlock->keyLast; - pIdx->numOfBlocks = (uint32_t)nSupBlocks; - pIdx->len = tlen; - pIdx->offset = (uint32_t)offset; - - return 0; -} - -int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { - SBlockIdx *pBlkIdx; - size_t nidx = taosArrayGetSize(pIdxA); - int tlen = 0, size; - int64_t offset; - - if (nidx <= 0) { - // All data are deleted - pHeadf->info.offset = 0; - pHeadf->info.len = 0; - return 0; - } - - for (size_t i = 0; i < nidx; i++) { - pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); - - size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); - if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; - - void *ptr = POINTER_SHIFT(*ppBuf, tlen); - tsdbEncodeSBlockIdx(&ptr, pBlkIdx); - - tlen += size; - } - - tlen += sizeof(TSCKSUM); - if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; - taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); - - if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { - return -1; - } - - tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); - pHeadf->info.offset = (uint32_t)offset; - pHeadf->info.len = tlen; - - return 0; -} - - -// =================== Commit Meta Data -static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { - STsdbFS * pfs = REPO_FS(pRepo); - SMFile * pOMFile = pfs->cstatus->pmf; - SDiskID did; - - // Create/Open a meta file or open the existing file - if (pOMFile == NULL) { - // Create a new meta file - did.level = TFS_PRIMARY_LEVEL; - did.id = TFS_PRIMARY_ID; - tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); - - if (open && tsdbCreateMFile(pMf, true) < 0) { - tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - - tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf)); - } else { - tsdbInitMFileEx(pMf, pOMFile); - if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) { - tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - } - - return 0; -} - -static int tsdbCommitMeta(STsdbRepo *pRepo) { - STsdbFS * pfs = REPO_FS(pRepo); - SMemTable *pMem = pRepo->imem; - SMFile * pOMFile = pfs->cstatus->pmf; - SMFile mf; - SActObj * pAct = NULL; - SActCont * pCont = NULL; - SListNode *pNode = NULL; - - ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); - - if (listNEles(pMem->actList) <= 0) { - // no meta data to commit, just keep the old meta file - tsdbUpdateMFile(pfs, pOMFile); - if (tsTsdbMetaCompactRatio > 0) { - if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) { - return -1; - } - int ret = tsdbCompactMetaFile(pRepo, pfs, &mf); - if (ret < 0) tsdbError("compact meta file error"); - - return ret; - } - return 0; - } else { - if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) { - return -1; - } - } - - // Loop to write - while ((pNode = tdListPopHead(pMem->actList)) != NULL) { - pAct = (SActObj *)pNode->data; - if (pAct->act == TSDB_UPDATE_META) { - pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); - if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) { - tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tsdbCloseMFile(&mf); - (void)tsdbApplyMFileChange(&mf, pOMFile); - // TODO: need to reload metaCache - return -1; - } - } else if (pAct->act == TSDB_DROP_META) { - if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) { - tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, - tstrerror(terrno)); - tsdbCloseMFile(&mf); - tsdbApplyMFileChange(&mf, pOMFile); - // TODO: need to reload metaCache - return -1; - } - } else { - ASSERT(false); - } - } - - if (tsdbUpdateMFileHeader(&mf) < 0) { - tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); - tsdbApplyMFileChange(&mf, pOMFile); - // TODO: need to reload metaCache - return -1; - } - - TSDB_FILE_FSYNC(&mf); - tsdbCloseMFile(&mf); - tsdbUpdateMFile(pfs, &mf); - - if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) { - tsdbError("compact meta file error"); - } - - return 0; -} - -int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) { - int tlen = 0; - tlen += taosEncodeFixedU64(buf, pRecord->uid); - tlen += taosEncodeFixedI64(buf, pRecord->offset); - tlen += taosEncodeFixedI64(buf, pRecord->size); - - return tlen; -} - -void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { - buf = taosDecodeFixedU64(buf, &(pRecord->uid)); - buf = taosDecodeFixedI64(buf, &(pRecord->offset)); - buf = taosDecodeFixedI64(buf, &(pRecord->size)); - - return buf; -} - -static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { - char buf[64] = "\0"; - void * pBuf = buf; - SKVRecord rInfo; - int64_t offset; - - // Seek to end of meta file - offset = tsdbSeekMFile(pMFile, 0, SEEK_END); - if (offset < 0) { - return -1; - } - - rInfo.offset = offset; - rInfo.uid = uid; - rInfo.size = contLen; - - int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo); - if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) { - return -1; - } - - if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) { - return -1; - } - - tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); - - SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; - - pMFile->info.nRecords++; - - SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid)); - if (pRecord != NULL) { - pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); - } else { - pMFile->info.nRecords++; - } - taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); - - return 0; -} - -static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { - SKVRecord rInfo = {0}; - char buf[128] = "\0"; - - SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); - if (pRecord == NULL) { - tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid); - return -1; - } - - rInfo.offset = -pRecord->offset; - rInfo.uid = pRecord->uid; - rInfo.size = pRecord->size; - - void *pBuf = buf; - tsdbEncodeKVRecord(&pBuf, &rInfo); - - if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) { - return -1; - } - - pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord)); - pMFile->info.nDels++; - pMFile->info.nRecords--; - pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); - - taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); - return 0; -} - -static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { - float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); - float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); - float compactRatio = (float)(tsTsdbMetaCompactRatio)/100; - - if (delPercent < compactRatio && tombPercent < compactRatio) { - return 0; - } - - if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { - tsdbError("open meta file %s compact fail", pMFile->f.rname); - return -1; - } - - tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64, - tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); - - SMFile mf; - SDiskID did; - - // first create tmp meta file - did.level = TFS_PRIMARY_LEVEL; - did.id = TFS_PRIMARY_ID; - tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1); - - if (tsdbCreateMFile(&mf, true) < 0) { - tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); - return -1; - } - - tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); - - // second iterator metaCache - int code = -1; - int64_t maxBufSize = 1024; - SKVRecord *pRecord; - void *pBuf = NULL; - - pBuf = malloc((size_t)maxBufSize); - if (pBuf == NULL) { - goto _err; - } - - // init Comp - assert(pfs->metaCacheComp == NULL); - pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pfs->metaCacheComp == NULL) { - goto _err; - } - - pRecord = taosHashIterate(pfs->metaCache, NULL); - while (pRecord) { - if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { - tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); - goto _err; - } - if (pRecord->size > maxBufSize) { - maxBufSize = pRecord->size; - void* tmp = realloc(pBuf, (size_t)maxBufSize); - if (tmp == NULL) { - goto _err; - } - pBuf = tmp; - } - int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); - if (nread < 0) { - tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), - tstrerror(terrno)); - goto _err; - } - - if (nread < pRecord->size) { - tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", - REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); - goto _err; - } - - if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) { - tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid, - tstrerror(terrno)); - goto _err; - } - - pRecord = taosHashIterate(pfs->metaCache, pRecord); - } - code = 0; - -_err: - if (code == 0) TSDB_FILE_FSYNC(&mf); - tsdbCloseMFile(&mf); - tsdbCloseMFile(pMFile); - - if (code == 0) { - // rename meta.tmp -> meta - tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), TSDB_FILE_FULL_NAME(pMFile)); - taosRename(mf.f.aname,pMFile->f.aname); - tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN); - tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN); - // update current meta file info - pfs->nstatus->pmf = NULL; - tsdbUpdateMFile(pfs, &mf); - - taosHashCleanup(pfs->metaCache); - pfs->metaCache = pfs->metaCacheComp; - pfs->metaCacheComp = NULL; - } else { - // remove meta.tmp file - remove(mf.f.aname); - taosHashCleanup(pfs->metaCacheComp); - pfs->metaCacheComp = NULL; - } - - tfree(pBuf); - - ASSERT(mf.info.nDels == 0); - ASSERT(mf.info.tombSize == 0); - - tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, - code,mf.info.nRecords,mf.info.size); - return code; -} - -// =================== Commit Time-Series Data -#if 0 -static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { - for (int i = 0; i < nIters; i++) { - TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); - if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true; - } - return false; -} -#endif - - - - - -static int tsdbCommitToTable(SCommitH *pCommith, int tid) { - SCommitIter *pIter = pCommith->iters + tid; - TSKEY nextKey = tsdbNextIterKey(pIter->pIter); - - tsdbResetCommitTable(pCommith); - - TSDB_RLOCK_TABLE(pIter->pTable); - - // Set commit table - if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - // No disk data and no memory data, just return - if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return 0; - } - - // Must has disk data or has memory data - int nBlocks; - int bidx = 0; - SBlock *pBlock; - - if (pCommith->readh.pBlkIdx) { - if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; - } else { - nBlocks = 0; - } - - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - - while (true) { - if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; - - if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || - (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { - if (tsdbMoveBlock(pCommith, bidx) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - bidx++; - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { - // merge pBlock data and memory data - if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - - bidx++; - if (bidx < nBlocks) { - pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - } else { - pBlock = NULL; - } - nextKey = tsdbNextIterKey(pIter->pIter); - } else { - // Only commit memory data - if (pBlock == NULL) { - if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - } else { - if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - return -1; - } - } - nextKey = tsdbNextIterKey(pIter->pIter); - } - } - - TSDB_RUNLOCK_TABLE(pIter->pTable); - - if (tsdbWriteBlockInfo(pCommith) < 0) { - tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), - TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); - return -1; - } - - return 0; -} - -static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { - STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); - - pCommith->pTable = pTable; - - if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (pCommith->isRFileSet) { - if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) { - return -1; - } - } else { - pCommith->readh.pBlkIdx = NULL; - } - return 0; -} - -static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { - TSKEY key = *(TSKEY *)arg1; - SBlock *pBlock = (SBlock *)arg2; - - if (key < pBlock->keyFirst) { - return -1; - } else if (key > pBlock->keyLast) { - return 1; - } else { - return 0; - } -} - -int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, - bool isLast, bool isSuper, void **ppBuf, void **ppCBuf) { - STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlockData *pBlockData; - int64_t offset = 0; - int rowsToWrite = pDataCols->numOfRows; - - ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); - ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); - - // Make buffer space - if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { - return -1; - } - pBlockData = (SBlockData *)(*ppBuf); - - // Get # of cols not all NULL(not including key column) - int nColsNotAllNull = 0; - for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column - SDataCol * pDataCol = pDataCols->cols + ncol; - SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; - - if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it - continue; - } - - memset(pBlockCol, 0, sizeof(*pBlockCol)); - - pBlockCol->colId = pDataCol->colId; - pBlockCol->type = pDataCol->type; - if (tDataTypes[pDataCol->type].statisFunc) { - (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), - &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), - &(pBlockCol->numOfNull)); - } - nColsNotAllNull++; - } - - ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); - - // Compress the data if neccessary - int tcol = 0; // counter of not all NULL and written columns - uint32_t toffset = 0; - int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull); - int32_t lsize = tsize; - int32_t keyLen = 0; - for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { - // All not NULL columns finish - if (ncol != 0 && tcol >= nColsNotAllNull) break; - - SDataCol * pDataCol = pDataCols->cols + ncol; - SBlockCol *pBlockCol = pBlockData->cols + tcol; - - if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; - - int32_t flen; // final length - int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); - void * tptr; - - // Make room - if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { - return -1; - } - pBlockData = (SBlockData *)(*ppBuf); - pBlockCol = pBlockData->cols + tcol; - tptr = POINTER_SHIFT(pBlockData, lsize); - - if (pCfg->compression == TWO_STAGE_COMP && - tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { - return -1; - } - - // Compress or just copy - if (pCfg->compression) { - flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, - tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, - tlen + COMP_OVERFLOW_BYTES); - } else { - flen = tlen; - memcpy(tptr, pDataCol->pData, flen); - } - - // Add checksum - ASSERT(flen > 0); - flen += sizeof(TSCKSUM); - taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); - tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM))); - - if (ncol != 0) { - tsdbSetBlockColOffset(pBlockCol, toffset); - pBlockCol->len = flen; - tcol++; - } else { - keyLen = flen; - } - - toffset += flen; - lsize += flen; - } - - pBlockData->delimiter = TSDB_FILE_DELIMITER; - pBlockData->uid = TABLE_UID(pTable); - pBlockData->numOfCols = nColsNotAllNull; - - taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); - tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); - - // Write the whole block to file - if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) { - return -1; - } - - // Update pBlock membership vairables - pBlock->last = isLast; - pBlock->offset = offset; - pBlock->algorithm = pCfg->compression; - pBlock->numOfRows = rowsToWrite; - pBlock->len = lsize; - pBlock->keyLen = keyLen; - pBlock->numOfSubBlocks = isSuper ? 1 : 0; - pBlock->numOfCols = nColsNotAllNull; - pBlock->keyFirst = dataColsKeyFirst(pDataCols); - pBlock->keyLast = dataColsKeyLast(pDataCols); - - tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 - " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, - REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, - pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); - - return 0; -} - -static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, - bool isSuper) { - return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast, - isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), - (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); -} - - -static int tsdbWriteBlockInfo(SCommitH *pCommih) { - SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); - SBlockIdx blkIdx; - STable * pTable = TSDB_COMMIT_TABLE(pCommih); - - if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void **)(&(TSDB_COMMIT_BUF(pCommih))), - &blkIdx) < 0) { - return -1; - } - - if (blkIdx.numOfBlocks == 0) { - return 0; - } - - if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - SMergeInfo mInfo; - int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); - SDFile * pDFile; - bool isLast; - SBlock block; - - while (true) { - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, - pCfg->update, &mInfo); - - if (pCommith->pDataCols->numOfRows <= 0) break; - - if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } else { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isLast = true; - } - - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; - - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { - return -1; - } - } - - return 0; -} - -static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; - SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - TSKEY keyLimit; - int16_t colId = 0; - SMergeInfo mInfo; - SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; - SBlock block, supBlock; - SDFile * pDFile; - - if (bidx == nBlocks - 1) { - keyLimit = pCommith->maxKey; - } else { - keyLimit = pBlock[1].keyFirst - 1; - } - - SSkipListIterator titer = *(pIter->pIter); - if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; - - tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, - pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); - - if (mInfo.nOperations == 0) { - // no new data to insert (all updates denied) - if (tsdbMoveBlock(pCommith, bidx) < 0) { - return -1; - } - *(pIter->pIter) = titer; - } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { - // Ignore the block - ASSERT(0); - *(pIter->pIter) = titer; - } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { - // Add a sub-block - tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, - pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update, - &mInfo); - if (pBlock->last) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - } - - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; - - if (pBlock->numOfSubBlocks == 1) { - subBlocks[0] = *pBlock; - subBlocks[0].numOfSubBlocks = 0; - } else { - memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), - sizeof(SBlock) * pBlock->numOfSubBlocks); - } - subBlocks[pBlock->numOfSubBlocks] = block; - supBlock = *pBlock; - supBlock.keyFirst = mInfo.keyFirst; - supBlock.keyLast = mInfo.keyLast; - supBlock.numOfSubBlocks++; - supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; - supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); - - if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; - } else { - if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; - if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1; - } - - return 0; -} - -static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { - SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; - SDFile *pDFile; - SBlock block; - bool isSameFile; - - ASSERT(pBlock->numOfSubBlocks > 0); - - if (pBlock->last) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isSameFile = pCommith->isLFileSame; - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isSameFile = pCommith->isDFileSame; - } - - if (isSameFile) { - if (pBlock->numOfSubBlocks == 1) { - if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) { - return -1; - } - } else { - block = *pBlock; - block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk); - - if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), - pBlock->numOfSubBlocks) < 0) { - return -1; - } - } - } else { - if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; - if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; - } - - return 0; -} - -static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { - if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - SBlock block; - SDFile * pDFile; - bool isLast; - int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); - - int biter = 0; - while (true) { - tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, - pCfg->update); - - if (pCommith->pDataCols->numOfRows == 0) break; - - if (isLastOneBlock) { - if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { - pDFile = TSDB_COMMIT_LAST_FILE(pCommith); - isLast = true; - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } - } else { - pDFile = TSDB_COMMIT_DATA_FILE(pCommith); - isLast = false; - } - - if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; - if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; - } - - return 0; -} - -static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, - TSKEY maxKey, int maxRows, int8_t update) { - TSKEY key1 = INT64_MAX; - TSKEY key2 = INT64_MAX; - STSchema *pSchema = NULL; - - ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); - tdResetDataCols(pTarget); - - while (true) { - key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); - SMemRow row = tsdbNextIterRow(pCommitIter->pIter); - if (row == NULL || memRowKey(row) > maxKey) { - key2 = INT64_MAX; - } else { - key2 = memRowKey(row); - } - - if (key1 == INT64_MAX && key2 == INT64_MAX) break; - - if (key1 < key2) { - for (int i = 0; i < pDataCols->numOfCols; i++) { - //TODO: dataColAppendVal may fail - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); - } - - pTarget->numOfRows++; - (*iter)++; - } else if (key1 > key2) { - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, true); - - tSkipListIterNext(pCommitIter->pIter); - } else { - if (update != TD_ROW_OVERWRITE_UPDATE) { - //copy disk data - for (int i = 0; i < pDataCols->numOfCols; i++) { - //TODO: dataColAppendVal may fail - dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, - pTarget->maxPoints); - } - - if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; - } - if (update != TD_ROW_DISCARD_UPDATE) { - //copy mem data - if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { - pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); - ASSERT(pSchema != NULL); - } - - tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); - } - (*iter)++; - tSkipListIterNext(pCommitIter->pIter); - } - - if (pTarget->numOfRows >= maxRows) break; - } -} - static void tsdbResetCommitFile(SCommitH *pCommith) { pCommith->isRFileSet = false; pCommith->isDFileSame = false; @@ -1450,15 +453,9 @@ static void tsdbResetCommitFile(SCommitH *pCommith) { taosArrayClear(pCommith->aBlkIdx); } -static void tsdbResetCommitTable(SCommitH *pCommith) { - taosArrayClear(pCommith->aSubBlk); - taosArrayClear(pCommith->aSupBlk); - pCommith->pTable = NULL; -} - static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { SDiskID did; - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb * pRepo = TSDB_COMMIT_REPO(pCommith); SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id)); @@ -1581,6 +578,990 @@ static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid return 0; } +// extern int32_t tsTsdbMetaCompactRatio; + +// int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, +// SBlockIdx *pIdx) { +// size_t nSupBlocks; +// size_t nSubBlocks; +// uint32_t tlen; +// SBlockInfo *pBlkInfo; +// int64_t offset; +// SBlock * pBlock; + +// memset(pIdx, 0, sizeof(*pIdx)); + +// nSupBlocks = taosArrayGetSize(pSupA); +// nSubBlocks = (pSubA == NULL) ? 0 : taosArrayGetSize(pSubA); + +// if (nSupBlocks <= 0) { +// // No data (data all deleted) +// return 0; +// } + +// tlen = (uint32_t)(sizeof(SBlockInfo) + sizeof(SBlock) * (nSupBlocks + nSubBlocks) + sizeof(TSCKSUM)); +// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; +// pBlkInfo = *ppBuf; + +// pBlkInfo->delimiter = TSDB_FILE_DELIMITER; +// pBlkInfo->tid = TABLE_TID(pTable); +// pBlkInfo->uid = TABLE_UID(pTable); + +// memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock)); +// if (nSubBlocks > 0) { +// memcpy((void *)(pBlkInfo->blocks + nSupBlocks), taosArrayGet(pSubA, 0), nSubBlocks * sizeof(SBlock)); + +// for (int i = 0; i < nSupBlocks; i++) { +// pBlock = pBlkInfo->blocks + i; + +// if (pBlock->numOfSubBlocks > 1) { +// pBlock->offset += (sizeof(SBlockInfo) + sizeof(SBlock) * nSupBlocks); +// } +// } +// } + +// taosCalcChecksumAppend(0, (uint8_t *)pBlkInfo, tlen); + +// if (tsdbAppendDFile(pHeadf, (void *)pBlkInfo, tlen, &offset) < 0) { +// return -1; +// } + +// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(pBlkInfo, tlen - sizeof(TSCKSUM))); + +// // Set pIdx +// pBlock = taosArrayGetLast(pSupA); + +// pIdx->tid = TABLE_TID(pTable); +// pIdx->uid = TABLE_UID(pTable); +// pIdx->hasLast = pBlock->last ? 1 : 0; +// pIdx->maxKey = pBlock->keyLast; +// pIdx->numOfBlocks = (uint32_t)nSupBlocks; +// pIdx->len = tlen; +// pIdx->offset = (uint32_t)offset; + +// return 0; +// } + +// int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { +// SBlockIdx *pBlkIdx; +// size_t nidx = taosArrayGetSize(pIdxA); +// int tlen = 0, size; +// int64_t offset; + +// if (nidx <= 0) { +// // All data are deleted +// pHeadf->info.offset = 0; +// pHeadf->info.len = 0; +// return 0; +// } + +// for (size_t i = 0; i < nidx; i++) { +// pBlkIdx = (SBlockIdx *)taosArrayGet(pIdxA, i); + +// size = tsdbEncodeSBlockIdx(NULL, pBlkIdx); +// if (tsdbMakeRoom(ppBuf, tlen + size) < 0) return -1; + +// void *ptr = POINTER_SHIFT(*ppBuf, tlen); +// tsdbEncodeSBlockIdx(&ptr, pBlkIdx); + +// tlen += size; +// } + +// tlen += sizeof(TSCKSUM); +// if (tsdbMakeRoom(ppBuf, tlen) < 0) return -1; +// taosCalcChecksumAppend(0, (uint8_t *)(*ppBuf), tlen); + +// if (tsdbAppendDFile(pHeadf, *ppBuf, tlen, &offset) < tlen) { +// return -1; +// } + +// tsdbUpdateDFileMagic(pHeadf, POINTER_SHIFT(*ppBuf, tlen - sizeof(TSCKSUM))); +// pHeadf->info.offset = (uint32_t)offset; +// pHeadf->info.len = tlen; + +// return 0; +// } + +// // =================== Commit Meta Data +// static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { +// STsdbFS * pfs = REPO_FS(pRepo); +// SMFile * pOMFile = pfs->cstatus->pmf; +// SDiskID did; + +// // Create/Open a meta file or open the existing file +// if (pOMFile == NULL) { +// // Create a new meta file +// did.level = TFS_PRIMARY_LEVEL; +// did.id = TFS_PRIMARY_ID; +// tsdbInitMFile(pMf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo))); + +// if (open && tsdbCreateMFile(pMf, true) < 0) { +// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } + +// tsdbInfo("vgId:%d meta file %s is created to commit", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMf)); +// } else { +// tsdbInitMFileEx(pMf, pOMFile); +// if (open && tsdbOpenMFile(pMf, O_WRONLY) < 0) { +// tsdbError("vgId:%d failed to open META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } +// } + +// return 0; +// } + +// static int tsdbCommitMeta(STsdbRepo *pRepo) { +// STsdbFS * pfs = REPO_FS(pRepo); +// SMemTable *pMem = pRepo->imem; +// SMFile * pOMFile = pfs->cstatus->pmf; +// SMFile mf; +// SActObj * pAct = NULL; +// SActCont * pCont = NULL; +// SListNode *pNode = NULL; + +// ASSERT(pOMFile != NULL || listNEles(pMem->actList) > 0); + +// if (listNEles(pMem->actList) <= 0) { +// // no meta data to commit, just keep the old meta file +// tsdbUpdateMFile(pfs, pOMFile); +// if (tsTsdbMetaCompactRatio > 0) { +// if (tsdbInitCommitMetaFile(pRepo, &mf, false) < 0) { +// return -1; +// } +// int ret = tsdbCompactMetaFile(pRepo, pfs, &mf); +// if (ret < 0) tsdbError("compact meta file error"); + +// return ret; +// } +// return 0; +// } else { +// if (tsdbInitCommitMetaFile(pRepo, &mf, true) < 0) { +// return -1; +// } +// } + +// // Loop to write +// while ((pNode = tdListPopHead(pMem->actList)) != NULL) { +// pAct = (SActObj *)pNode->data; +// if (pAct->act == TSDB_UPDATE_META) { +// pCont = (SActCont *)POINTER_SHIFT(pAct, sizeof(SActObj)); +// if (tsdbUpdateMetaRecord(pfs, &mf, pAct->uid, (void *)(pCont->cont), pCont->len, false) < 0) { +// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tsdbCloseMFile(&mf); +// (void)tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } +// } else if (pAct->act == TSDB_DROP_META) { +// if (tsdbDropMetaRecord(pfs, &mf, pAct->uid) < 0) { +// tsdbError("vgId:%d failed to drop META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid, +// tstrerror(terrno)); +// tsdbCloseMFile(&mf); +// tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } +// } else { +// ASSERT(false); +// } +// } + +// if (tsdbUpdateMFileHeader(&mf) < 0) { +// tsdbError("vgId:%d failed to update META file header since %s, revert it", REPO_ID(pRepo), tstrerror(terrno)); +// tsdbApplyMFileChange(&mf, pOMFile); +// // TODO: need to reload metaCache +// return -1; +// } + +// TSDB_FILE_FSYNC(&mf); +// tsdbCloseMFile(&mf); +// tsdbUpdateMFile(pfs, &mf); + +// if (tsTsdbMetaCompactRatio > 0 && tsdbCompactMetaFile(pRepo, pfs, &mf) < 0) { +// tsdbError("compact meta file error"); +// } + +// return 0; +// } + +// int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord) { +// int tlen = 0; +// tlen += taosEncodeFixedU64(buf, pRecord->uid); +// tlen += taosEncodeFixedI64(buf, pRecord->offset); +// tlen += taosEncodeFixedI64(buf, pRecord->size); + +// return tlen; +// } + +// void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { +// buf = taosDecodeFixedU64(buf, &(pRecord->uid)); +// buf = taosDecodeFixedI64(buf, &(pRecord->offset)); +// buf = taosDecodeFixedI64(buf, &(pRecord->size)); + +// return buf; +// } + +// static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact) { +// char buf[64] = "\0"; +// void * pBuf = buf; +// SKVRecord rInfo; +// int64_t offset; + +// // Seek to end of meta file +// offset = tsdbSeekMFile(pMFile, 0, SEEK_END); +// if (offset < 0) { +// return -1; +// } + +// rInfo.offset = offset; +// rInfo.uid = uid; +// rInfo.size = contLen; + +// int tlen = tsdbEncodeKVRecord((void **)(&pBuf), &rInfo); +// if (tsdbAppendMFile(pMFile, buf, tlen, NULL) < tlen) { +// return -1; +// } + +// if (tsdbAppendMFile(pMFile, cont, contLen, NULL) < contLen) { +// return -1; +// } + +// tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); + +// SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; + +// pMFile->info.nRecords++; + +// SKVRecord *pRecord = taosHashGet(cache, (void *)&uid, sizeof(uid)); +// if (pRecord != NULL) { +// pMFile->info.tombSize += (pRecord->size + sizeof(SKVRecord)); +// } else { +// pMFile->info.nRecords++; +// } +// taosHashPut(cache, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo)); + +// return 0; +// } + +// static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { +// SKVRecord rInfo = {0}; +// char buf[128] = "\0"; + +// SKVRecord *pRecord = taosHashGet(pfs->metaCache, (void *)(&uid), sizeof(uid)); +// if (pRecord == NULL) { +// tsdbError("failed to drop META record with key %" PRIu64 " since not find", uid); +// return -1; +// } + +// rInfo.offset = -pRecord->offset; +// rInfo.uid = pRecord->uid; +// rInfo.size = pRecord->size; + +// void *pBuf = buf; +// tsdbEncodeKVRecord(&pBuf, &rInfo); + +// if (tsdbAppendMFile(pMFile, buf, sizeof(SKVRecord), NULL) < 0) { +// return -1; +// } + +// pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t *)buf, sizeof(SKVRecord)); +// pMFile->info.nDels++; +// pMFile->info.nRecords--; +// pMFile->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2); + +// taosHashRemove(pfs->metaCache, (void *)(&uid), sizeof(uid)); +// return 0; +// } + +// static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { +// float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); +// float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); +// float compactRatio = (float)(tsTsdbMetaCompactRatio)/100; + +// if (delPercent < compactRatio && tombPercent < compactRatio) { +// return 0; +// } + +// if (tsdbOpenMFile(pMFile, O_RDONLY) < 0) { +// tsdbError("open meta file %s compact fail", pMFile->f.rname); +// return -1; +// } + +// tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 +// ",size:%" PRId64, +// tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); + +// SMFile mf; +// SDiskID did; + +// // first create tmp meta file +// did.level = TFS_PRIMARY_LEVEL; +// did.id = TFS_PRIMARY_ID; +// tsdbInitMFile(&mf, did, REPO_ID(pRepo), FS_TXN_VERSION(REPO_FS(pRepo)) + 1); + +// if (tsdbCreateMFile(&mf, true) < 0) { +// tsdbError("vgId:%d failed to create META file since %s", REPO_ID(pRepo), tstrerror(terrno)); +// return -1; +// } + +// tsdbInfo("vgId:%d meta file %s is created to compact meta data", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf)); + +// // second iterator metaCache +// int code = -1; +// int64_t maxBufSize = 1024; +// SKVRecord *pRecord; +// void *pBuf = NULL; + +// pBuf = malloc((size_t)maxBufSize); +// if (pBuf == NULL) { +// goto _err; +// } + +// // init Comp +// assert(pfs->metaCacheComp == NULL); +// pfs->metaCacheComp = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); +// if (pfs->metaCacheComp == NULL) { +// goto _err; +// } + +// pRecord = taosHashIterate(pfs->metaCache, NULL); +// while (pRecord) { +// if (tsdbSeekMFile(pMFile, pRecord->offset + sizeof(SKVRecord), SEEK_SET) < 0) { +// tsdbError("vgId:%d failed to seek file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// goto _err; +// } +// if (pRecord->size > maxBufSize) { +// maxBufSize = pRecord->size; +// void* tmp = realloc(pBuf, (size_t)maxBufSize); +// if (tmp == NULL) { +// goto _err; +// } +// pBuf = tmp; +// } +// int nread = (int)tsdbReadMFile(pMFile, pBuf, pRecord->size); +// if (nread < 0) { +// tsdbError("vgId:%d failed to read file %s since %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), +// tstrerror(terrno)); +// goto _err; +// } + +// if (nread < pRecord->size) { +// tsdbError("vgId:%d failed to read file %s since file corrupted, expected read:%" PRId64 " actual read:%d", +// REPO_ID(pRepo), TSDB_FILE_FULL_NAME(pMFile), pRecord->size, nread); +// goto _err; +// } + +// if (tsdbUpdateMetaRecord(pfs, &mf, pRecord->uid, pBuf, (int)pRecord->size, true) < 0) { +// tsdbError("vgId:%d failed to update META record, uid %" PRIu64 " since %s", REPO_ID(pRepo), pRecord->uid, +// tstrerror(terrno)); +// goto _err; +// } + +// pRecord = taosHashIterate(pfs->metaCache, pRecord); +// } +// code = 0; + +// _err: +// if (code == 0) TSDB_FILE_FSYNC(&mf); +// tsdbCloseMFile(&mf); +// tsdbCloseMFile(pMFile); + +// if (code == 0) { +// // rename meta.tmp -> meta +// tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), +// TSDB_FILE_FULL_NAME(pMFile)); taosRename(mf.f.aname,pMFile->f.aname); tstrncpy(mf.f.aname, pMFile->f.aname, +// TSDB_FILENAME_LEN); tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN); +// // update current meta file info +// pfs->nstatus->pmf = NULL; +// tsdbUpdateMFile(pfs, &mf); + +// taosHashCleanup(pfs->metaCache); +// pfs->metaCache = pfs->metaCacheComp; +// pfs->metaCacheComp = NULL; +// } else { +// // remove meta.tmp file +// remove(mf.f.aname); +// taosHashCleanup(pfs->metaCacheComp); +// pfs->metaCacheComp = NULL; +// } + +// tfree(pBuf); + +// ASSERT(mf.info.nDels == 0); +// ASSERT(mf.info.tombSize == 0); + +// tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, +// code,mf.info.nRecords,mf.info.size); +// return code; +// } + +// // =================== Commit Time-Series Data +// #if 0 +// static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey) { +// for (int i = 0; i < nIters; i++) { +// TSKEY nextKey = tsdbNextIterKey((iters + i)->pIter); +// if (nextKey != TSDB_DATA_TIMESTAMP_NULL && (nextKey >= minKey && nextKey <= maxKey)) return true; +// } +// return false; +// } +// #endif + +static int tsdbCommitToTable(SCommitH *pCommith, int tid) { + SCommitIter *pIter = pCommith->iters + tid; + TSKEY nextKey = tsdbNextIterKey(pIter->pIter); + + tsdbResetCommitTable(pCommith); + + // Set commit table + if (tsdbSetCommitTable(pCommith, pIter->pTable) < 0) { + return -1; + } + + // No disk data and no memory data, just return + if (pCommith->readh.pBlkIdx == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) { + return 0; + } + + // Must has disk data or has memory data + int nBlocks; + int bidx = 0; + SBlock *pBlock; + + if (pCommith->readh.pBlkIdx) { + if (tsdbLoadBlockInfo(&(pCommith->readh), NULL) < 0) { + return -1; + } + + nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; + } else { + nBlocks = 0; + } + + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + + while (true) { + if (pBlock == NULL && (nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey)) break; + + if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pCommith->maxKey) || + (pBlock && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) { + if (tsdbMoveBlock(pCommith, bidx) < 0) { + return -1; + } + + bidx++; + if (bidx < nBlocks) { + pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + } else { + pBlock = NULL; + } + } else if (pBlock && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) { + // // merge pBlock data and memory data + // if (tsdbMergeMemData(pCommith, pIter, bidx) < 0) { + // TSDB_RUNLOCK_TABLE(pIter->pTable); + // return -1; + // } + + // bidx++; + // if (bidx < nBlocks) { + // pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + // } else { + // pBlock = NULL; + // } + // nextKey = tsdbNextIterKey(pIter->pIter); + } else { + // // Only commit memory data + // if (pBlock == NULL) { + // if (tsdbCommitMemData(pCommith, pIter, pCommith->maxKey, false) < 0) { + // TSDB_RUNLOCK_TABLE(pIter->pTable); + // return -1; + // } + // } else { + // if (tsdbCommitMemData(pCommith, pIter, pBlock->keyFirst - 1, true) < 0) { + // TSDB_RUNLOCK_TABLE(pIter->pTable); + // return -1; + // } + // } + // nextKey = tsdbNextIterKey(pIter->pIter); + } + } + + // if (tsdbWriteBlockInfo(pCommith) < 0) { + // tsdbError("vgId:%d failed to write SBlockInfo part into file %s since %s", TSDB_COMMIT_REPO_ID(pCommith), + // TSDB_FILE_FULL_NAME(TSDB_COMMIT_HEAD_FILE(pCommith)), tstrerror(terrno)); + // return -1; + // } + + return 0; +} + +static int tsdbSetCommitTable(SCommitH *pCommith, STable *pTable) { + STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1); + + pCommith->pTable = pTable; + + if (tdInitDataCols(pCommith->pDataCols, pSchema) < 0) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pCommith->isRFileSet) { + if (tsdbSetReadTable(&(pCommith->readh), pTable) < 0) { + return -1; + } + } else { + pCommith->readh.pBlkIdx = NULL; + } + return 0; +} + +static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { + TSKEY key = *(TSKEY *)arg1; + SBlock *pBlock = (SBlock *)arg2; + + if (key < pBlock->keyFirst) { + return -1; + } else if (key > pBlock->keyLast) { + return 1; + } else { + return 0; + } +} + +int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuper, void **ppBuf, void **ppCBuf) { + STsdbCfg * pCfg = REPO_CFG(pRepo); + SBlockData *pBlockData; + int64_t offset = 0; + int rowsToWrite = pDataCols->numOfRows; + + ASSERT(rowsToWrite > 0 && rowsToWrite <= pCfg->maxRowsPerFileBlock); + ASSERT((!isLast) || rowsToWrite < pCfg->minRowsPerFileBlock); + + // Make buffer space + if (tsdbMakeRoom(ppBuf, TSDB_BLOCK_STATIS_SIZE(pDataCols->numOfCols)) < 0) { + return -1; + } + pBlockData = (SBlockData *)(*ppBuf); + + // Get # of cols not all NULL(not including key column) + int nColsNotAllNull = 0; + for (int ncol = 1; ncol < pDataCols->numOfCols; ncol++) { // ncol from 1, we skip the timestamp column + SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol *pBlockCol = pBlockData->cols + nColsNotAllNull; + + if (isAllRowsNull(pDataCol)) { // all data to commit are NULL, just ignore it + continue; + } + + memset(pBlockCol, 0, sizeof(*pBlockCol)); + + pBlockCol->colId = pDataCol->colId; + pBlockCol->type = pDataCol->type; + if (tDataTypes[pDataCol->type].statisFunc) { + (*tDataTypes[pDataCol->type].statisFunc)(pDataCol->pData, rowsToWrite, &(pBlockCol->min), &(pBlockCol->max), + &(pBlockCol->sum), &(pBlockCol->minIndex), &(pBlockCol->maxIndex), + &(pBlockCol->numOfNull)); + } + nColsNotAllNull++; + } + + ASSERT(nColsNotAllNull >= 0 && nColsNotAllNull <= pDataCols->numOfCols); + + // Compress the data if neccessary + int tcol = 0; // counter of not all NULL and written columns + uint32_t toffset = 0; + int32_t tsize = TSDB_BLOCK_STATIS_SIZE(nColsNotAllNull); + int32_t lsize = tsize; + int32_t keyLen = 0; + for (int ncol = 0; ncol < pDataCols->numOfCols; ncol++) { + // All not NULL columns finish + if (ncol != 0 && tcol >= nColsNotAllNull) break; + + SDataCol * pDataCol = pDataCols->cols + ncol; + SBlockCol *pBlockCol = pBlockData->cols + tcol; + + if (ncol != 0 && (pDataCol->colId != pBlockCol->colId)) continue; + + int32_t flen; // final length + int32_t tlen = dataColGetNEleLen(pDataCol, rowsToWrite); + void * tptr; + + // Make room + if (tsdbMakeRoom(ppBuf, lsize + tlen + COMP_OVERFLOW_BYTES + sizeof(TSCKSUM)) < 0) { + return -1; + } + pBlockData = (SBlockData *)(*ppBuf); + pBlockCol = pBlockData->cols + tcol; + tptr = POINTER_SHIFT(pBlockData, lsize); + + if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) { + return -1; + } + + // Compress or just copy + if (pCfg->compression) { + flen = (*(tDataTypes[pDataCol->type].compFunc))((char *)pDataCol->pData, tlen, rowsToWrite, tptr, + tlen + COMP_OVERFLOW_BYTES, pCfg->compression, *ppCBuf, + tlen + COMP_OVERFLOW_BYTES); + } else { + flen = tlen; + memcpy(tptr, pDataCol->pData, flen); + } + + // Add checksum + ASSERT(flen > 0); + flen += sizeof(TSCKSUM); + taosCalcChecksumAppend(0, (uint8_t *)tptr, flen); + tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(tptr, flen - sizeof(TSCKSUM))); + + if (ncol != 0) { + tsdbSetBlockColOffset(pBlockCol, toffset); + pBlockCol->len = flen; + tcol++; + } else { + keyLen = flen; + } + + toffset += flen; + lsize += flen; + } + + pBlockData->delimiter = TSDB_FILE_DELIMITER; + pBlockData->uid = TABLE_UID(pTable); + pBlockData->numOfCols = nColsNotAllNull; + + taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize); + tsdbUpdateDFileMagic(pDFile, POINTER_SHIFT(pBlockData, tsize - sizeof(TSCKSUM))); + + // Write the whole block to file + if (tsdbAppendDFile(pDFile, (void *)pBlockData, lsize, &offset) < lsize) { + return -1; + } + + // Update pBlock membership vairables + pBlock->last = isLast; + pBlock->offset = offset; + pBlock->algorithm = pCfg->compression; + pBlock->numOfRows = rowsToWrite; + pBlock->len = lsize; + pBlock->keyLen = keyLen; + pBlock->numOfSubBlocks = isSuper ? 1 : 0; + pBlock->numOfCols = nColsNotAllNull; + pBlock->keyFirst = dataColsKeyFirst(pDataCols); + pBlock->keyLast = dataColsKeyLast(pDataCols); + + tsdbDebug("vgId:%d tid:%d a block of data is written to file %s, offset %" PRId64 + " numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64, + REPO_ID(pRepo), TABLE_TID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len, + pBlock->numOfCols, pBlock->keyFirst, pBlock->keyLast); + + return 0; +} + +static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock, bool isLast, + bool isSuper) { + return tsdbWriteBlockImpl(TSDB_COMMIT_REPO(pCommith), TSDB_COMMIT_TABLE(pCommith), pDFile, pDataCols, pBlock, isLast, + isSuper, (void **)(&(TSDB_COMMIT_BUF(pCommith))), + (void **)(&(TSDB_COMMIT_COMP_BUF(pCommith)))); +} + +// static int tsdbWriteBlockInfo(SCommitH *pCommih) { +// SDFile * pHeadf = TSDB_COMMIT_HEAD_FILE(pCommih); +// SBlockIdx blkIdx; +// STable * pTable = TSDB_COMMIT_TABLE(pCommih); + +// if (tsdbWriteBlockInfoImpl(pHeadf, pTable, pCommih->aSupBlk, pCommih->aSubBlk, (void +// **)(&(TSDB_COMMIT_BUF(pCommih))), +// &blkIdx) < 0) { +// return -1; +// } + +// if (blkIdx.numOfBlocks == 0) { +// return 0; +// } + +// if (taosArrayPush(pCommih->aBlkIdx, (void *)(&blkIdx)) == NULL) { +// terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; +// return -1; +// } + +// return 0; +// } + +// static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// SMergeInfo mInfo; +// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); +// SDFile * pDFile; +// bool isLast; +// SBlock block; + +// while (true) { +// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, defaultRows, pCommith->pDataCols, NULL, 0, +// pCfg->update, &mInfo); + +// if (pCommith->pDataCols->numOfRows <= 0) break; + +// if (toData || pCommith->pDataCols->numOfRows >= pCfg->minRowsPerFileBlock) { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// isLast = false; +// } else { +// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); +// isLast = true; +// } + +// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; + +// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) { +// return -1; +// } +// } + +// return 0; +// } + +// static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; +// SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; +// TSKEY keyLimit; +// int16_t colId = 0; +// SMergeInfo mInfo; +// SBlock subBlocks[TSDB_MAX_SUBBLOCKS]; +// SBlock block, supBlock; +// SDFile * pDFile; + +// if (bidx == nBlocks - 1) { +// keyLimit = pCommith->maxKey; +// } else { +// keyLimit = pBlock[1].keyFirst - 1; +// } + +// SSkipListIterator titer = *(pIter->pIter); +// if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1; + +// tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData, +// pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo); + +// if (mInfo.nOperations == 0) { +// // no new data to insert (all updates denied) +// if (tsdbMoveBlock(pCommith, bidx) < 0) { +// return -1; +// } +// *(pIter->pIter) = titer; +// } else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) { +// // Ignore the block +// ASSERT(0); +// *(pIter->pIter) = titer; +// } else if (tsdbCanAddSubBlock(pCommith, pBlock, &mInfo)) { +// // Add a sub-block +// tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols, +// pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, +// pCfg->update, &mInfo); +// if (pBlock->last) { +// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); +// } else { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// } + +// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1; + +// if (pBlock->numOfSubBlocks == 1) { +// subBlocks[0] = *pBlock; +// subBlocks[0].numOfSubBlocks = 0; +// } else { +// memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), +// sizeof(SBlock) * pBlock->numOfSubBlocks); +// } +// subBlocks[pBlock->numOfSubBlocks] = block; +// supBlock = *pBlock; +// supBlock.keyFirst = mInfo.keyFirst; +// supBlock.keyLast = mInfo.keyLast; +// supBlock.numOfSubBlocks++; +// supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed; +// supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock); + +// if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, supBlock.numOfSubBlocks) < 0) return -1; +// } else { +// if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; +// if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return +// -1; +// } + +// return 0; +// } + +static int tsdbMoveBlock(SCommitH *pCommith, int bidx) { + SBlock *pBlock = pCommith->readh.pBlkInfo->blocks + bidx; + SDFile *pDFile; + SBlock block; + bool isSameFile; + + ASSERT(pBlock->numOfSubBlocks > 0); + + if (pBlock->last) { + pDFile = TSDB_COMMIT_LAST_FILE(pCommith); + isSameFile = pCommith->isLFileSame; + } else { + pDFile = TSDB_COMMIT_DATA_FILE(pCommith); + isSameFile = pCommith->isDFileSame; + } + + if (isSameFile) { + if (pBlock->numOfSubBlocks == 1) { + if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) { + return -1; + } + } else { + block = *pBlock; + block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSubBlk); + + if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), + pBlock->numOfSubBlocks) < 0) { + return -1; + } + } + } else { + if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1; + if (tsdbWriteBlock(pCommith, pDFile, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1; + if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; + } + + return 0; +} + +static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) { + if (taosArrayPush(pCommith->aSupBlk, pSupBlock) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + if (pSubBlocks && taosArrayAddBatch(pCommith->aSubBlk, pSubBlocks, nSubBlocks) == NULL) { + terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +// static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool +// isLastOneBlock) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// SBlock block; +// SDFile * pDFile; +// bool isLast; +// int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); + +// int biter = 0; +// while (true) { +// tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows, +// pCfg->update); + +// if (pCommith->pDataCols->numOfRows == 0) break; + +// if (isLastOneBlock) { +// if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) { +// pDFile = TSDB_COMMIT_LAST_FILE(pCommith); +// isLast = true; +// } else { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// isLast = false; +// } +// } else { +// pDFile = TSDB_COMMIT_DATA_FILE(pCommith); +// isLast = false; +// } + +// if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1; +// if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1; +// } + +// return 0; +// } + +// static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, +// TSKEY maxKey, int maxRows, int8_t update) { +// TSKEY key1 = INT64_MAX; +// TSKEY key2 = INT64_MAX; +// STSchema *pSchema = NULL; + +// ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey); +// tdResetDataCols(pTarget); + +// while (true) { +// key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter); +// SMemRow row = tsdbNextIterRow(pCommitIter->pIter); +// if (row == NULL || memRowKey(row) > maxKey) { +// key2 = INT64_MAX; +// } else { +// key2 = memRowKey(row); +// } + +// if (key1 == INT64_MAX && key2 == INT64_MAX) break; + +// if (key1 < key2) { +// for (int i = 0; i < pDataCols->numOfCols; i++) { +// //TODO: dataColAppendVal may fail +// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, +// pTarget->maxPoints); +// } + +// pTarget->numOfRows++; +// (*iter)++; +// } else if (key1 > key2) { +// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { +// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); +// ASSERT(pSchema != NULL); +// } + +// tdAppendMemRowToDataCol(row, pSchema, pTarget, true); + +// tSkipListIterNext(pCommitIter->pIter); +// } else { +// if (update != TD_ROW_OVERWRITE_UPDATE) { +// //copy disk data +// for (int i = 0; i < pDataCols->numOfCols; i++) { +// //TODO: dataColAppendVal may fail +// dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, +// pTarget->maxPoints); +// } + +// if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; +// } +// if (update != TD_ROW_DISCARD_UPDATE) { +// //copy mem data +// if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { +// pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row)); +// ASSERT(pSchema != NULL); +// } + +// tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE); +// } +// (*iter)++; +// tSkipListIterNext(pCommitIter->pIter); +// } + +// if (pTarget->numOfRows >= maxRows) break; +// } +// } + +static void tsdbResetCommitTable(SCommitH *pCommith) { + taosArrayClear(pCommith->aSubBlk); + taosArrayClear(pCommith->aSupBlk); + pCommith->pTable = NULL; +} + static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { if (pCommith->isRFileSet) { tsdbCloseAndUnsetFSet(&(pCommith->readh)); @@ -1592,46 +1573,45 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { tsdbCloseDFileSet(TSDB_COMMIT_WRITE_FSET(pCommith)); } -static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); - STsdbCfg * pCfg = REPO_CFG(pRepo); - int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; +// static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { +// STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); +// STsdbCfg * pCfg = REPO_CFG(pRepo); +// int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; - ASSERT(mergeRows > 0); +// ASSERT(mergeRows > 0); - if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { - if (pBlock->last) { - if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; - } else { - if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; - } - } +// if (pBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS && pInfo->nOperations <= pCfg->maxRowsPerFileBlock) { +// if (pBlock->last) { +// if (pCommith->isLFileSame && mergeRows < pCfg->minRowsPerFileBlock) return true; +// } else { +// if (pCommith->isDFileSame && mergeRows <= pCfg->maxRowsPerFileBlock) return true; +// } +// } - return false; -} +// return false; +// } -int tsdbApplyRtn(STsdbRepo *pRepo) { - SRtn rtn; - SFSIter fsiter; - STsdbFS * pfs = REPO_FS(pRepo); - SDFileSet *pSet; +// int tsdbApplyRtn(STsdbRepo *pRepo) { +// SRtn rtn; +// SFSIter fsiter; +// STsdbFS * pfs = REPO_FS(pRepo); +// SDFileSet *pSet; - // Get retention snapshot - tsdbGetRtnSnap(pRepo, &rtn); +// // Get retention snapshot +// tsdbGetRtnSnap(pRepo, &rtn); - tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); - while ((pSet = tsdbFSIterNext(&fsiter))) { - if (pSet->fid < rtn.minFid) { - tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, - TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); - continue; - } +// tsdbFSIterInit(&fsiter, pfs, TSDB_FS_ITER_FORWARD); +// while ((pSet = tsdbFSIterNext(&fsiter))) { +// if (pSet->fid < rtn.minFid) { +// tsdbInfo("vgId:%d FSET %d at level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid, +// TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet)); +// continue; +// } - if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { - return -1; - } - } +// if (tsdbApplyRtnOnFSet(pRepo, pSet, &rtn) < 0) { +// return -1; +// } +// } - return 0; -} -#endif \ No newline at end of file +// return 0; +// } \ No newline at end of file