From 5d54e8554032ae4cc7eb72cfdf13f210c1db111c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Sun, 3 Jan 2021 23:40:47 +0800 Subject: [PATCH] partial work --- src/tsdb/inc/tsdbMain.h | 48 ++++- src/tsdb/src/tsdbCommit.c | 193 +++++++----------- src/tsdb/src/tsdbFS.c | 50 ++--- .../src/tkvstore.c => tsdb/src/tsdbStore.c} | 14 +- src/util/inc/tkvstore.h | 63 ------ 5 files changed, 142 insertions(+), 226 deletions(-) rename src/{util/src/tkvstore.c => tsdb/src/tsdbStore.c} (97%) delete mode 100644 src/util/inc/tkvstore.h diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index c57315acd8..4658def1bf 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -365,14 +365,17 @@ typedef struct { #define TSDB_FILE_F(tf) (&((tf)->f))) #define TSDB_FILE_FD(tf) ((tf)->fd) -int tsdbOpenFS(STsdbRepo *pRepo); -void tsdbCloseFS(STsdbRepo *pRepo); -int tsdbFSNewTxn(STsdbRepo *pRepo); -int tsdbFSEndTxn(STsdbRepo *pRepo, bool hasError); -int tsdbUpdateMFile(STsdbRepo *pRepo, SMFile *pMFile); -int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet); -void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid); -int tsdbRemoveDFileSet(SDFileSet *pSet); +int tsdbOpenFS(STsdbRepo* pRepo); +void tsdbCloseFS(STsdbRepo* pRepo); +int tsdbFSNewTxn(STsdbRepo* pRepo); +int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError); +int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile); +int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); +void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid); +int tsdbRemoveDFileSet(SDFileSet* pSet); +int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); +void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); +SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to); static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); @@ -401,6 +404,31 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { return 0; } +// ================= tsdbStore.c +#define KVSTORE_FILE_VERSION ((uint32_t)0) + +typedef int (*iterFunc)(void*, void* cont, int contLen); +typedef void (*afterFunc)(void*); + +typedef struct { + SMFile f; + SHashObj* map; + iterFunc iFunc; + afterFunc aFunc; + void* appH; +} SKVStore; + +#define KVSTORE_MAGIC(s) (s)->f.info.magic + +int tdCreateKVStore(char* fname); +int tdDestroyKVStore(char* fname); +SKVStore* tdOpenKVStore(char* fname, iterFunc iFunc, afterFunc aFunc, void* appH); +void tdCloseKVStore(SKVStore* pStore); +int tdKVStoreStartCommit(SKVStore* pStore); +int tdUpdateKVStoreRecord(SKVStore* pStore, uint64_t uid, void* cont, int contLen); +int tdDropKVStoreRecord(SKVStore* pStore, uint64_t uid); +int tdKVStoreEndCommit(SKVStore* pStore); +void tsdbGetStoreInfo(char* fname, uint32_t* magic, int64_t* size); // ================= tsdbFile.c // extern const char* tsdbFileSuffix[]; @@ -467,7 +495,7 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { // } SFileGroupIter; // #define TSDB_FILE_NAME(pFile) ((pFile)->file.aname) -// #define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) +#define TSDB_KEY_FILEID(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) // #define TSDB_MAX_FILE(keep, daysPerFile) ((keep) / (daysPerFile) + 3) // #define TSDB_MIN_FILE_ID(fh) (fh)->pFGroup[0].fileId // #define TSDB_MAX_FILE_ID(fh) (fh)->pFGroup[(fh)->nFGroups - 1].fileId @@ -496,7 +524,7 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { // int tsdbLoadFileHeader(SFile* pFile, uint32_t* version); // void tsdbGetFileInfoImpl(char* fname, uint32_t* magic, int64_t* size); // void tsdbGetFidGroup(STsdbCfg* pCfg, SFidGroup* pFidGroup); -// void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); +void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); // int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup); // ================= tsdbMain.c diff --git a/src/tsdb/src/tsdbCommit.c b/src/tsdb/src/tsdbCommit.c index fe60cbf40d..9fc2bbc451 100644 --- a/src/tsdb/src/tsdbCommit.c +++ b/src/tsdb/src/tsdbCommit.c @@ -15,7 +15,14 @@ #include "tsdbMain.h" typedef struct { - SFidGroup fidg; + int minFid; + int midFid; + int maxFid; + TSKEY minKey; +} SRtn; + +typedef struct { + SRtn rtn; SCommitIter *iters; SRWHelper whelper; SDataCols * pDataCols; @@ -32,6 +39,8 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key); static int tsdbInitCommitH(STsdbRepo *pRepo, SCommitH *pch); static void tsdbDestroyCommitH(SCommitH *pch, int niter); +static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); +static int tsdbGetFidLevel(int fid, SRtn *pRtn); void *tsdbCommitData(STsdbRepo *pRepo) { if (tsdbStartCommit(pRepo) < 0) { @@ -64,45 +73,28 @@ _err: static int tsdbCommitTSData(STsdbRepo *pRepo) { SMemTable *pMem = pRepo->imem; - SCommitH ch = {0}; STsdbCfg * pCfg = &(pRepo->config); - // SFidGroup fidGroup = {0}; - TSKEY minKey = 0; - TSKEY maxKey = 0; + SCommitH ch = {0}; if (pMem->numOfRows <= 0) return 0; - tsdbGetFidGroup(pCfg, &(ch.fidg)); - tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, ch.fidg.minFid, &minKey, &maxKey); - tsdbRemoveFilesBeyondRetention(pRepo, &(ch.fidg)); - if (tsdbInitCommitH(pRepo, &ch) < 0) { - goto _err; + return -1; } - int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision)); - int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision)); + // TODO + int sfid = MIN(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/); + int efid = MAX(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision), 1 /*TODO*/); - tsdbSeekCommitIter(ch.iters, pMem->maxTables, minKey); - - // Loop to commit to each file for (int fid = sfid; fid <= efid; fid++) { - if (fid < ch.fidg.minFid) continue; - - if (tsdbCommitToFile(pRepo, fid, &(ch)) < 0) { - tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; + if (tsdbCommitToFile(pRepo, fid, &ch) < 0) { + tsdbDestroyCommitH(&ch, pMem->maxTables); + return -1; } } - tsdbApplyRetention(pRepo, &(ch.fidg)); - tsdbDestroyCommitH(&ch, pMem->maxTables); return 0; - -_err: - tsdbDestroyCommitH(&ch, pMem->maxTables); - return -1; } static int tsdbCommitMeta(STsdbRepo *pRepo) { @@ -148,7 +140,7 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) { } // TODO: update meta file - tsdbUpdateMFile(pRepo, NULL); + tsdbUpdateMFile(pRepo, &(pMeta->pStore.f)); return 0; @@ -195,116 +187,56 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS } static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitH *pch) { - STsdbCfg * pCfg = &pRepo->config; - STsdbFileH * pFileH = pRepo->tsdbFileH; - SFileGroup * pGroup = NULL; - SMemTable * pMem = pRepo->imem; - bool newLast = false; - TSKEY minKey = 0; - TSKEY maxKey = 0; - SCommitIter *iters = pch->iters; - SRWHelper * pHelper = &(pch->whelper); - SDataCols * pDataCols = pch->pDataCols; + STsdbCfg * pCfg = &(pRepo->config); + SMemTable *pMem = pRepo->imem; + TSKEY minKey, maxKey; + SDFileSet *pOldSet = NULL; + SDFileSet newSet = {0}; tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); - // Check if there are data to commit to this file - if (!tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey)) { - tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid); - return 0; - } - - if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) { - pGroup = tsdbCreateFGroup(pRepo, fid, tsdbGetFidLevel(fid, pch->fidg)); - if (pGroup == NULL) { - tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - return -1; + if (pOldSet) { // file exists + int level = tsdbGetFidLevel(fid, &(pch->rtn)); + if (level < 0) { // if out of data, remove it and ignore expired memory data + tsdbRemoveExpiredDFileSet(pRepo, fid); + tsdbSeekCommitIter(pch->iters, pMem->maxTables, maxKey + 1); + return 0; } + + // Move the data file set to correct level + tsdbMoveDFileSet(pOldSet, level); } - // Open files for write/read - if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) { - tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } + if (tsdbHasDataToCommit(pch->iters, pMem->maxTables, minKey, maxKey)) { + if (tsdbSetAndOpenHelperFile(&(pch->whelper), pOldSet, &newSet) < 0) return -1; - newLast = TSDB_NLAST_FILE_OPENED(pHelper); + if (tsdbLoadCompIdx(&pch->whelper, NULL) < 0) return -1; - if (tsdbLoadCompIdx(pHelper, NULL) < 0) { - tsdbError("vgId:%d failed to load SBlockIdx part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } + for (int tid = 0; tid < pMem->maxTables; tid++) { + SCommitIter *pIter = pch->iters + tid; + if (pIter->pTable == NULL) continue; - // Loop to commit data in each table - for (int tid = 1; tid < pMem->maxTables; tid++) { - SCommitIter *pIter = iters + tid; - if (pIter->pTable == NULL) continue; + if (tsdbSetHelperTable(&(pch->whelper), pIter->pTable, pRepo) < 0) return -1; - TSDB_RLOCK_TABLE(pIter->pTable); + TSDB_RLOCK_TABLE(pIter->pTable); - if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err; - - if (pIter->pIter != NULL) { - if (tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1)) < 0) { - terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; - goto _err; + if (pIter->pIter != NULL) { // has data in memory to commit } - if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) { - TSDB_RUNLOCK_TABLE(pIter->pTable); - tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo), - TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable), - tstrerror(terrno)); - goto _err; - } + TSDB_RUNLOCK_TABLE(pIter->pTable); + if (tsdbMoveLastBlockIfNeccessary() < 0) return -1; + + if (tsdbWriteCompInfo() < 0) return -1; } - TSDB_RUNLOCK_TABLE(pIter->pTable); - - // Move the last block to the new .l file if neccessary - if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) { - tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } - - // Write the SBlock part - if (tsdbWriteCompInfo(pHelper) < 0) { - tsdbError("vgId:%d, failed to write compInfo part since %s", REPO_ID(pRepo), tstrerror(terrno)); - goto _err; - } + if (tsdbWriteCompIdx() < 0) return -1; } - if (tsdbWriteCompIdx(pHelper) < 0) { - tsdbError("vgId:%d failed to write compIdx part to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno)); - goto _err; + if (/*file exists OR has data to commit*/) { + tsdbUpdateDFileSet(pRepo, &newSet); } - tsdbCloseHelperFile(pHelper, 0, pGroup); - - pthread_rwlock_wrlock(&(pFileH->fhlock)); - - // tfsremove(&(helperHeadF(pHelper)->file)); - (void)rename(TSDB_FILE_NAME(helperNewHeadF(pHelper)), TSDB_FILE_NAME(helperHeadF(pHelper))); - tfsDecDiskFile(helperNewHeadF(pHelper)->file.level, helperNewHeadF(pHelper)->file.id, 1); - pGroup->files[TSDB_FILE_TYPE_HEAD].info = helperNewHeadF(pHelper)->info; - - if (newLast) { - (void)rename(TSDB_FILE_NAME(helperNewLastF(pHelper)), TSDB_FILE_NAME(helperLastF(pHelper))); - tfsDecDiskFile(helperNewLastF(pHelper)->file.level, helperNewLastF(pHelper)->file.id, 1); - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperNewLastF(pHelper)->info; - } else { - pGroup->files[TSDB_FILE_TYPE_LAST].info = helperLastF(pHelper)->info; - } - - pGroup->files[TSDB_FILE_TYPE_DATA].info = helperDataF(pHelper)->info; - - pthread_rwlock_unlock(&(pFileH->fhlock)); - return 0; - -_err: - tsdbCloseHelperFile(pHelper, 1, pGroup); - return -1; } static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) { @@ -399,4 +331,31 @@ static void tsdbDestroyCommitH(SCommitH *pch, int niter) { tdFreeDataCols(pch->pDataCols); tsdbDestroyCommitIters(pch->iters, niter); tsdbDestroyHelper(&(pch->whelper)); +} + +static void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { + STsdbCfg *pCfg = &(pRepo->config); + TSKEY minKey, midKey, maxKey, now; + + now = taosGetTimestamp(pCfg->precision); + minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision]; + midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision]; + maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision]; + + pRtn->minKey = minKey; + pRtn->minFid = TSDB_KEY_FILEID(minKey, pCfg->daysPerFile, pCfg->precision); + pRtn->midFid = TSDB_KEY_FILEID(midKey, pCfg->daysPerFile, pCfg->precision); + pRtn->maxFid = TSDB_KEY_FILEID(maxKey, pCfg->daysPerFile, pCfg->precision); +} + +static int tsdbGetFidLevel(int fid, SRtn *pRtn) { + if (fid >= pRtn->maxFid) { + return 0; + } else if (fid >= pRtn->midFid) { + return 1; + } else if (fid >= pRtn->minFid) { + return 2; + } else { + return -1; + } } \ No newline at end of file diff --git a/src/tsdb/src/tsdbFS.c b/src/tsdb/src/tsdbFS.c index 86204f40f9..8397410e50 100644 --- a/src/tsdb/src/tsdbFS.c +++ b/src/tsdb/src/tsdbFS.c @@ -98,7 +98,7 @@ int tsdbUpdateDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { return -1; } } else { - int index = TARRAY_ELEM_IDX(dfArray, ptr); + int index = TARRAY_ELEM_IDX(pSnapshot->df, pOldSet); if (pOldSet->id == pSet->id) { taosArraySet(pSnapshot->df, index, pSet); @@ -125,6 +125,32 @@ void tsdbRemoveExpiredDFileSet(STsdbRepo *pRepo, int mfid) { } } +int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { + int tlen = 0; + + tlen += taosEncodeVariantI64(buf, pInfo->size); + tlen += taosEncodeVariantI64(buf, pInfo->tombSize); + tlen += taosEncodeVariantI64(buf, pInfo->nRecords); + tlen += taosEncodeVariantI64(buf, pInfo->nDels); + tlen += taosEncodeFixedU32(buf, pInfo->magic); + + return tlen; +} + +void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { + buf = taosDecodeVariantI64(buf, &(pInfo->size)); + buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); + buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); + buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); + buf = taosDecodeFixedU32(buf, &(pInfo->magic)); + + return buf; +} + +SDFileSet tsdbMoveDFileSet(SDFileSet *pOldSet, int to) { + // TODO +} + static int tsdbSaveFSSnapshot(int fd, SFSSnapshot *pSnapshot) { // TODO return 0; @@ -152,28 +178,6 @@ static int tsdbOpenFSImpl(STsdbRepo *pRepo) { return 0; } -static int tsdbEncodeMFInfo(void **buf, SMFInfo *pInfo) { - int tlen = 0; - - tlen += taosEncodeVariantI64(buf, pInfo->size); - tlen += taosEncodeVariantI64(buf, pInfo->tombSize); - tlen += taosEncodeVariantI64(buf, pInfo->nRecords); - tlen += taosEncodeVariantI64(buf, pInfo->nDels); - tlen += taosEncodeFixedU32(buf, pInfo->magic); - - return tlen; -} - -static void *tsdbDecodeMFInfo(void *buf, SMFInfo *pInfo) { - buf = taosDecodeVariantI64(buf, &(pInfo->size)); - buf = taosDecodeVariantI64(buf, &(pInfo->tombSize)); - buf = taosDecodeVariantI64(buf, &(pInfo->nRecords)); - buf = taosDecodeVariantI64(buf, &(pInfo->nDels)); - buf = taosDecodeFixedU32(buf, &(pInfo->magic)); - - return buf; -} - static int tsdbEncodeMFile(void **buf, SMFile *pMFile) { int tlen = 0; diff --git a/src/util/src/tkvstore.c b/src/tsdb/src/tsdbStore.c similarity index 97% rename from src/util/src/tkvstore.c rename to src/tsdb/src/tsdbStore.c index 1ed0c0014b..1420888648 100644 --- a/src/util/src/tkvstore.c +++ b/src/tsdb/src/tsdbStore.c @@ -15,20 +15,8 @@ #define _DEFAULT_SOURCE #define TAOS_RANDOM_FILE_FAIL_TEST -#include "os.h" -#include "hash.h" -#include "taoserror.h" -#include "tchecksum.h" -#include "tcoding.h" -#include "tkvstore.h" -#include "tulog.h" -#define TD_KVSTORE_HEADER_SIZE 512 -#define TD_KVSTORE_MAJOR_VERSION 1 -#define TD_KVSTORE_MAINOR_VERSION 0 -#define TD_KVSTORE_SNAP_SUFFIX ".snap" -#define TD_KVSTORE_NEW_SUFFIX ".new" -#define TD_KVSTORE_INIT_MAGIC 0xFFFFFFFF +#include "tsdbMain.h" typedef struct { uint64_t uid; diff --git a/src/util/inc/tkvstore.h b/src/util/inc/tkvstore.h deleted file mode 100644 index ca74e3435a..0000000000 --- a/src/util/inc/tkvstore.h +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef _TD_KVSTORE_H_ -#define _TD_KVSTORE_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -#include - -#define KVSTORE_FILE_VERSION ((uint32_t)0) - -typedef int (*iterFunc)(void *, void *cont, int contLen); -typedef void (*afterFunc)(void *); - -typedef struct { - int64_t size; // including 512 bytes of header size - int64_t tombSize; - int64_t nRecords; - int64_t nDels; - uint32_t magic; -} SStoreInfo; - -typedef struct { - char * fname; - int fd; - SHashObj * map; - iterFunc iFunc; - afterFunc aFunc; - void * appH; - SStoreInfo info; -} SKVStore; - -#define KVSTORE_MAGIC(s) (s)->info.magic - -int tdCreateKVStore(char *fname); -int tdDestroyKVStore(char *fname); -SKVStore *tdOpenKVStore(char *fname, iterFunc iFunc, afterFunc aFunc, void *appH); -void tdCloseKVStore(SKVStore *pStore); -int tdKVStoreStartCommit(SKVStore *pStore); -int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLen); -int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid); -int tdKVStoreEndCommit(SKVStore *pStore); -void tsdbGetStoreInfo(char *fname, uint32_t *magic, int64_t *size); - -#ifdef __cplusplus -} -#endif - -#endif \ No newline at end of file