diff --git a/include/dnode/vnode/tsdb2/tsdb.h b/include/dnode/vnode/tsdb2/tsdb.h index d3228bcade..49840ae231 100644 --- a/include/dnode/vnode/tsdb2/tsdb.h +++ b/include/dnode/vnode/tsdb2/tsdb.h @@ -58,15 +58,6 @@ typedef struct SDataStatis { } SDataStatis; // --------- TSDB APPLICATION HANDLE DEFINITION -typedef struct { - void *appH; - void *cqH; - int (*notifyStatus)(void *, int status, int eno); - int (*eventCallBack)(void *); - void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, - int start); - void (*cqDropFunc)(void *handle); -} STsdbAppH; // --------- TSDB REPOSITORY CONFIGURATION DEFINITION typedef struct { diff --git a/source/dnode/vnode/tsdb2/inc/tsdbint.h b/source/dnode/vnode/tsdb2/inc/tsdbint.h index 8e02412fb6..0f492d90c3 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbint.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbint.h @@ -16,22 +16,21 @@ #ifndef _TD_TSDB_INT_H_ #define _TD_TSDB_INT_H_ - #include "os.h" -#include "tlog.h" #include "taosdef.h" #include "taoserror.h" +#include "tarray.h" #include "tchecksum.h" -#include "tskiplist.h" -#include "tdataformat.h" #include "tcoding.h" #include "tcompression.h" -#include "tlockfree.h" -#include "tlist.h" -#include "thash.h" -#include "tarray.h" +#include "tdataformat.h" #include "tfs.h" +#include "thash.h" +#include "tlist.h" +#include "tlockfree.h" +#include "tlog.h" #include "tsdbMemory.h" +#include "tskiplist.h" #include "tsdb.h" @@ -61,32 +60,15 @@ extern "C" { #include "tsdbRowMergeBuf.h" // Main definitions struct STsdb { - uint8_t state; - - STsdbCfg config; - - STsdbCfg save_config; // save apply config - bool config_changed; // config changed flag - pthread_mutex_t save_mutex; // protect save config - - int16_t cacheLastConfigVersion; - - STsdbAppH appH; - STsdbStat stat; - STsdbMeta* tsdbMeta; - // STsdbBufPool* pPool; - SMemTable* mem; - SMemTable* imem; - STsdbFS* fs; - SRtn rtn; - tsem_t readyToCommit; - pthread_mutex_t mutex; - bool repoLocked; - int32_t code; // Commit code - - SMergeBuf mergeBuf; //used when update=2 - int8_t compactState; // compact state: inCompact/noCompact/waitingCompact? - pthread_t* pthread; + uint8_t state; + STsdbCfg config; + STsdbStat stat; + STsdbMeta* tsdbMeta; + SMemTable* mem; + SMemTable* imem; + STsdbFS* fs; + SRtn rtn; + SMergeBuf mergeBuf; // used when update=2 }; #define REPO_ID(r) (r)->config.tsdbId @@ -95,40 +77,15 @@ struct STsdb { #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) -int tsdbLockRepo(STsdb* pRepo); -int tsdbUnlockRepo(STsdb* pRepo); -STsdbMeta* tsdbGetMeta(STsdb* pRepo); -int tsdbCheckCommit(STsdb* pRepo); -int tsdbRestoreInfo(STsdb* pRepo); -UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg); -int32_t tsdbLoadLastCache(STsdb *pRepo, STable* pTable); -void tsdbGetRootDir(int repoid, char dirName[]); -void tsdbGetDataDir(int repoid, char dirName[]); - -// static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdb* pRepo) { -// ASSERT(pRepo != NULL); -// if (pRepo->mem == NULL) return NULL; - -// SListNode* pNode = listTail(pRepo->mem->bufBlockList); -// if (pNode == NULL) return NULL; - -// STsdbBufBlock* pBufBlock = NULL; -// tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock)); - -// return pBufBlock; -// } - -// static FORCE_INLINE int tsdbGetNextMaxTables(int tid) { -// ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES); -// int maxTables = TSDB_INIT_NTABLES; -// while (true) { -// maxTables = MIN(maxTables, TSDB_MAX_TABLES); -// if (tid <= maxTables) break; -// maxTables *= 2; -// } - -// return maxTables + 1; -// } +int tsdbLockRepo(STsdb* pRepo); +int tsdbUnlockRepo(STsdb* pRepo); +STsdbMeta* tsdbGetMeta(STsdb* pRepo); +int tsdbCheckCommit(STsdb* pRepo); +int tsdbRestoreInfo(STsdb* pRepo); +UNUSED_FUNC int tsdbCacheLastData(STsdb* pRepo, STsdbCfg* oldCfg); +int32_t tsdbLoadLastCache(STsdb* pRepo, STable* pTable); +void tsdbGetRootDir(int repoid, char dirName[]); +void tsdbGetDataDir(int repoid, char dirName[]); #ifdef __cplusplus } diff --git a/source/dnode/vnode/tsdb2/src/tsdbMain.c b/source/dnode/vnode/tsdb2/src/tsdbMain.c index 661d6b2e3e..bc5357f6af 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbMain.c +++ b/source/dnode/vnode/tsdb2/src/tsdbMain.c @@ -16,56 +16,25 @@ // no test file errors here #include "taosdef.h" #include "tsdbint.h" -#include "ttimer.h" #include "tthread.h" +#include "ttimer.h" #define IS_VALID_PRECISION(precision) \ (((precision) >= TSDB_TIME_PRECISION_MILLI) && ((precision) <= TSDB_TIME_PRECISION_NANO)) #define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) -static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); -static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); -static void tsdbFreeRepo(STsdb *pRepo); -static void tsdbStartStream(STsdb *pRepo); -static void tsdbStopStream(STsdb *pRepo); -static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh); -static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); +static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); +static STsdb * tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); +static void tsdbFreeRepo(STsdb *pRepo); +static void tsdbStartStream(STsdb *pRepo); +static void tsdbStopStream(STsdb *pRepo); +static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH *pReadh); +static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH *pReadh, SBlockIdx *pIdx); -// // Function declaration -// int32_t tsdbCreateRepo(int repoid) { -// char tsdbDir[TSDB_FILENAME_LEN] = "\0"; -// char dataDir[TSDB_FILENAME_LEN] = "\0"; - -// tsdbGetRootDir(repoid, tsdbDir); -// if (tfsMkdir(tsdbDir) < 0) { -// goto _err; -// } - -// tsdbGetDataDir(repoid, dataDir); -// if (tfsMkdir(dataDir) < 0) { -// goto _err; -// } - -// // TODO: need to create current file with nothing in - -// return 0; - -// _err: -// tsdbError("vgId:%d failed to create TSDB repository since %s", repoid, tstrerror(terrno)); -// return -1; -// } - -// int32_t tsdbDropRepo(int repoid) { -// char tsdbDir[TSDB_FILENAME_LEN] = "\0"; - -// tsdbGetRootDir(repoid, tsdbDir); -// return tfsRmdir(tsdbDir); -// } - -STsdb *tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH) { - STsdb *pRepo; - STsdbCfg config = *pCfg; +STsdb *tsdbOpen(const char *path, STsdbCfg *pCfg) { + STsdb * pTsdb; + STsdbCfg config = *pCfg; terrno = TSDB_CODE_SUCCESS; @@ -76,39 +45,39 @@ STsdb *tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH) { } // Create new TSDB object - if ((pRepo = tsdbNewRepo(&config, pAppH)) == NULL) { + if ((pTsdb = tsdbNewRepo(&config, pAppH)) == NULL) { tsdbError("vgId:%d failed to open TSDB repository while creating TSDB object since %s", config.tsdbId, tstrerror(terrno)); return NULL; } // Open meta - if (tsdbOpenMeta(pRepo) < 0) { + if (tsdbOpenMeta(pTsdb) < 0) { tsdbError("vgId:%d failed to open TSDB repository while opening Meta since %s", config.tsdbId, tstrerror(terrno)); - tsdbClose(pRepo, false); + tsdbClose(pTsdb, false); return NULL; } - if (tsdbOpenFS(pRepo) < 0) { + if (tsdbOpenFS(pTsdb) < 0) { tsdbError("vgId:%d failed to open TSDB repository while opening FS since %s", config.tsdbId, tstrerror(terrno)); - tsdbClose(pRepo, false); + tsdbClose(pTsdb, false); return NULL; } // TODO: Restore information from data - if ((!(pRepo->state & TSDB_STATE_BAD_DATA)) && tsdbRestoreInfo(pRepo) < 0) { + if ((!(pTsdb->state & TSDB_STATE_BAD_DATA)) && tsdbRestoreInfo(pTsdb) < 0) { tsdbError("vgId:%d failed to open TSDB repository while restore info since %s", config.tsdbId, tstrerror(terrno)); - tsdbClose(pRepo, false); + tsdbClose(pTsdb, false); return NULL; } - pRepo->mergeBuf = NULL; + pTsdb->mergeBuf = NULL; - tsdbStartStream(pRepo); + tsdbStartStream(pTsdb); - tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pRepo)); + tsdbDebug("vgId:%d, TSDB repository opened", REPO_ID(pTsdb)); - return pRepo; + return pTsdb; } // Note: all working thread and query thread must stopped when calling this function @@ -116,12 +85,12 @@ int tsdbClose(STsdb *repo, int toCommit) { if (repo == NULL) return 0; STsdb *pRepo = repo; - int vgId = REPO_ID(pRepo); + int vgId = REPO_ID(pRepo); terrno = TSDB_CODE_SUCCESS; tsdbStopStream(pRepo); - if(pRepo->pthread){ + if (pRepo->pthread) { taosDestoryThread(pRepo->pthread); pRepo->pthread = NULL; } @@ -192,7 +161,8 @@ int tsdbUnlockRepo(STsdb *pRepo) { // STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo); // ASSERT(pBufBlock != NULL); // if ((pRepo->mem->extraBuffList != NULL) || -// ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) { +// ((listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) && (pBufBlock->remain < TSDB_BUFFER_RESERVE))) +// { // // trigger commit // if (tsdbAsyncCommit(pRepo) < 0) return -1; // } @@ -218,9 +188,9 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { // TODO: think about multithread cases if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; - - STsdbCfg * pRCfg = &repo->config; - + + STsdbCfg *pRCfg = &repo->config; + ASSERT(pRCfg->tsdbId == pCfg->tsdbId); ASSERT(pRCfg->cacheBlockSize == pCfg->cacheBlockSize); ASSERT(pRCfg->daysPerFile == pCfg->daysPerFile); @@ -259,7 +229,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { return -1; } - STsdbCfg * pSaveCfg = &repo->save_config; + STsdbCfg *pSaveCfg = &repo->save_config; *pSaveCfg = repo->config; pSaveCfg->compression = pCfg->compression; @@ -269,14 +239,11 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { pSaveCfg->cacheLastRow = pCfg->cacheLastRow; pSaveCfg->totalBlocks = pCfg->totalBlocks; - tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", - REPO_ID(repo), - pRCfg->compression, pRCfg->keep, pRCfg->keep1,pRCfg->keep2, - pRCfg->cacheLastRow, pRCfg->totalBlocks); - tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", - REPO_ID(repo), - pSaveCfg->compression, pSaveCfg->keep,pSaveCfg->keep1, pSaveCfg->keep2, - pSaveCfg->cacheLastRow,pSaveCfg->totalBlocks); + tsdbInfo("vgId:%d old config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo), + pRCfg->compression, pRCfg->keep, pRCfg->keep1, pRCfg->keep2, pRCfg->cacheLastRow, pRCfg->totalBlocks); + tsdbInfo("vgId:%d new config: compression(%d), keep(%d,%d,%d), cacheLastRow(%d),totalBlocks(%d)", REPO_ID(repo), + pSaveCfg->compression, pSaveCfg->keep, pSaveCfg->keep1, pSaveCfg->keep2, pSaveCfg->cacheLastRow, + pSaveCfg->totalBlocks); repo->config_changed = true; @@ -335,91 +302,7 @@ int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { #endif } -uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { - // TODO - return 0; -#if 0 - STsdbRepo *pRepo = (STsdbRepo *)repo; - // STsdbMeta *pMeta = pRepo->tsdbMeta; - STsdbFileH *pFileH = pRepo->tsdbFileH; - uint32_t magic = 0; - char * fname = NULL; - - struct stat fState; - - tsdbDebug("vgId:%d name:%s index:%d eindex:%d", pRepo->config.tsdbId, name, *index, eindex); - ASSERT(*index <= eindex); - - if (name[0] == 0) { // get the file from index or after, but not larger than eindex - int fid = (*index) / TSDB_FILE_TYPE_MAX; - - if (pFileH->nFGroups == 0 || fid > pFileH->pFGroup[pFileH->nFGroups - 1].fileId) { - if (*index <= TSDB_META_FILE_INDEX && TSDB_META_FILE_INDEX <= eindex) { - fname = tsdbGetMetaFileName(pRepo->rootDir); - *index = TSDB_META_FILE_INDEX; - magic = TSDB_META_FILE_MAGIC(pRepo->tsdbMeta); - sprintf(name, "tsdb/%s", TSDB_META_FILE_NAME); - } else { - return 0; - } - } else { - SFileGroup *pFGroup = - taosbsearch(&fid, pFileH->pFGroup, pFileH->nFGroups, sizeof(SFileGroup), keyFGroupCompFunc, TD_GE); - if (pFGroup->fileId == fid) { - SFile *pFile = &pFGroup->files[(*index) % TSDB_FILE_TYPE_MAX]; - fname = strdup(TSDB_FILE_NAME(pFile)); - magic = pFile->info.magic; - char *tfname = strdup(fname); - sprintf(name, "tsdb/%s/%s", TSDB_DATA_DIR_NAME, basename(tfname)); - tfree(tfname); - } else { - if ((pFGroup->fileId + 1) * TSDB_FILE_TYPE_MAX - 1 < (int)eindex) { - SFile *pFile = &pFGroup->files[0]; - fname = strdup(TSDB_FILE_NAME(pFile)); - *index = pFGroup->fileId * TSDB_FILE_TYPE_MAX; - magic = pFile->info.magic; - char *tfname = strdup(fname); - sprintf(name, "tsdb/%s/%s", TSDB_DATA_DIR_NAME, basename(tfname)); - tfree(tfname); - } else { - return 0; - } - } - } - } else { // get the named file at the specified index. If not there, return 0 - fname = malloc(256); - sprintf(fname, "%s/vnode/vnode%d/%s", TFS_PRIMARY_PATH(), REPO_ID(pRepo), name); - if (access(fname, F_OK) != 0) { - tfree(fname); - return 0; - } - if (*index == TSDB_META_FILE_INDEX) { // get meta file - tsdbGetStoreInfo(fname, &magic, size); - } else { - char tfname[TSDB_FILENAME_LEN] = "\0"; - sprintf(tfname, "vnode/vnode%d/tsdb/%s/%s", REPO_ID(pRepo), TSDB_DATA_DIR_NAME, basename(name)); - tsdbGetFileInfoImpl(tfname, &magic, size); - } - tfree(fname); - return magic; - } - - if (stat(fname, &fState) < 0) { - tfree(fname); - return 0; - } - - *size = fState.st_size; - // magic = *size; - - tfree(fname); - return magic; -#endif -} - -void tsdbGetRootDir(int repoid, char dirName[]) { - snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid); -} +void tsdbGetRootDir(int repoid, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb", repoid); } void tsdbGetDataDir(int repoid, char dirName[]) { snprintf(dirName, TSDB_FILENAME_LEN, "vnode/vnode%d/tsdb/data", repoid); @@ -550,8 +433,7 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { // update cacheLastRow if (pCfg->cacheLastRow != 0) { - if (pCfg->cacheLastRow > 3) - pCfg->cacheLastRow = 1; + if (pCfg->cacheLastRow > 3) pCfg->cacheLastRow = 1; } return 0; } @@ -633,8 +515,9 @@ static void tsdbStartStream(STsdb *pRepo) { for (int i = 0; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable && pTable->type == TSDB_STREAM_TABLE) { - pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, pTable->sql, - tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 0); + pTable->cqhandle = + (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), TABLE_NAME(pTable)->data, + pTable->sql, tsdbGetTableSchemaImpl(pTable, false, false, -1, -1), 0); } } } @@ -650,8 +533,8 @@ static void tsdbStopStream(STsdb *pRepo) { } } -static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) { - //tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data); +static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH *pReadh) { + // tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data); STSchema *pSchema = tsdbGetTableLatestSchema(pTable); if (pSchema == NULL) { @@ -659,10 +542,10 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) return 0; } - SBlock* pBlock; - int numColumns; - int32_t blockIdx; - SDataStatis* pBlockStatis = NULL; + SBlock * pBlock; + int numColumns; + int32_t blockIdx; + SDataStatis *pBlockStatis = NULL; // SMemRow row = NULL; // restore last column data with last schema @@ -702,7 +585,7 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) goto out; } memset(pBlockStatis, 0, numColumns * sizeof(SDataStatis)); - for(int32_t i = 0; i < numColumns; ++i) { + for (int32_t i = 0; i < numColumns; ++i) { STColumn *pCol = schemaColAt(pSchema, i); pBlockStatis[i].colId = pCol->colId; } @@ -744,8 +627,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) // OK,let's load row from backward to get not-null column for (int32_t rowId = pBlock->numOfRows - 1; rowId >= 0; rowId--) { - SDataCol *pDataCol = pReadh->pDCols[0]->cols + i; - const void* pColData = tdGetColDataOfRow(pDataCol, rowId); + SDataCol * pDataCol = pReadh->pDCols[0]->cols + i; + const void *pColData = tdGetColDataOfRow(pDataCol, rowId); // tdAppendColVal(memRowDataBody(row), pColData, pCol->type, pCol->offset); // SDataCol *pDataCol = readh.pDCols[0]->cols + j; // void *value = tdGetRowDataOfCol(memRowDataBody(row), (int8_t)pCol->type, TD_DATA_ROW_HEAD_SIZE + @@ -757,11 +640,12 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) int16_t idx = tsdbGetLastColumnsIndexByColId(pTable, pCol->colId); if (idx == -1) { - tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), pTable->name->data, pCol->colId); + tsdbError("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d fail", REPO_ID(pRepo), + pTable->name->data, pCol->colId); continue; } // save not-null column - uint16_t bytes = IS_VAR_DATA_TYPE(pCol->type) ? varDataTLen(pColData) : pCol->bytes; + uint16_t bytes = IS_VAR_DATA_TYPE(pCol->type) ? varDataTLen(pColData) : pCol->bytes; SDataCol *pLastCol = &(pTable->lastCols[idx]); pLastCol->pData = malloc(bytes); pLastCol->bytes = bytes; @@ -777,7 +661,8 @@ static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) pTable->restoreColumnNum += 1; - tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), pTable->name->data, pLastCol->colId, pLastCol->ts); + tsdbDebug("tsdbRestoreLastColumns restore vgId:%d,table:%s cache column %d, %" PRId64, REPO_ID(pRepo), + pTable->name->data, pLastCol->colId, pLastCol->ts); break; } } @@ -795,20 +680,20 @@ out: return err; } -static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { +static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH *pReadh, SBlockIdx *pIdx) { ASSERT(pTable->lastRow == NULL); if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) { return -1; } - SBlock* pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1; + SBlock *pBlock = pReadh->pBlkInfo->blocks + pIdx->numOfBlocks - 1; if (tsdbLoadBlockData(pReadh, pBlock, NULL) < 0) { return -1; } // Get the data in row - + STSchema *pSchema = tsdbGetTableSchema(pTable); SMemRow lastRow = taosTMalloc(memRowMaxBytesFromSchema(pSchema)); if (lastRow == NULL) { @@ -825,7 +710,7 @@ static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlo } TSKEY lastKey = memRowKey(lastRow); - + // during the load data in file, new data would be inserted and last row has been updated TSDB_WLOCK_TABLE(pTable); if (pTable->lastRow == NULL) { @@ -857,7 +742,7 @@ int tsdbRestoreInfo(STsdb *pRepo) { for (int i = 1; i < pMeta->maxTables; i++) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - pTable->restoreColumnNum = 0; + pTable->restoreColumnNum = 0; pTable->hasRestoreLastColumn = false; } } @@ -877,7 +762,7 @@ int tsdbRestoreInfo(STsdb *pRepo) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + // tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); if (tsdbSetReadTable(&readh, pTable) < 0) { tsdbDestroyReadH(&readh); @@ -894,7 +779,7 @@ int tsdbRestoreInfo(STsdb *pRepo) { return -1; } } - + // restore NULL columns if (pIdx && CACHE_LAST_NULL_COLUMN(pCfg) && !pTable->hasRestoreLastColumn) { if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { @@ -918,13 +803,14 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; - int cacheLastRowTableNum = 0; - int cacheLastColTableNum = 0; + int cacheLastRowTableNum = 0; + int cacheLastColTableNum = 0; bool cacheLastRow = CACHE_LAST_ROW(&(pRepo->config)); bool cacheLastCol = CACHE_LAST_NULL_COLUMN(&(pRepo->config)); - tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, cacheLastCol); + tsdbDebug("tsdbLoadLastCache for %s, cacheLastRow:%d, cacheLastCol:%d", pTable->name->data, cacheLastRow, + cacheLastCol); pTable->cacheLastConfigVersion = pRepo->cacheLastConfigVersion; @@ -940,7 +826,7 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) { return 0; } - cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0; + cacheLastRowTableNum = (cacheLastRow && pTable->lastRow == NULL) ? 1 : 0; cacheLastColTableNum = (cacheLastCol && pTable->lastCols == NULL) ? 1 : 0; if (cacheLastRowTableNum == 0 && cacheLastColTableNum == 0) { @@ -1005,21 +891,21 @@ int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) { return 0; } -UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { - bool cacheLastRow = false, cacheLastCol = false; +UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg *oldCfg) { + bool cacheLastRow = false, cacheLastCol = false; SFSIter fsiter; SReadH readh; SDFileSet *pSet; STsdbMeta *pMeta = pRepo->tsdbMeta; - int tableNum = 0; - int maxTableIdx = 0; - int cacheLastRowTableNum = 0; - int cacheLastColTableNum = 0; + int tableNum = 0; + int maxTableIdx = 0; + int cacheLastRowTableNum = 0; + int cacheLastColTableNum = 0; bool need_free_last_row = CACHE_LAST_ROW(oldCfg) && !CACHE_LAST_ROW(&(pRepo->config)); bool need_free_last_col = CACHE_LAST_NULL_COLUMN(oldCfg) && !CACHE_LAST_NULL_COLUMN(&(pRepo->config)); - if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) { + if (CACHE_LAST_ROW(&(pRepo->config)) || CACHE_LAST_NULL_COLUMN(&(pRepo->config))) { tsdbInfo("tsdbCacheLastData cache last data since cacheLast option changed"); cacheLastRow = !CACHE_LAST_ROW(oldCfg) && CACHE_LAST_ROW(&(pRepo->config)); cacheLastCol = !CACHE_LAST_NULL_COLUMN(oldCfg) && CACHE_LAST_NULL_COLUMN(&(pRepo->config)); @@ -1034,7 +920,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { if (cacheLastCol) { pTable->restoreColumnNum = 0; pTable->hasRestoreLastColumn = false; - } + } } // if close last option,need to free data @@ -1042,10 +928,10 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { // if (need_free_last_col) { // atomic_store_8(&pRepo->hasCachedLastColumn, 0); // } - tsdbInfo("free cache last data since cacheLast option changed"); + tsdbInfo("free cache last data since cacheLast option changed"); for (int i = 1; i <= maxTableIdx; i++) { STable *pTable = pMeta->tables[i]; - if (pTable == NULL) continue; + if (pTable == NULL) continue; if (need_free_last_row) { taosTZfree(pTable->lastRow); pTable->lastRow = NULL; @@ -1054,7 +940,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { tsdbFreeLastColumns(pTable); pTable->hasRestoreLastColumn = false; } - } + } } if (!cacheLastRow && !cacheLastCol) { @@ -1085,7 +971,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { STable *pTable = pMeta->tables[i]; if (pTable == NULL) continue; - //tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); + // tsdbInfo("tsdbRestoreInfo restore vgId:%d,table:%s", REPO_ID(pRepo), pTable->name->data); if (tsdbSetReadTable(&readh, pTable) < 0) { tsdbDestroyReadH(&readh); @@ -1094,7 +980,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { SBlockIdx *pIdx = readh.pBlkIdx; - if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { + if (pIdx && cacheLastRowTableNum > 0 && pTable->lastRow == NULL) { pTable->lastKey = pIdx->maxKey; if (tsdbRestoreLastRow(pRepo, pTable, &readh, pIdx) != 0) { @@ -1103,7 +989,7 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { } cacheLastRowTableNum -= 1; } - + // restore NULL columns if (pIdx && cacheLastColTableNum > 0 && !pTable->hasRestoreLastColumn) { if (tsdbRestoreLastColumns(pRepo, pTable, &readh) != 0) { @@ -1122,6 +1008,6 @@ UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { // if (cacheLastCol) { // atomic_store_8(&pRepo->hasCachedLastColumn, 1); // } - + return 0; }