diff --git a/include/dnode/vnode/tsdb2/tsdb.h b/include/dnode/vnode/tsdb2/tsdb.h index c2906ae4ca..7fd8ea4212 100644 --- a/include/dnode/vnode/tsdb2/tsdb.h +++ b/include/dnode/vnode/tsdb2/tsdb.h @@ -19,14 +19,15 @@ #include #include +#include "common.h" #include "taosdef.h" -#include "tmsg.h" #include "tarray.h" #include "tdataformat.h" -#include "tname.h" #include "thash.h" -#include "tlockfree.h" #include "tlist.h" +#include "tlockfree.h" +#include "tmsg.h" +#include "tname.h" #ifdef __cplusplus extern "C" { @@ -39,7 +40,7 @@ extern "C" { #define TSDB_STATUS_COMMIT_START 1 #define TSDB_STATUS_COMMIT_OVER 2 -#define TSDB_STATUS_COMMIT_NOBLOCK 3 //commit no block, need to be solved +#define TSDB_STATUS_COMMIT_NOBLOCK 3 // commit no block, need to be solved // TSDB STATE DEFINITION #define TSDB_STATE_OK 0x0 @@ -62,7 +63,8 @@ typedef struct { void *cqH; int (*notifyStatus)(void *, int status, int eno); int (*eventCallBack)(void *); - void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, int start); + void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr, STSchema *pSchema, + int start); void (*cqDropFunc)(void *handle); } STsdbAppH; @@ -75,16 +77,17 @@ typedef struct { int32_t keep; // day of data to keep int32_t keep1; int32_t keep2; + int32_t lruCacheSize; int32_t minRowsPerFileBlock; // minimum rows per file block int32_t maxRowsPerFileBlock; // maximum rows per file block int8_t precision; int8_t compression; int8_t update; - int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2 + int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2 } STsdbCfg; -#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) -#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) +#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) +#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) #define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0) // --------- TSDB REPOSITORY USAGE STATISTICS @@ -94,18 +97,18 @@ typedef struct { int64_t pointsWritten; // total data points written } STsdbStat; -typedef struct STsdbRepo STsdbRepo; +typedef struct STsdb STsdb; -STsdbCfg *tsdbGetCfg(const STsdbRepo *repo); +STsdbCfg *tsdbGetCfg(const STsdb *repo); // --------- TSDB REPOSITORY DEFINITION -int32_t tsdbCreateRepo(int repoid); -int32_t tsdbDropRepo(int repoid); -STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); -int tsdbCloseRepo(STsdbRepo *repo, int toCommit); -int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg); -int tsdbGetState(STsdbRepo *repo); -int8_t tsdbGetCompactState(STsdbRepo *repo); +int32_t tsdbCreateRepo(int repoid); +int32_t tsdbDropRepo(int repoid); +STsdb * tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); +int tsdbCloseRepo(STsdb *repo, int toCommit); +int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg); +int tsdbGetState(STsdb *repo); +int8_t tsdbGetCompactState(STsdb *repo); // --------- TSDB TABLE DEFINITION typedef struct { uint64_t uid; // the unique table ID @@ -131,17 +134,17 @@ void tsdbClearTableCfg(STableCfg *config); void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type); char *tsdbGetTableName(void *pTable); -#define TSDB_TABLEID(_table) ((STableId*) (_table)) -#define TSDB_PREV_ROW 0x1 -#define TSDB_NEXT_ROW 0x2 +#define TSDB_TABLEID(_table) ((STableId *)(_table)) +#define TSDB_PREV_ROW 0x1 +#define TSDB_NEXT_ROW 0x2 STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); -int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg); -int tsdbDropTable(STsdbRepo *pRepo, STableId tableId); -int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg); +int tsdbCreateTable(STsdb *repo, STableCfg *pCfg); +int tsdbDropTable(STsdb *pRepo, STableId tableId); +int tsdbUpdateTableTagValue(STsdb *repo, SUpdateTableTagValMsg *pMsg); -uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); +uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); // the TSDB repository info typedef struct STsdbRepoInfo { @@ -151,7 +154,7 @@ typedef struct STsdbRepoInfo { int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository // TODO: Other informations to add } STsdbRepoInfo; -STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo); +STsdbRepoInfo *tsdbGetStatus(STsdb *pRepo); // the meter information report structure typedef struct { @@ -169,21 +172,21 @@ typedef struct { * * @return the number of points inserted, -1 for failure and the error number is set */ -int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); +int32_t tsdbInsertData(STsdb *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); // -- FOR QUERY TIME SERIES DATA typedef void *TsdbQueryHandleT; // Use void to hide implementation details -#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 -#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 -#define BLOCK_LOAD_TABLE_RR_ORDER 3 +#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 +#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 +#define BLOCK_LOAD_TABLE_RR_ORDER 3 // query condition to build multi-table data block iterator typedef struct STsdbQueryCond { STimeWindow twindow; - int32_t order; // desc|asc order to iterate the data block - int64_t offset; // skip offset put down to tsdb + int32_t order; // desc|asc order to iterate the data block + int64_t offset; // skip offset put down to tsdb int32_t numOfCols; SColumnInfo *colList; bool loadExternalRows; // load external rows or not @@ -207,10 +210,10 @@ typedef struct { } SMemTable; typedef struct { - SMemTable* mem; - SMemTable* imem; + SMemTable *mem; + SMemTable *imem; SMemTable mtable; - SMemTable* omem; + SMemTable *omem; } SMemSnapshot; typedef struct SMemRef { @@ -218,14 +221,6 @@ typedef struct SMemRef { SMemSnapshot snapshot; } SMemRef; -typedef struct SDataBlockInfo { - STimeWindow window; - int32_t rows; - int32_t numOfCols; - int64_t uid; - int32_t tid; -} SDataBlockInfo; - typedef struct SFileBlockInfo { int32_t numBlocksOfStep; } SFileBlockInfo; @@ -237,23 +232,23 @@ typedef struct { typedef struct { uint32_t numOfTables; - SArray *pGroupList; + SArray * pGroupList; SHashObj *map; // speedup acquire the tableQueryInfo by table uid } STableGroupInfo; #define TSDB_BLOCK_DIST_STEP_ROWS 16 typedef struct { - uint16_t rowSize; - uint16_t numOfFiles; - uint32_t numOfTables; - uint64_t totalSize; - uint64_t totalRows; - int32_t maxRows; - int32_t minRows; - int32_t firstSeekTimeUs; - uint32_t numOfRowsInMemTable; - uint32_t numOfSmallBlocks; - SArray *dataBlockInfos; + uint16_t rowSize; + uint16_t numOfFiles; + uint32_t numOfTables; + uint64_t totalSize; + uint64_t totalRows; + int32_t maxRows; + int32_t minRows; + int32_t firstSeekTimeUs; + uint32_t numOfRowsInMemTable; + uint32_t numOfSmallBlocks; + SArray * dataBlockInfos; } STableBlockDist; /** @@ -266,7 +261,7 @@ typedef struct { * @param qinfo query info handle from query processor * @return */ -TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, +TsdbQueryHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, SMemRef *pRef); /** @@ -279,14 +274,13 @@ TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STable * @param tableInfo table list. * @return */ -TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, +TsdbQueryHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, SMemRef *pRef); +TsdbQueryHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, + SMemRef *pMemRef); -TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef); - -bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle); - +bool isTsdbCacheLastRow(TsdbQueryHandleT *pQueryHandle); /** * get the queried table object list @@ -303,21 +297,20 @@ SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); * @param qinfo * @return */ -TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, +TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef *pRef); - /** - * get num of rows in mem table + * get num of rows in mem table * * @param pHandle * @return row size */ -int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle); +int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT *pHandle); /** - * move to next block if exists + * move to next block if exists * * @param pQueryHandle * @return @@ -362,7 +355,7 @@ SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdL * @param stableid. super table sid * @param pTagCond. tag query condition */ -int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len, +int32_t tsdbQuerySTableByTagCond(STsdb *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len, STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols); /** @@ -379,7 +372,7 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); * @param pGroupInfo the generated result * @return */ -int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); +int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); /** * @@ -388,7 +381,7 @@ int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STab * @param pGroupInfo * @return */ -int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); +int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); /** * clean up the query handle @@ -398,9 +391,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond); -void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList); +void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo *groupList); -int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo); +int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT *queryHandle, STableBlockDist *pTableBlockInfo); // obtain queryHandle attribute int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle); @@ -416,7 +409,7 @@ void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int int tsdbInitCommitQueue(); void tsdbDestroyCommitQueue(); -int tsdbSyncCommit(STsdbRepo *repo); +int tsdbSyncCommit(STsdb *repo); void tsdbIncCommitRef(int vgId); void tsdbDecCommitRef(int vgId); void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle); @@ -426,19 +419,19 @@ int tsdbSyncSend(void *pRepo, SOCKET socketFd); int tsdbSyncRecv(void *pRepo, SOCKET socketFd); // For TSDB Compact -int tsdbCompact(STsdbRepo *pRepo); +int tsdbCompact(STsdb *pRepo); // For TSDB Health Monitor // no problem return true -bool tsdbNoProblem(STsdbRepo* pRepo); +bool tsdbNoProblem(STsdb *pRepo); // unit of walSize: MB -int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize); +int tsdbCheckWal(STsdb *pRepo, uint32_t walSize); // for json tag -void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, int16_t bytes); -void getJsonTagValueAll(void* data, void* dst, int16_t bytes); -char* parseTagDatatoJson(void *p); +void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes); +void getJsonTagValueAll(void *data, void *dst, int16_t bytes); +char *parseTagDatatoJson(void *p); #ifdef __cplusplus } diff --git a/source/dnode/vnode/impl/src/vnodeWrite.c b/source/dnode/vnode/impl/src/vnodeWrite.c index 88a73ca174..ddcb93863a 100644 --- a/source/dnode/vnode/impl/src/vnodeWrite.c +++ b/source/dnode/vnode/impl/src/vnodeWrite.c @@ -92,7 +92,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // } break; case TDMT_VND_SUBMIT: - if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr) < 0) { + if (tsdbInsertData(pVnode->pTsdb, (SSubmitMsg *)ptr, NULL) < 0) { // TODO: handle error } break; diff --git a/source/dnode/vnode/tsdb2/inc/tsdbBuffer.h b/source/dnode/vnode/tsdb2/inc/tsdbBuffer.h index 4b650d3993..24c369432b 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbBuffer.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbBuffer.h @@ -38,10 +38,10 @@ typedef struct { STsdbBufPool* tsdbNewBufPool(); void tsdbFreeBufPool(STsdbBufPool* pBufPool); -int tsdbOpenBufPool(STsdbRepo* pRepo); -void tsdbCloseBufPool(STsdbRepo* pRepo); -SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo); -int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks); +int tsdbOpenBufPool(STsdb* pRepo); +void tsdbCloseBufPool(STsdb* pRepo); +SListNode* tsdbAllocBufBlockFromPool(STsdb* pRepo); +int tsdbExpandPool(STsdb* pRepo, int32_t oldTotalBlocks); void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic); // health cite diff --git a/source/dnode/vnode/tsdb2/inc/tsdbCommit.h b/source/dnode/vnode/tsdb2/inc/tsdbCommit.h index 9cb8417c45..6f80ea1d3a 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbCommit.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbCommit.h @@ -31,16 +31,16 @@ typedef struct { #define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5) -void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn); +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn); int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord); void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord); -void *tsdbCommitData(STsdbRepo *pRepo); -int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn); +void *tsdbCommitData(STsdb *pRepo); +int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn); int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx); int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf); -int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, +int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf); -int tsdbApplyRtn(STsdbRepo *pRepo); +int tsdbApplyRtn(STsdb *pRepo); static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { if (fid >= pRtn->maxFid) { diff --git a/source/dnode/vnode/tsdb2/inc/tsdbCommitQueue.h b/source/dnode/vnode/tsdb2/inc/tsdbCommitQueue.h index b690e3bdc2..a4093cea13 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbCommitQueue.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbCommitQueue.h @@ -18,6 +18,6 @@ typedef enum { COMMIT_REQ, COMPACT_REQ,COMMIT_CONFIG_REQ } TSDB_REQ_T; -int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req); +int tsdbScheduleCommit(STsdb *pRepo, TSDB_REQ_T req); #endif /* _TD_TSDB_COMMIT_QUEUE_H_ */ \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/inc/tsdbCompact.h b/source/dnode/vnode/tsdb2/inc/tsdbCompact.h index 5a382de5e0..e58332b4a1 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbCompact.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbCompact.h @@ -19,7 +19,7 @@ extern "C" { #endif -void *tsdbCompactImpl(STsdbRepo *pRepo); +void *tsdbCompactImpl(STsdb *pRepo); #ifdef __cplusplus } diff --git a/source/dnode/vnode/tsdb2/inc/tsdbFS.h b/source/dnode/vnode/tsdb2/inc/tsdbFS.h index f3a5e29c0b..5346342d69 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbFS.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbFS.h @@ -94,10 +94,10 @@ typedef struct { STsdbFS *tsdbNewFS(STsdbCfg *pCfg); void * tsdbFreeFS(STsdbFS *pfs); -int tsdbOpenFS(STsdbRepo *pRepo); -void tsdbCloseFS(STsdbRepo *pRepo); -void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd); -int tsdbEndFSTxn(STsdbRepo *pRepo); +int tsdbOpenFS(STsdb *pRepo); +void tsdbCloseFS(STsdb *pRepo); +void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd); +int tsdbEndFSTxn(STsdb *pRepo); int tsdbEndFSTxnWithError(STsdbFS *pfs); void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta); void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile); @@ -106,7 +106,7 @@ int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet); void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction); void tsdbFSIterSeek(SFSIter *pIter, int fid); SDFileSet *tsdbFSIterNext(SFSIter *pIter); -int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta); +int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta); static FORCE_INLINE int tsdbRLockFS(STsdbFS* pFs) { int code = pthread_rwlock_rdlock(&(pFs->lock)); diff --git a/source/dnode/vnode/tsdb2/inc/tsdbFile.h b/source/dnode/vnode/tsdb2/inc/tsdbFile.h index 3e1a666f11..18838edea9 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbFile.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbFile.h @@ -82,7 +82,7 @@ int tsdbApplyMFileChange(SMFile* from, SMFile* to); int tsdbCreateMFile(SMFile* pMFile, bool updateHeader); int tsdbUpdateMFileHeader(SMFile* pMFile); int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo); -int tsdbScanAndTryFixMFile(STsdbRepo* pRepo); +int tsdbScanAndTryFixMFile(STsdb* pRepo); int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo); void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo); @@ -349,7 +349,7 @@ void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbCreateDFileSet(SDFileSet* pSet, bool updateHeader); int tsdbUpdateDFileSetHeader(SDFileSet* pSet); -int tsdbScanAndTryFixDFileSet(STsdbRepo* pRepo, SDFileSet* pSet); +int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet); static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { ASSERT_TSDB_FSET_NFILES_VALID(pSet); diff --git a/source/dnode/vnode/tsdb2/inc/tsdbMemTable.h b/source/dnode/vnode/tsdb2/inc/tsdbMemTable.h index 67e9976c70..639c27a644 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbMemTable.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbMemTable.h @@ -60,16 +60,16 @@ typedef struct { char cont[]; } SActCont; -int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable); -int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot, SArray* pATable); -void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemSnapshot* pSnapshot); -void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes); -int tsdbAsyncCommit(STsdbRepo* pRepo); -int tsdbSyncCommitConfig(STsdbRepo* pRepo); +int tsdbRefMemTable(STsdb* pRepo, SMemTable* pMemTable); +int tsdbUnRefMemTable(STsdb* pRepo, SMemTable* pMemTable); +int tsdbTakeMemSnapshot(STsdb* pRepo, SMemSnapshot* pSnapshot, SArray* pATable); +void tsdbUnTakeMemSnapShot(STsdb* pRepo, SMemSnapshot* pSnapshot); +void* tsdbAllocBytes(STsdb* pRepo, int bytes); +int tsdbAsyncCommit(STsdb* pRepo); +int tsdbSyncCommitConfig(STsdb* pRepo); int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols, TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo); -void* tsdbCommitData(STsdbRepo* pRepo); +void* tsdbCommitData(STsdb* pRepo); static FORCE_INLINE SMemRow tsdbNextIterRow(SSkipListIterator* pIter) { if (pIter == NULL) return NULL; diff --git a/source/dnode/vnode/tsdb2/inc/tsdbMeta.h b/source/dnode/vnode/tsdb2/inc/tsdbMeta.h index 9cdb8a83aa..0324fff343 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbMeta.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbMeta.h @@ -16,15 +16,17 @@ #ifndef _TD_TSDB_META_H_ #define _TD_TSDB_META_H_ +#include "tskiplist.h" + #define TSDB_MAX_TABLE_SCHEMAS 16 -#pragma pack (push,1) -typedef struct jsonMapValue{ - void* table; // STable * - int16_t colId; // the json col ID. -}JsonMapValue; +#pragma pack(push, 1) +typedef struct jsonMapValue { + void* table; // STable * + int16_t colId; // the json col ID. +} JsonMapValue; -#pragma pack (pop) +#pragma pack(pop) typedef struct STable { STableId tableId; @@ -44,8 +46,7 @@ typedef struct STable { char* sql; void* cqhandle; SRWLatch latch; // TODO: implementa latch functions - - SDataCol *lastCols; + SDataCol* lastCols; int16_t maxColNum; int16_t restoreColumnNum; bool hasRestoreLastColumn; @@ -81,44 +82,45 @@ typedef struct { STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg); void tsdbFreeMeta(STsdbMeta* pMeta); -int tsdbOpenMeta(STsdbRepo* pRepo); -int tsdbCloseMeta(STsdbRepo* pRepo); +int tsdbOpenMeta(STsdb* pRepo); +int tsdbCloseMeta(STsdb* pRepo); STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version, int8_t rowType); -int tsdbWLockRepoMeta(STsdbRepo* pRepo); -int tsdbRLockRepoMeta(STsdbRepo* pRepo); -int tsdbUnlockRepoMeta(STsdbRepo* pRepo); +int tsdbWLockRepoMeta(STsdb* pRepo); +int tsdbRLockRepoMeta(STsdb* pRepo); +int tsdbUnlockRepoMeta(STsdb* pRepo); void tsdbRefTable(STable* pTable); void tsdbUnRefTable(STable* pTable); -void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); -int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen); -void tsdbOrgMeta(STsdbRepo* pRepo); +void tsdbUpdateTableSchema(STsdb* pRepo, STable* pTable, STSchema* pSchema, bool insertAct); +int tsdbRestoreTable(STsdb* pRepo, void* cont, int contLen); +void tsdbOrgMeta(STsdb* pRepo); int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema); int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId); -int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema); -STSchema* tsdbGetTableLatestSchema(STable *pTable); +int tsdbUpdateLastColSchema(STable* pTable, STSchema* pNewSchema); +STSchema* tsdbGetTableLatestSchema(STable* pTable); void tsdbFreeLastColumns(STable* pTable); int tsdbCompareJsonMapValue(const void* a, const void* b); void* tsdbGetJsonTagValue(STable* pTable, char* key, int32_t keyLen, int16_t* colId); -static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) { - if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) { +static FORCE_INLINE int tsdbCompareSchemaVersion(const void* key1, const void* key2) { + if (*(int16_t*)key1 < schemaVersion(*(STSchema**)key2)) { return -1; - } else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) { + } else if (*(int16_t*)key1 > schemaVersion(*(STSchema**)key2)) { return 1; } else { return 0; } } -static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version, int8_t rowType) { +static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version, + int8_t rowType) { STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose STSchema* pSchema = NULL; STSchema* pTSchema = NULL; if (lock) TSDB_RLOCK_TABLE(pDTable); if (_version < 0) { // get the latest version of schema - pTSchema = *(STSchema **)taosArrayGetLast(pDTable->schema); + pTSchema = *(STSchema**)taosArrayGetLast(pDTable->schema); } else { // get the schema with version void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ); if (ptr == NULL) { @@ -149,9 +151,9 @@ static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) { return tsdbGetTableSchemaImpl(pTable, false, false, -1, -1); } -static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) { +static FORCE_INLINE STSchema* tsdbGetTableTagSchema(STable* pTable) { if (pTable->type == TSDB_CHILD_TABLE) { // check child table first - STable *pSuper = pTable->pSuper; + STable* pSuper = pTable->pSuper; if (pSuper == NULL) return NULL; return pSuper->tagSchema; } else if (pTable->type == TSDB_SUPER_TABLE) { diff --git a/source/dnode/vnode/tsdb2/inc/tsdbReadImpl.h b/source/dnode/vnode/tsdb2/inc/tsdbReadImpl.h index 20d8b88c83..9f3fb8b683 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbReadImpl.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbReadImpl.h @@ -16,12 +16,13 @@ #ifndef _TD_TSDB_READ_IMPL_H_ #define _TD_TSDB_READ_IMPL_H_ +#include "os.h" #include "tfs.h" #include "tsdb.h" -#include "os.h" #include "tsdbFile.h" -#include "tskiplist.h" +#include "tsdbMemory.h" #include "tsdbMeta.h" +#include "tskiplist.h" typedef struct SReadH SReadH; @@ -92,7 +93,7 @@ typedef enum { #define SBlockVerLatest TSDB_SBLK_VER_1 -#define SBlock SBlockV1 // latest SBlock definition +#define SBlock SBlockV1 // latest SBlock definition // lastest SBlockInfo definition typedef struct { @@ -126,7 +127,7 @@ typedef struct { uint32_t offset : 24; } SBlockColV1; -#define SBlockCol SBlockColV1 // latest SBlockCol definition +#define SBlockCol SBlockColV1 // latest SBlockCol definition typedef struct { int16_t colId; @@ -162,19 +163,19 @@ typedef struct { typedef void SAggrBlkData; // SBlockCol cols[]; struct SReadH { - STsdbRepo * pRepo; - SDFileSet rSet; // FSET to read - SArray * aBlkIdx; // SBlockIdx array - STable * pTable; // table to read - SBlockIdx * pBlkIdx; // current reading table SBlockIdx - int cidx; - SBlockInfo * pBlkInfo; // SBlockInfoV# - SBlockData *pBlkData; // Block info + STsdb * pRepo; + SDFileSet rSet; // FSET to read + SArray * aBlkIdx; // SBlockIdx array + STable * pTable; // table to read + SBlockIdx * pBlkIdx; // current reading table SBlockIdx + int cidx; + SBlockInfo * pBlkInfo; // SBlockInfoV# + SBlockData * pBlkData; // Block info SAggrBlkData *pAggrBlkData; // Aggregate Block info - SDataCols * pDCols[2]; - void * pBuf; // buffer - void * pCBuf; // compression buffer - void * pExBuf; // extra buffer + SDataCols * pDCols[2]; + void * pBuf; // buffer + void * pCBuf; // compression buffer + void * pExBuf; // extra buffer }; #define TSDB_READ_REPO(rh) ((rh)->pRepo) @@ -216,7 +217,7 @@ static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { } } -int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo); +int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); void tsdbDestroyReadH(SReadH *pReadh); int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); void tsdbCloseAndUnsetFSet(SReadH *pReadh); diff --git a/source/dnode/vnode/tsdb2/inc/tsdbint.h b/source/dnode/vnode/tsdb2/inc/tsdbint.h index 5be05bbd4d..6d3d94e8fc 100644 --- a/source/dnode/vnode/tsdb2/inc/tsdbint.h +++ b/source/dnode/vnode/tsdb2/inc/tsdbint.h @@ -62,7 +62,7 @@ extern "C" { #include "tsdbRowMergeBuf.h" // Main definitions -struct STsdbRepo { +struct STsdb { uint8_t state; STsdbCfg config; @@ -97,17 +97,17 @@ struct STsdbRepo { #define IS_REPO_LOCKED(r) (r)->repoLocked #define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg) -int tsdbLockRepo(STsdbRepo* pRepo); -int tsdbUnlockRepo(STsdbRepo* pRepo); -STsdbMeta* tsdbGetMeta(STsdbRepo* pRepo); -int tsdbCheckCommit(STsdbRepo* pRepo); -int tsdbRestoreInfo(STsdbRepo* pRepo); -UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg); -int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable* pTable); +int tsdbLockRepo(STsdb* pRepo); +int tsdbUnlockRepo(STsdb* pRepo); +STsdbMeta* tsdbGetMeta(STsdb* pRepo); +int tsdbCheckCommit(STsdb* pRepo); +int tsdbRestoreInfo(STsdb* pRepo); +UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg); +int32_t tsdbLoadLastCache(STsdb *pRepo, STable* pTable); void tsdbGetRootDir(int repoid, char dirName[]); void tsdbGetDataDir(int repoid, char dirName[]); -static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) { +static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdb* pRepo) { ASSERT(pRepo != NULL); if (pRepo->mem == NULL) return NULL; diff --git a/source/dnode/vnode/tsdb2/src/tsdbBuffer.c b/source/dnode/vnode/tsdb2/src/tsdbBuffer.c index 03f3d78745..62f345ac8a 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbBuffer.c +++ b/source/dnode/vnode/tsdb2/src/tsdbBuffer.c @@ -58,7 +58,7 @@ void tsdbFreeBufPool(STsdbBufPool *pBufPool) { } } -int tsdbOpenBufPool(STsdbRepo *pRepo) { +int tsdbOpenBufPool(STsdb *pRepo) { STsdbCfg * pCfg = &(pRepo->config); STsdbBufPool *pPool = pRepo->pPool; @@ -93,7 +93,7 @@ _err: return -1; } -void tsdbCloseBufPool(STsdbRepo *pRepo) { +void tsdbCloseBufPool(STsdb *pRepo) { if (pRepo == NULL) return; STsdbBufPool * pBufPool = pRepo->pPool; @@ -111,7 +111,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) { tsdbDebug("vgId:%d, buffer pool is closed", REPO_ID(pRepo)); } -SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) { +SListNode *tsdbAllocBufBlockFromPool(STsdb *pRepo) { ASSERT(pRepo != NULL && pRepo->pPool != NULL); ASSERT(IS_REPO_LOCKED(pRepo)); @@ -165,7 +165,7 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) { void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } -int tsdbExpandPool(STsdbRepo *pRepo, int32_t oldTotalBlocks) { +int tsdbExpandPool(STsdb *pRepo, int32_t oldTotalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) { return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/tsdb2/src/tsdbCommit.c b/source/dnode/vnode/tsdb2/src/tsdbCommit.c index 8355409beb..21d755614a 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb2/src/tsdbCommit.c @@ -59,18 +59,18 @@ typedef struct { #define TSDB_COMMIT_DEFAULT_ROWS(ch) TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock) #define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch))) -static int tsdbCommitMeta(STsdbRepo *pRepo); +static int tsdbCommitMeta(STsdb *pRepo); static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void *cont, int contLen, bool compact); static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid); -static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile); -static int tsdbCommitTSData(STsdbRepo *pRepo); -static void tsdbStartCommit(STsdbRepo *pRepo); -static void tsdbEndCommit(STsdbRepo *pRepo, int eno); +static int tsdbCompactMetaFile(STsdb *pRepo, STsdbFS *pfs, SMFile *pMFile); +static int tsdbCommitTSData(STsdb *pRepo); +static void tsdbStartCommit(STsdb *pRepo); +static void tsdbEndCommit(STsdb *pRepo, int eno); static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid); static int tsdbCreateCommitIters(SCommitH *pCommith); static void tsdbDestroyCommitIters(SCommitH *pCommith); static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key); -static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo); +static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo); static void tsdbDestroyCommitH(SCommitH *pCommith); static int tsdbGetFidLevel(int fid, SRtn *pRtn); static int tsdbNextCommitFid(SCommitH *pCommith); @@ -92,7 +92,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget, TSKEY maxKey, int maxRows, int8_t update); -void *tsdbCommitData(STsdbRepo *pRepo) { +void *tsdbCommitData(STsdb *pRepo) { if (pRepo->imem == NULL) { return NULL; } @@ -121,7 +121,7 @@ _err: return NULL; } -int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn) { +int tsdbApplyRtnOnFSet(STsdb *pRepo, SDFileSet *pSet, SRtn *pRtn) { SDiskID did; SDFileSet nSet; STsdbFS * pfs = REPO_FS(pRepo); @@ -266,7 +266,7 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) { } // =================== Commit Meta Data -static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile *pMf, bool open) { +static int tsdbInitCommitMetaFile(STsdb *pRepo, SMFile *pMf, bool open) { STsdbFS *pfs = REPO_FS(pRepo); SMFile * pOMFile = pfs->cstatus->pmf; SDiskID did; @@ -295,7 +295,7 @@ static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile *pMf, bool open) { return 0; } -static int tsdbCommitMeta(STsdbRepo *pRepo) { +static int tsdbCommitMeta(STsdb *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); SMemTable *pMem = pRepo->imem; SMFile * pOMFile = pfs->cstatus->pmf; @@ -387,7 +387,7 @@ void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord) { return buf; } -void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) { +void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn) { STsdbCfg *pCfg = REPO_CFG(pRepo); TSKEY minKey, midKey, maxKey, now; @@ -476,7 +476,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) { return 0; } -static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { +static int tsdbCompactMetaFile(STsdb *pRepo, STsdbFS *pfs, SMFile *pMFile) { float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); float compactRatio = (float)(tsTsdbMetaCompactRatio) / 100; @@ -602,7 +602,7 @@ _err: } // =================== Commit Time-Series Data -static int tsdbCommitTSData(STsdbRepo *pRepo) { +static int tsdbCommitTSData(STsdb *pRepo) { SMemTable *pMem = pRepo->imem; SCommitH commith; SDFileSet *pSet = NULL; @@ -678,7 +678,7 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) { return 0; } -static void tsdbStartCommit(STsdbRepo *pRepo) { +static void tsdbStartCommit(STsdb *pRepo) { SMemTable *pMem = pRepo->imem; ASSERT(pMem->numOfRows > 0 || listNEles(pMem->actList) > 0); @@ -691,7 +691,7 @@ static void tsdbStartCommit(STsdbRepo *pRepo) { pRepo->code = TSDB_CODE_SUCCESS; } -static void tsdbEndCommit(STsdbRepo *pRepo, int eno) { +static void tsdbEndCommit(STsdb *pRepo, int eno) { if (eno != TSDB_CODE_SUCCESS) { tsdbEndFSTxnWithError(REPO_FS(pRepo)); } else { @@ -721,7 +721,7 @@ static bool tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS #endif static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); ASSERT(pSet == NULL || pSet->fid == fid); @@ -776,7 +776,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { } static int tsdbCreateCommitIters(SCommitH *pCommith) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); SMemTable *pMem = pRepo->imem; STsdbMeta *pMeta = pRepo->tsdbMeta; @@ -839,7 +839,7 @@ static void tsdbSeekCommitIter(SCommitH *pCommith, TSKEY key) { } } -static int tsdbInitCommitH(SCommitH *pCommith, STsdbRepo *pRepo) { +static int tsdbInitCommitH(SCommitH *pCommith, STsdb *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); memset(pCommith, 0, sizeof(*pCommith)); @@ -902,7 +902,7 @@ static void tsdbDestroyCommitH(SCommitH *pCommith) { } static int tsdbNextCommitFid(SCommitH *pCommith) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); int fid = TSDB_IVLD_FID; @@ -1057,7 +1057,7 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) { } } -int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, +int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { STsdbCfg * pCfg = REPO_CFG(pRepo); SBlockData * pBlockData; @@ -1252,7 +1252,7 @@ static int tsdbWriteBlockInfo(SCommitH *pCommih) { } static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLimit, bool toData) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); SMergeInfo mInfo; int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith); @@ -1285,7 +1285,7 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi } static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks; SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx; @@ -1410,7 +1410,7 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); SBlock block; SDFile * pDFile; @@ -1527,7 +1527,7 @@ static void tsdbResetCommitTable(SCommitH *pCommith) { static int tsdbSetAndOpenCommitFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { SDiskID did; - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); SDFileSet *pWSet = TSDB_COMMIT_WRITE_FSET(pCommith); tfsAllocDisk(tsdbGetFidLevel(fid, &(pCommith->rtn)), &(did.level), &(did.id)); @@ -1732,7 +1732,7 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError) { } static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *pInfo) { - STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); + STsdb *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbCfg * pCfg = REPO_CFG(pRepo); int mergeRows = pBlock->numOfRows + pInfo->rowsInserted - pInfo->rowsDeleteSucceed; @@ -1749,7 +1749,7 @@ static bool tsdbCanAddSubBlock(SCommitH *pCommith, SBlock *pBlock, SMergeInfo *p return false; } -int tsdbApplyRtn(STsdbRepo *pRepo) { +int tsdbApplyRtn(STsdb *pRepo) { SRtn rtn; SFSIter fsiter; STsdbFS * pfs = REPO_FS(pRepo); diff --git a/source/dnode/vnode/tsdb2/src/tsdbCommitQueue.c b/source/dnode/vnode/tsdb2/src/tsdbCommitQueue.c index dccb85af55..cb82475311 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbCommitQueue.c +++ b/source/dnode/vnode/tsdb2/src/tsdbCommitQueue.c @@ -27,7 +27,7 @@ typedef struct { typedef struct { TSDB_REQ_T req; - STsdbRepo *pRepo; + STsdb *pRepo; } SReq; static void *tsdbLoopCommit(void *arg); @@ -91,7 +91,7 @@ void tsdbDestroyCommitQueue() { pthread_mutex_destroy(&(pQueue->lock)); } -int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { +int tsdbScheduleCommit(STsdb *pRepo, TSDB_REQ_T req) { SCommitQueue *pQueue = &tsCommitQueue; SListNode *pNode = (SListNode *)calloc(1, sizeof(SListNode) + sizeof(SReq)); @@ -114,7 +114,7 @@ int tsdbScheduleCommit(STsdbRepo *pRepo, TSDB_REQ_T req) { return 0; } -static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { +static void tsdbApplyRepoConfig(STsdb *pRepo) { pthread_mutex_lock(&pRepo->save_mutex); pRepo->config_changed = false; @@ -157,7 +157,7 @@ static void tsdbApplyRepoConfig(STsdbRepo *pRepo) { static void *tsdbLoopCommit(void *arg) { SCommitQueue *pQueue = &tsCommitQueue; SListNode * pNode = NULL; - STsdbRepo * pRepo = NULL; + STsdb * pRepo = NULL; TSDB_REQ_T req; setThreadName("tsdbCommit"); diff --git a/source/dnode/vnode/tsdb2/src/tsdbCompact.c b/source/dnode/vnode/tsdb2/src/tsdbCompact.c index ee676c7d7b..cf966b8229 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbCompact.c +++ b/source/dnode/vnode/tsdb2/src/tsdbCompact.c @@ -43,14 +43,14 @@ typedef struct { #define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh)) #define TSDB_COMPACT_EXBUF(pComph) TSDB_READ_EXBUF(&((pComph)->readh)) -static int tsdbAsyncCompact(STsdbRepo *pRepo); -static void tsdbStartCompact(STsdbRepo *pRepo); -static void tsdbEndCompact(STsdbRepo *pRepo, int eno); -static int tsdbCompactMeta(STsdbRepo *pRepo); -static int tsdbCompactTSData(STsdbRepo *pRepo); +static int tsdbAsyncCompact(STsdb *pRepo); +static void tsdbStartCompact(STsdb *pRepo); +static void tsdbEndCompact(STsdb *pRepo, int eno); +static int tsdbCompactMeta(STsdb *pRepo); +static int tsdbCompactTSData(STsdb *pRepo); static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet); static bool tsdbShouldCompact(SCompactH *pComph); -static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo); +static int tsdbInitCompactH(SCompactH *pComph, STsdb *pRepo); static void tsdbDestroyCompactH(SCompactH *pComph); static int tsdbInitCompTbArray(SCompactH *pComph); static void tsdbDestroyCompTbArray(SCompactH *pComph); @@ -62,9 +62,9 @@ static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCo void **ppCBuf, void **ppExBuf); enum { TSDB_NO_COMPACT, TSDB_IN_COMPACT, TSDB_WAITING_COMPACT}; -int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); } +int tsdbCompact(STsdb *pRepo) { return tsdbAsyncCompact(pRepo); } -void *tsdbCompactImpl(STsdbRepo *pRepo) { +void *tsdbCompactImpl(STsdb *pRepo) { // Check if there are files in TSDB FS to compact if (REPO_FS(pRepo)->cstatus->pmf == NULL) { pRepo->compactState = TSDB_NO_COMPACT; @@ -94,7 +94,7 @@ _err: return NULL; } -static int tsdbAsyncCompact(STsdbRepo *pRepo) { +static int tsdbAsyncCompact(STsdb *pRepo) { if (pRepo->compactState != TSDB_NO_COMPACT) { tsdbInfo("vgId:%d not compact tsdb again ", REPO_ID(pRepo)); return 0; @@ -104,7 +104,7 @@ static int tsdbAsyncCompact(STsdbRepo *pRepo) { return tsdbScheduleCommit(pRepo, COMPACT_REQ); } -static void tsdbStartCompact(STsdbRepo *pRepo) { +static void tsdbStartCompact(STsdb *pRepo) { assert(pRepo->compactState != TSDB_IN_COMPACT); tsdbInfo("vgId:%d start to compact!", REPO_ID(pRepo)); tsdbStartFSTxn(pRepo, 0, 0); @@ -112,7 +112,7 @@ static void tsdbStartCompact(STsdbRepo *pRepo) { pRepo->compactState = TSDB_IN_COMPACT; } -static void tsdbEndCompact(STsdbRepo *pRepo, int eno) { +static void tsdbEndCompact(STsdb *pRepo, int eno) { if (eno != TSDB_CODE_SUCCESS) { tsdbEndFSTxnWithError(REPO_FS(pRepo)); } else { @@ -123,13 +123,13 @@ static void tsdbEndCompact(STsdbRepo *pRepo, int eno) { tsem_post(&(pRepo->readyToCommit)); } -static int tsdbCompactMeta(STsdbRepo *pRepo) { +static int tsdbCompactMeta(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); tsdbUpdateMFile(pfs, pfs->cstatus->pmf); return 0; } - static int tsdbCompactTSData(STsdbRepo *pRepo) { + static int tsdbCompactTSData(STsdb *pRepo) { SCompactH compactH; SDFileSet *pSet = NULL; @@ -172,7 +172,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { } static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet) { - STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph); + STsdb *pRepo = TSDB_COMPACT_REPO(pComph); SDiskID did; tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet), @@ -226,7 +226,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { // if (tsdbForceCompactFile) { // return true; // } - STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph); + STsdb * pRepo = TSDB_COMPACT_REPO(pComph); STsdbCfg * pCfg = REPO_CFG(pRepo); SReadH * pReadh = &(pComph->readh); STableCompactH *pTh; @@ -271,7 +271,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { (tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85)); } - static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo) { + static int tsdbInitCompactH(SCompactH *pComph, STsdb *pRepo) { STsdbCfg *pCfg = REPO_CFG(pRepo); memset(pComph, 0, sizeof(*pComph)); @@ -324,7 +324,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { } static int tsdbInitCompTbArray(SCompactH *pComph) { // Init pComp->tbArray - STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph); + STsdb *pRepo = TSDB_COMPACT_REPO(pComph); STsdbMeta *pMeta = pRepo->tsdbMeta; if (tsdbRLockRepoMeta(pRepo) < 0) return -1; @@ -421,7 +421,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { static void tsdbCompactFSetEnd(SCompactH *pComph) { tsdbCloseAndUnsetFSet(&(pComph->readh)); } static int tsdbCompactFSetImpl(SCompactH *pComph) { - STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph); + STsdb *pRepo = TSDB_COMPACT_REPO(pComph); STsdbCfg * pCfg = REPO_CFG(pRepo); SReadH * pReadh = &(pComph->readh); SBlockIdx blkIdx; @@ -508,7 +508,7 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf, void **ppCBuf, void **ppExBuf) { - STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph); + STsdb *pRepo = TSDB_COMPACT_REPO(pComph); STsdbCfg * pCfg = REPO_CFG(pRepo); SDFile * pDFile; bool isLast; diff --git a/source/dnode/vnode/tsdb2/src/tsdbFS.c b/source/dnode/vnode/tsdb2/src/tsdbFS.c index dc9c396511..121e0ccbdf 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbFS.c +++ b/source/dnode/vnode/tsdb2/src/tsdbFS.c @@ -26,17 +26,17 @@ static void tsdbResetFSStatus(SFSStatus *pStatus); static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid); static void tsdbApplyFSTxnOnDisk(SFSStatus *pFrom, SFSStatus *pTo); static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]); -static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo); -static int tsdbScanAndTryFixFS(STsdbRepo *pRepo); -static int tsdbScanRootDir(STsdbRepo *pRepo); -static int tsdbScanDataDir(STsdbRepo *pRepo); +static int tsdbOpenFSFromCurrent(STsdb *pRepo); +static int tsdbScanAndTryFixFS(STsdb *pRepo); +static int tsdbScanRootDir(STsdb *pRepo); +static int tsdbScanDataDir(STsdb *pRepo); static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf); -static int tsdbRestoreCurrent(STsdbRepo *pRepo); +static int tsdbRestoreCurrent(STsdb *pRepo); static int tsdbComparTFILE(const void *arg1, const void *arg2); -static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired); -static int tsdbProcessExpiredFS(STsdbRepo *pRepo); -static int tsdbCreateMeta(STsdbRepo *pRepo); -static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray); +static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired); +static int tsdbProcessExpiredFS(STsdb *pRepo); +static int tsdbCreateMeta(STsdb *pRepo); +static int tsdbFetchTFileSet(STsdb *pRepo, SArray **fArray); // For backward compatibility // ================== CURRENT file header info @@ -159,7 +159,7 @@ static SFSStatus *tsdbNewFSStatus(int maxFSet) { static SFSStatus *tsdbFreeFSStatus(SFSStatus *pStatus) { if (pStatus) { - pStatus->df = taosArrayDestroy(&pStatus->df); + pStatus->df = taosArrayDestroy(pStatus->df); free(pStatus); } @@ -253,7 +253,7 @@ void *tsdbFreeFS(STsdbFS *pfs) { return NULL; } -static int tsdbProcessExpiredFS(STsdbRepo *pRepo) { +static int tsdbProcessExpiredFS(STsdb *pRepo) { tsdbStartFSTxn(pRepo, 0, 0); if (tsdbCreateMeta(pRepo) < 0) { tsdbError("vgId:%d failed to create meta since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -272,7 +272,7 @@ static int tsdbProcessExpiredFS(STsdbRepo *pRepo) { return 0; } -static int tsdbCreateMeta(STsdbRepo *pRepo) { +static int tsdbCreateMeta(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); SMFile * pOMFile = pfs->cstatus->pmf; SMFile mf; @@ -309,7 +309,7 @@ static int tsdbCreateMeta(STsdbRepo *pRepo) { return 0; } -int tsdbOpenFS(STsdbRepo *pRepo) { +int tsdbOpenFS(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); char current[TSDB_FILENAME_LEN] = "\0"; int nExpired = 0; @@ -351,12 +351,12 @@ int tsdbOpenFS(STsdbRepo *pRepo) { return 0; } -void tsdbCloseFS(STsdbRepo *pRepo) { +void tsdbCloseFS(STsdb *pRepo) { // Do nothing } // Start a new transaction to modify the file system -void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) { +void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd) { STsdbFS *pfs = REPO_FS(pRepo); ASSERT(pfs->intxn == false); @@ -374,7 +374,7 @@ void tsdbStartFSTxn(STsdbRepo *pRepo, int64_t pointsAdd, int64_t storageAdd) { void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta) { pfs->nstatus->meta = *pMeta; } -int tsdbEndFSTxn(STsdbRepo *pRepo) { +int tsdbEndFSTxn(STsdb *pRepo) { STsdbFS *pfs = REPO_FS(pRepo); ASSERT(FS_IN_TXN(pfs)); SFSStatus *pStatus; @@ -655,7 +655,7 @@ static void tsdbGetTxnFname(int repoid, TSDB_TXN_FILE_T ftype, char fname[]) { snprintf(fname, TSDB_FILENAME_LEN, "%s/vnode/vnode%d/tsdb/%s", TFS_PRIMARY_PATH(), repoid, tsdbTxnFname[ftype]); } -static int tsdbOpenFSFromCurrent(STsdbRepo *pRepo) { +static int tsdbOpenFSFromCurrent(STsdb *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); int fd = -1; void * buffer = NULL; @@ -752,7 +752,7 @@ _err: } // Scan and try to fix incorrect files -static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { +static int tsdbScanAndTryFixFS(STsdb *pRepo) { STsdbFS * pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; @@ -778,7 +778,7 @@ static int tsdbScanAndTryFixFS(STsdbRepo *pRepo) { return 0; } -int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { +int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta) { char tbuf[128]; STsdbFS * pfs = REPO_FS(pRepo); SMFile mf; @@ -914,7 +914,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) { return 0; } -static int tsdbScanRootDir(STsdbRepo *pRepo) { +static int tsdbScanRootDir(STsdb *pRepo) { char rootDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; STsdbFS * pfs = REPO_FS(pRepo); @@ -948,7 +948,7 @@ static int tsdbScanRootDir(STsdbRepo *pRepo) { return 0; } -static int tsdbScanDataDir(STsdbRepo *pRepo) { +static int tsdbScanDataDir(STsdb *pRepo) { char dataDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; STsdbFS * pfs = REPO_FS(pRepo); @@ -992,7 +992,7 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const TFILE *pf) { return false; } -static int tsdbRestoreMeta(STsdbRepo *pRepo) { +static int tsdbRestoreMeta(STsdb *pRepo) { char rootDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; TDIR * tdir = NULL; @@ -1113,7 +1113,7 @@ static int tsdbRestoreMeta(STsdbRepo *pRepo) { return 0; } -static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) { +static int tsdbFetchTFileSet(STsdb *pRepo, SArray **fArray) { char dataDir[TSDB_FILENAME_LEN]; char bname[TSDB_FILENAME_LEN]; TDIR * tdir = NULL; @@ -1139,7 +1139,7 @@ static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) { if (tdir == NULL) { tsdbError("vgId:%d failed to fetch TFileSet while open directory %s since %s", REPO_ID(pRepo), dataDir, tstrerror(terrno)); - taosArrayDestroy(fArray); + taosArrayDestroy(*fArray); regfree(®ex); return -1; } @@ -1152,7 +1152,7 @@ static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) { if (taosArrayPush(*fArray, (void *)pf) == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; tfsClosedir(tdir); - taosArrayDestroy(fArray); + taosArrayDestroy(*fArray); regfree(®ex); return -1; } @@ -1166,7 +1166,7 @@ static int tsdbFetchTFileSet(STsdbRepo *pRepo, SArray **fArray) { tsdbError("vgId:%d failed to fetch TFileSet Array while run regexec since %s", REPO_ID(pRepo), strerror(code)); terrno = TAOS_SYSTEM_ERROR(code); tfsClosedir(tdir); - taosArrayDestroy(fArray); + taosArrayDestroy(*fArray); regfree(®ex); return -1; } @@ -1191,7 +1191,7 @@ static bool tsdbIsDFileSetValid(int nFiles) { } } -static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { +static int tsdbRestoreDFileSet(STsdb *pRepo) { const TFILE *pf = NULL; SArray * fArray = NULL; STsdbFS * pfs = REPO_FS(pRepo); @@ -1351,7 +1351,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) { return 0; } -static int tsdbRestoreCurrent(STsdbRepo *pRepo) { +static int tsdbRestoreCurrent(STsdb *pRepo) { // Loop to recover mfile if (tsdbRestoreMeta(pRepo) < 0) { tsdbError("vgId:%d failed to restore current since %s", REPO_ID(pRepo), tstrerror(terrno)); @@ -1408,7 +1408,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) { } } -static void tsdbScanAndTryFixDFilesHeader(STsdbRepo *pRepo, int32_t *nExpired) { +static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) { STsdbFS * pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; SDFInfo info; diff --git a/source/dnode/vnode/tsdb2/src/tsdbFile.c b/source/dnode/vnode/tsdb2/src/tsdbFile.c index 77d172893e..daff85f015 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbFile.c +++ b/source/dnode/vnode/tsdb2/src/tsdbFile.c @@ -187,7 +187,7 @@ int tsdbLoadMFileHeader(SMFile *pMFile, SMFInfo *pInfo) { return 0; } -int tsdbScanAndTryFixMFile(STsdbRepo *pRepo) { +int tsdbScanAndTryFixMFile(STsdb *pRepo) { SMFile * pMFile = pRepo->fs->cstatus->pmf; struct stat mfstat; SMFile mf; @@ -435,7 +435,7 @@ int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo) { return 0; } -static int tsdbScanAndTryFixDFile(STsdbRepo *pRepo, SDFile *pDFile) { +static int tsdbScanAndTryFixDFile(STsdb *pRepo, SDFile *pDFile) { struct stat dfstat; SDFile df; @@ -545,7 +545,7 @@ static int tsdbRollBackDFile(SDFile *pDFile) { return -1; } - if (taosFtruncate(TSDB_FILE_FD(&df), pDFile->info.size) < 0) { + if (taosFtruncateFile(TSDB_FILE_FD(&df), pDFile->info.size) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); tsdbCloseDFile(&df); return -1; @@ -672,7 +672,7 @@ int tsdbUpdateDFileSetHeader(SDFileSet *pSet) { return 0; } -int tsdbScanAndTryFixDFileSet(STsdbRepo *pRepo, SDFileSet *pSet) { +int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet) { ASSERT_TSDB_FSET_NFILES_VALID(pSet); for (TSDB_FILE_T ftype = 0; ftype < tsdbGetNFiles(pSet); ftype++) { if (tsdbScanAndTryFixDFile(pRepo, TSDB_DFILE_IN_SET(pSet, ftype)) < 0) { diff --git a/source/dnode/vnode/tsdb2/src/tsdbMain.c b/source/dnode/vnode/tsdb2/src/tsdbMain.c index 20f93cb0a4..bde453b738 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbMain.c +++ b/source/dnode/vnode/tsdb2/src/tsdbMain.c @@ -25,12 +25,12 @@ #define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP)) static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg); -static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); -static void tsdbFreeRepo(STsdbRepo *pRepo); -static void tsdbStartStream(STsdbRepo *pRepo); -static void tsdbStopStream(STsdbRepo *pRepo); -static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh); -static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); +static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH); +static void tsdbFreeRepo(STsdb *pRepo); +static void tsdbStartStream(STsdb *pRepo); +static void tsdbStopStream(STsdb *pRepo); +static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh); +static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx); // Function declaration int32_t tsdbCreateRepo(int repoid) { @@ -63,8 +63,8 @@ int32_t tsdbDropRepo(int repoid) { return tfsRmdir(tsdbDir); } -STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { - STsdbRepo *pRepo; +STsdb *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { + STsdb *pRepo; STsdbCfg config = *pCfg; terrno = TSDB_CODE_SUCCESS; @@ -119,10 +119,10 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { } // Note: all working thread and query thread must stopped when calling this function -int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { +int tsdbCloseRepo(STsdb *repo, int toCommit) { if (repo == NULL) return 0; - STsdbRepo *pRepo = repo; + STsdb *pRepo = repo; int vgId = REPO_ID(pRepo); terrno = TSDB_CODE_SUCCESS; @@ -157,12 +157,12 @@ int tsdbCloseRepo(STsdbRepo *repo, int toCommit) { } } -STsdbCfg *tsdbGetCfg(const STsdbRepo *repo) { +STsdbCfg *tsdbGetCfg(const STsdb *repo) { ASSERT(repo != NULL); - return &((STsdbRepo *)repo)->config; + return &((STsdb *)repo)->config; } -int tsdbLockRepo(STsdbRepo *pRepo) { +int tsdbLockRepo(STsdb *pRepo) { int code = pthread_mutex_lock(&pRepo->mutex); if (code != 0) { tsdbError("vgId:%d failed to lock tsdb since %s", REPO_ID(pRepo), strerror(errno)); @@ -173,7 +173,7 @@ int tsdbLockRepo(STsdbRepo *pRepo) { return 0; } -int tsdbUnlockRepo(STsdbRepo *pRepo) { +int tsdbUnlockRepo(STsdb *pRepo) { ASSERT(IS_REPO_LOCKED(pRepo)); pRepo->repoLocked = false; int code = pthread_mutex_unlock(&pRepo->mutex); @@ -193,7 +193,7 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) { // return 0; // } -int tsdbCheckCommit(STsdbRepo *pRepo) { +int tsdbCheckCommit(STsdb *pRepo) { ASSERT(pRepo->mem != NULL); STsdbCfg *pCfg = &(pRepo->config); @@ -207,23 +207,23 @@ int tsdbCheckCommit(STsdbRepo *pRepo) { return 0; } -STsdbMeta *tsdbGetMeta(STsdbRepo *pRepo) { return pRepo->tsdbMeta; } +STsdbMeta *tsdbGetMeta(STsdb *pRepo) { return pRepo->tsdbMeta; } -STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; } +STsdbRepoInfo *tsdbGetStatus(STsdb *pRepo) { return NULL; } -int tsdbGetState(STsdbRepo *repo) { return repo->state; } +int tsdbGetState(STsdb *repo) { return repo->state; } -int8_t tsdbGetCompactState(STsdbRepo *repo) { return (int8_t)(repo->compactState); } +int8_t tsdbGetCompactState(STsdb *repo) { return (int8_t)(repo->compactState); } void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) { ASSERT(repo != NULL); - STsdbRepo *pRepo = repo; + STsdb *pRepo = repo; *totalPoints = pRepo->stat.pointsWritten; *totalStorage = pRepo->stat.totalStorage; *compStorage = pRepo->stat.compStorage; } -int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { +int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg) { // TODO: think about multithread cases if (tsdbCheckAndSetDefaultCfg(pCfg) < 0) return -1; @@ -343,7 +343,7 @@ int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg) { #endif } -uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { +uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) { // TODO return 0; #if 0 @@ -564,8 +564,8 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) { return 0; } -static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { - STsdbRepo *pRepo = (STsdbRepo *)calloc(1, sizeof(*pRepo)); +static STsdb *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { + STsdb *pRepo = (STsdb *)calloc(1, sizeof(*pRepo)); if (pRepo == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; return NULL; @@ -629,7 +629,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) { return pRepo; } -static void tsdbFreeRepo(STsdbRepo *pRepo) { +static void tsdbFreeRepo(STsdb *pRepo) { if (pRepo) { tsdbFreeFS(pRepo->fs); tsdbFreeBufPool(pRepo->pPool); @@ -643,7 +643,7 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) { } } -static void tsdbStartStream(STsdbRepo *pRepo) { +static void tsdbStartStream(STsdb *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; for (int i = 0; i < pMeta->maxTables; i++) { @@ -655,7 +655,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) { } } -static void tsdbStopStream(STsdbRepo *pRepo) { +static void tsdbStopStream(STsdb *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; for (int i = 0; i < pMeta->maxTables; i++) { @@ -666,7 +666,7 @@ static void tsdbStopStream(STsdbRepo *pRepo) { } } -static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh) { +static int tsdbRestoreLastColumns(STsdb *pRepo, STable *pTable, SReadH* pReadh) { //tsdbInfo("tsdbRestoreLastColumns of table %s", pTable->name->data); STSchema *pSchema = tsdbGetTableLatestSchema(pTable); @@ -811,7 +811,7 @@ out: return err; } -static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { +static int tsdbRestoreLastRow(STsdb *pRepo, STable *pTable, SReadH* pReadh, SBlockIdx *pIdx) { ASSERT(pTable->lastRow == NULL); if (tsdbLoadBlockInfo(pReadh, NULL, NULL) < 0) { return -1; @@ -856,7 +856,7 @@ static int tsdbRestoreLastRow(STsdbRepo *pRepo, STable *pTable, SReadH* pReadh, return 0; } -int tsdbRestoreInfo(STsdbRepo *pRepo) { +int tsdbRestoreInfo(STsdb *pRepo) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; @@ -930,7 +930,7 @@ int tsdbRestoreInfo(STsdbRepo *pRepo) { return 0; } -int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { +int32_t tsdbLoadLastCache(STsdb *pRepo, STable *pTable) { SFSIter fsiter; SReadH readh; SDFileSet *pSet; @@ -1021,7 +1021,7 @@ int32_t tsdbLoadLastCache(STsdbRepo *pRepo, STable *pTable) { return 0; } -UNUSED_FUNC int tsdbCacheLastData(STsdbRepo *pRepo, STsdbCfg* oldCfg) { +UNUSED_FUNC int tsdbCacheLastData(STsdb *pRepo, STsdbCfg* oldCfg) { bool cacheLastRow = false, cacheLastCol = false; SFSIter fsiter; SReadH readh; diff --git a/source/dnode/vnode/tsdb2/src/tsdbMemTable.c b/source/dnode/vnode/tsdb2/src/tsdbMemTable.c index 6923811e25..97c705fc4f 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbMemTable.c +++ b/source/dnode/vnode/tsdb2/src/tsdbMemTable.c @@ -15,31 +15,31 @@ #include "tdataformat.h" #include "tfunctional.h" +#include "tsdbRowMergeBuf.h" #include "tsdbint.h" #include "tskiplist.h" -#include "tsdbRowMergeBuf.h" #include "ttime.h" #define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_MAX_INSERT_BATCH 512 -static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); -static void tsdbFreeMemTable(SMemTable *pMemTable); -static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); -static void tsdbFreeTableData(STableData *pTableData); -static char * tsdbGetTsTupleKey(const void *data); -static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); -static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); -static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); -static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); -static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); -static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); +static SMemTable * tsdbNewMemTable(STsdb *pRepo); +static void tsdbFreeMemTable(SMemTable *pMemTable); +static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); +static void tsdbFreeTableData(STableData *pTableData); +static char * tsdbGetTsTupleKey(const void *data); +static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); +static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); +static int tsdbScanAndConvertSubmitMsg(STsdb *pRepo, SSubmitMsg *pMsg); +static int tsdbInsertDataToTable(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); +static int tsdbCheckTableSchema(STsdb *pRepo, SSubmitBlk *pBlock, STable *pTable); +static int tsdbUpdateTableLatestInfo(STsdb *pRepo, STable *pTable, SMemRow row); -static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, +static FORCE_INLINE int tsdbCheckRowRange(STsdb *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now); -int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { - STsdbRepo * pRepo = repo; +int32_t tsdbInsertData(STsdb *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { + STsdb * pRepo = repo; SSubmitMsgIter msgIter = {0}; SSubmitBlk * pBlock = NULL; int32_t affectedrows = 0, numOfRows = 0; @@ -51,9 +51,9 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR return -1; } - tsdbInitSubmitMsgIter(pMsg, &msgIter); + tInitSubmitMsgIter(pMsg, &msgIter); while (true) { - tsdbGetSubmitMsgNext(&msgIter, &pBlock); + tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) { return -1; @@ -71,37 +71,37 @@ int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pR } // ---------------- INTERNAL FUNCTIONS ---------------- -int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { +int tsdbRefMemTable(STsdb *pRepo, SMemTable *pMemTable) { if (pMemTable == NULL) return 0; int ref = T_REF_INC(pMemTable); - tsdbDebug("vgId:%d ref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref); + tsdbDebug("vgId:%d ref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref); return 0; } // Need to lock the repository -int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { +int tsdbUnRefMemTable(STsdb *pRepo, SMemTable *pMemTable) { if (pMemTable == NULL) return 0; - int ref = T_REF_DEC(pMemTable); - tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref); + int ref = T_REF_DEC(pMemTable); + tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref); if (ref == 0) { STsdbBufPool *pBufPool = pRepo->pPool; SListNode *pNode = NULL; - bool addNew = false; + bool addNew = false; if (tsdbLockRepo(pRepo) < 0) return -1; while ((pNode = tdListPopHead(pMemTable->bufBlockList)) != NULL) { if (pBufPool->nRecycleBlocks > 0) { tsdbRecycleBufferBlock(pBufPool, pNode, false); pBufPool->nRecycleBlocks -= 1; } else { - if(pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) { + if (pBufPool->nElasticBlocks > 0 && listNEles(pBufPool->bufBlockList) > 2) { tsdbRecycleBufferBlock(pBufPool, pNode, true); } else { tdListAppendNode(pBufPool->bufBlockList, pNode); addNew = true; } - } + } } if (addNew) { int code = pthread_cond_signal(&pBufPool->poolNotEmpty); @@ -128,7 +128,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) { return 0; } -int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot, SArray *pATable) { +int tsdbTakeMemSnapshot(STsdb *pRepo, SMemSnapshot *pSnapshot, SArray *pATable) { memset(pSnapshot, 0, sizeof(*pSnapshot)); if (tsdbLockRepo(pRepo) < 0) return -1; @@ -180,7 +180,7 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot, SArray *pATab return 0; } -void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot) { +void tsdbUnTakeMemSnapShot(STsdb *pRepo, SMemSnapshot *pSnapshot) { tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pSnapshot->omem, pSnapshot->imem); if (pSnapshot->mem) { @@ -204,7 +204,7 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemSnapshot *pSnapshot) { pSnapshot->omem = NULL; } -void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { +void *tsdbAllocBytes(STsdb *pRepo, int bytes) { STsdbCfg * pCfg = &pRepo->config; STsdbBufBlock *pBufBlock = NULL; void * ptr = NULL; @@ -266,7 +266,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) { return ptr; } -int tsdbSyncCommitConfig(STsdbRepo* pRepo) { +int tsdbSyncCommitConfig(STsdb *pRepo) { ASSERT(pRepo->config_changed == true); tsem_wait(&(pRepo->readyToCommit)); @@ -290,7 +290,7 @@ int tsdbSyncCommitConfig(STsdbRepo* pRepo) { return 0; } -int tsdbAsyncCommit(STsdbRepo *pRepo) { +int tsdbAsyncCommit(STsdb *pRepo) { tsem_wait(&(pRepo->readyToCommit)); ASSERT(pRepo->imem == NULL); @@ -313,8 +313,8 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) { return 0; } -int tsdbSyncCommit(STsdbRepo *repo) { - STsdbRepo *pRepo = repo; +int tsdbSyncCommit(STsdb *repo) { + STsdb *pRepo = repo; tsdbAsyncCommit(pRepo); tsem_wait(&(pRepo->readyToCommit)); @@ -331,13 +331,13 @@ int tsdbSyncCommit(STsdbRepo *repo) { /** * This is an important function to load data or try to load data from memory skiplist iterator. - * + * * This function load memory data until: * 1. iterator ends * 2. data key exceeds maxKey * 3. rowsIncreased = rowsInserted - rowsDeleteSucceed >= maxRowsToRead * 4. operations in pCols not exceeds its max capacity if pCols is given - * + * * The function tries to procceed AS MUCH AS POSSIBLE. */ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols, @@ -453,7 +453,7 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey } // ---------------- LOCAL FUNCTIONS ---------------- -static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { +static SMemTable *tsdbNewMemTable(STsdb *pRepo) { STsdbMeta *pMeta = pRepo->tsdbMeta; SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable)); @@ -479,7 +479,7 @@ static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) { goto _err; } - pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock*)); + pMemTable->bufBlockList = tdListNew(sizeof(STsdbBufBlock *)); if (pMemTable->bufBlockList == NULL) { terrno = TSDB_CODE_TDB_OUT_OF_MEMORY; goto _err; @@ -494,7 +494,7 @@ _err: return NULL; } -static void tsdbFreeMemTable(SMemTable* pMemTable) { +static void tsdbFreeMemTable(SMemTable *pMemTable) { if (pMemTable) { ASSERT((pMemTable->bufBlockList == NULL) ? true : (listNEles(pMemTable->bufBlockList) == 0)); ASSERT((pMemTable->actList == NULL) ? true : (listNEles(pMemTable->actList) == 0)); @@ -520,7 +520,7 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) { pTableData->numOfRows = 0; uint8_t skipListCreateFlags; - if(pCfg->update == TD_ROW_DISCARD_UPDATE) + if (pCfg->update == TD_ROW_DISCARD_UPDATE) skipListCreateFlags = SL_DISCARD_DUP_KEY; else skipListCreateFlags = SL_UPDATE_DUP_KEY; @@ -589,7 +589,7 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema * return 0; } -static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, +static FORCE_INLINE int tsdbCheckRowRange(STsdb *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, TSKEY now) { TSKEY rowKey = memRowKey(row); if (rowKey < minKey || rowKey > maxKey) { @@ -604,7 +604,7 @@ static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMem return 0; } -static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { +static int tsdbScanAndConvertSubmitMsg(STsdb *pRepo, SSubmitMsg *pMsg) { ASSERT(pMsg != NULL); STsdbMeta * pMeta = pRepo->tsdbMeta; SSubmitMsgIter msgIter = {0}; @@ -614,14 +614,14 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { TSKEY now = taosGetTimestamp(pRepo->config.precision); TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep; TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile; - + terrno = TSDB_CODE_SUCCESS; pMsg->length = htonl(pMsg->length); pMsg->numOfBlocks = htonl(pMsg->numOfBlocks); - if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; + if (tInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1; while (true) { - if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; + if (tGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1; if (pBlock == NULL) break; pBlock->uid = htobe64(pBlock->uid); @@ -661,8 +661,8 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { } } - tsdbInitSubmitBlkIter(pBlock, &blkIter); - while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) { + tInitSubmitBlkIter(pBlock, &blkIter); + while ((row = tGetSubmitBlkNext(&blkIter)) != NULL) { if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) { return -1; } @@ -673,37 +673,34 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) { return 0; } -//row1 has higher priority -static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRepo, - STSchema **ppSchema1, STSchema **ppSchema2, - STable* pTable, int32_t* pPoints, SMemRow* pLastRow) { - - //for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows! - if(row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) { +// row1 has higher priority +static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdb *pRepo, STSchema **ppSchema1, + STSchema **ppSchema2, STable *pTable, int32_t *pPoints, SMemRow *pLastRow) { + // for compatiblity, duplicate key inserted when update=0 should be also calculated as affected rows! + if (row1 == NULL && row2 == NULL && pRepo->config.update == TD_ROW_DISCARD_UPDATE) { (*pPoints)++; return NULL; } - tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), - "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), - memRowKey(row1)); + tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo), "updated in", + TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), memRowKey(row1)); - if(row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) { - void* pMem = tsdbAllocBytes(pRepo, memRowTLen(row1)); - if(pMem == NULL) return NULL; + if (row2 == NULL || pRepo->config.update != TD_ROW_PARTIAL_UPDATE) { + void *pMem = tsdbAllocBytes(pRepo, memRowTLen(row1)); + if (pMem == NULL) return NULL; memRowCpy(pMem, row1); (*pPoints)++; *pLastRow = pMem; return pMem; } - STSchema *pSchema1 = *ppSchema1; - STSchema *pSchema2 = *ppSchema2; - SMergeBuf * pBuf = &pRepo->mergeBuf; - int dv1 = memRowVersion(row1); - int dv2 = memRowVersion(row2); - if(pSchema1 == NULL || schemaVersion(pSchema1) != dv1) { - if(pSchema2 != NULL && schemaVersion(pSchema2) == dv1) { + STSchema * pSchema1 = *ppSchema1; + STSchema * pSchema2 = *ppSchema2; + SMergeBuf *pBuf = &pRepo->mergeBuf; + int dv1 = memRowVersion(row1); + int dv2 = memRowVersion(row2); + if (pSchema1 == NULL || schemaVersion(pSchema1) != dv1) { + if (pSchema2 != NULL && schemaVersion(pSchema2) == dv1) { *ppSchema1 = pSchema2; } else { *ppSchema1 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row1), (int8_t)memRowType(row1)); @@ -711,8 +708,8 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep pSchema1 = *ppSchema1; } - if(pSchema2 == NULL || schemaVersion(pSchema2) != dv2) { - if(schemaVersion(pSchema1) == dv2) { + if (pSchema2 == NULL || schemaVersion(pSchema2) != dv2) { + if (schemaVersion(pSchema1) == dv2) { pSchema2 = pSchema1; } else { *ppSchema2 = tsdbGetTableSchemaImpl(pTable, false, false, memRowVersion(row2), (int8_t)memRowType(row2)); @@ -722,8 +719,8 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep SMemRow tmp = tsdbMergeTwoRows(pBuf, row1, row2, pSchema1, pSchema2); - void* pMem = tsdbAllocBytes(pRepo, memRowTLen(tmp)); - if(pMem == NULL) return NULL; + void *pMem = tsdbAllocBytes(pRepo, memRowTLen(tmp)); + if (pMem == NULL) return NULL; memRowCpy(pMem, tmp); (*pPoints)++; @@ -731,13 +728,14 @@ static SMemRow tsdbInsertDupKeyMerge(SMemRow row1, SMemRow row2, STsdbRepo* pRep return pMem; } -static void* tsdbInsertDupKeyMergePacked(void** args) { - return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema**)&args[3], (STSchema**)&args[4], args[5], args[6], args[7]); +static void *tsdbInsertDupKeyMergePacked(void **args) { + return tsdbInsertDupKeyMerge(args[0], args[1], args[2], (STSchema **)&args[3], (STSchema **)&args[4], args[5], + args[6], args[7]); } -static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STable *pTable, int32_t* pPoints, SMemRow* pLastRow) { - - if(pSkipList->insertHandleFn == NULL) { +static void tsdbSetupSkipListHookFns(SSkipList *pSkipList, STsdb *pRepo, STable *pTable, int32_t *pPoints, + SMemRow *pLastRow) { + if (pSkipList->insertHandleFn == NULL) { tGenericSavedFunc *dupHandleSavedFunc = genericSavedFuncInit((GenericVaFunc)&tsdbInsertDupKeyMergePacked, 9); dupHandleSavedFunc->args[2] = pRepo; dupHandleSavedFunc->args[3] = NULL; @@ -749,18 +747,17 @@ static void tsdbSetupSkipListHookFns(SSkipList* pSkipList, STsdbRepo *pRepo, STa pSkipList->insertHandleFn->args[7] = pLastRow; } -static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *pAffectedRows) { +static int tsdbInsertDataToTable(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows) { + STsdbMeta * pMeta = pRepo->tsdbMeta; + int32_t points = 0; + STable * pTable = NULL; + SSubmitBlkIter blkIter = {0}; + SMemTable * pMemTable = NULL; + STableData * pTableData = NULL; + STsdbCfg * pCfg = &(pRepo->config); - STsdbMeta *pMeta = pRepo->tsdbMeta; - int32_t points = 0; - STable *pTable = NULL; - SSubmitBlkIter blkIter = {0}; - SMemTable *pMemTable = NULL; - STableData *pTableData = NULL; - STsdbCfg *pCfg = &(pRepo->config); - - tsdbInitSubmitBlkIter(pBlock, &blkIter); - if(blkIter.row == NULL) return 0; + tInitSubmitBlkIter(pBlock, &blkIter); + if (blkIter.row == NULL) return 0; TSKEY firstRowKey = memRowKey(blkIter.row); tsdbAllocBytes(pRepo, 0); @@ -773,7 +770,6 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t * ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid); - if (TABLE_TID(pTable) >= pMemTable->maxTables) { if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) { return -1; @@ -808,7 +804,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t * int64_t dsize = SL_SIZE(pTableData->pData) - osize; (*pAffectedRows) += points; - if(lastRow != NULL) { + if (lastRow != NULL) { TSKEY lastRowKey = memRowKey(lastRow); if (pMemTable->keyFirst > firstRowKey) pMemTable->keyFirst = firstRowKey; pMemTable->numOfRows += dsize; @@ -829,7 +825,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t * return 0; } -static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { +static int tsdbCheckTableSchema(STsdb *pRepo, SSubmitBlk *pBlock, STable *pTable) { ASSERT(pTable != NULL); STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1, -1); @@ -900,12 +896,11 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT return 0; } - -static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow row) { +static void updateTableLatestColumn(STsdb *pRepo, STable *pTable, SMemRow row) { tsdbDebug("vgId:%d updateTableLatestColumn, %s row version:%d", REPO_ID(pRepo), pTable->name->data, memRowVersion(row)); - STSchema* pSchema = tsdbGetTableLatestSchema(pTable); + STSchema *pSchema = tsdbGetTableLatestSchema(pTable); if (tsdbUpdateLastColSchema(pTable, pSchema) < 0) { return; } @@ -916,7 +911,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro } SDataCol *pLatestCols = pTable->lastCols; - int32_t kvIdx = 0; + int32_t kvIdx = 0; for (int16_t j = 0; j < schemaNCols(pSchema); j++) { STColumn *pTCol = schemaColAt(pSchema, j); @@ -935,7 +930,7 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro continue; } // lock - TSDB_WLOCK_TABLE(pTable); + TSDB_WLOCK_TABLE(pTable); SDataCol *pDataCol = &(pLatestCols[idx]); if (pDataCol->pData == NULL) { pDataCol->pData = malloc(pTCol->bytes); @@ -949,14 +944,15 @@ static void updateTableLatestColumn(STsdbRepo *pRepo, STable *pTable, SMemRow ro // the actual data size CANNOT larger than column size assert(pTCol->bytes >= bytes); memcpy(pDataCol->pData, value, bytes); - //tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, (char*)pDataCol->pData); + // tsdbInfo("updateTableLatestColumn vgId:%d cache column %d for %d,%s", REPO_ID(pRepo), j, pDataCol->bytes, + // (char*)pDataCol->pData); pDataCol->ts = memRowKey(row); // unlock - TSDB_WUNLOCK_TABLE(pTable); + TSDB_WUNLOCK_TABLE(pTable); } } -static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row) { +static int tsdbUpdateTableLatestInfo(STsdb *pRepo, STable *pTable, SMemRow row) { STsdbCfg *pCfg = &pRepo->config; // if cacheLastRow config has been reset, free the lastRow diff --git a/source/dnode/vnode/tsdb2/src/tsdbMeta.c b/source/dnode/vnode/tsdb2/src/tsdbMeta.c index 1d6c3bc654..f8fa4f5d8c 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbMeta.c +++ b/source/dnode/vnode/tsdb2/src/tsdbMeta.c @@ -12,6 +12,8 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#if 0 #include "tcompare.h" #include "tsdbint.h" #include "tutil.h" @@ -1690,3 +1692,4 @@ static void tsdbFreeTableSchema(STable *pTable) { taosArrayDestroy(pTable->schema); } } +#endif \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/src/tsdbRead.c b/source/dnode/vnode/tsdb2/src/tsdbRead.c index 5cded628fd..63d4447456 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbRead.c +++ b/source/dnode/vnode/tsdb2/src/tsdbRead.c @@ -13,6 +13,8 @@ * along with this program. If not, see . */ +#if 0 + #include "os.h" #include "tdataformat.h" #include "tskiplist.h" @@ -4574,4 +4576,5 @@ int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle) { return pQueryHandle->srows; } return 0; -} \ No newline at end of file +} +#endif \ No newline at end of file diff --git a/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c b/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c index 0e23752ec4..58438c8598 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c +++ b/source/dnode/vnode/tsdb2/src/tsdbReadImpl.c @@ -28,7 +28,7 @@ static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlo static int tsdbLoadBlockStatisFromDFile(SReadH *pReadh, SBlock *pBlock); static int tsdbLoadBlockStatisFromAggr(SReadH *pReadh, SBlock *pBlock); -int tsdbInitReadH(SReadH *pReadh, STsdbRepo *pRepo) { +int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo) { ASSERT(pReadh != NULL && pRepo != NULL); STsdbCfg *pCfg = REPO_CFG(pRepo); @@ -74,7 +74,7 @@ void tsdbDestroyReadH(SReadH *pReadh) { pReadh->cidx = 0; pReadh->pBlkIdx = NULL; pReadh->pTable = NULL; - pReadh->aBlkIdx = taosArrayDestroy(&pReadh->aBlkIdx); + pReadh->aBlkIdx = taosArrayDestroy(pReadh->aBlkIdx); tsdbCloseDFileSet(TSDB_READ_FSET(pReadh)); pReadh->pRepo = NULL; } @@ -837,7 +837,7 @@ static int tsdbLoadBlockDataColsImpl(SReadH *pReadh, SBlock *pBlock, SDataCols * static int tsdbLoadColData(SReadH *pReadh, SDFile *pDFile, SBlock *pBlock, SBlockCol *pBlockCol, SDataCol *pDataCol) { ASSERT(pDataCol->colId == pBlockCol->colId); - STsdbRepo *pRepo = TSDB_READ_REPO(pReadh); + STsdb *pRepo = TSDB_READ_REPO(pReadh); STsdbCfg * pCfg = REPO_CFG(pRepo); int tsize = pDataCol->bytes * pBlock->numOfRows + COMP_OVERFLOW_BYTES; diff --git a/source/dnode/vnode/tsdb2/src/tsdbSync.c b/source/dnode/vnode/tsdb2/src/tsdbSync.c index 0e01cf37bb..9f63c8515d 100644 --- a/source/dnode/vnode/tsdb2/src/tsdbSync.c +++ b/source/dnode/vnode/tsdb2/src/tsdbSync.c @@ -20,7 +20,7 @@ // Sync handle typedef struct { - STsdbRepo *pRepo; + STsdb *pRepo; SRtn rtn; SOCKET socketFd; void * pBuf; @@ -33,7 +33,7 @@ typedef struct { #define SYNC_BUFFER(sh) ((sh)->pBuf) -static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd); +static void tsdbInitSyncH(SSyncH *pSyncH, STsdb *pRepo, SOCKET socketFd); static void tsdbDestroySyncH(SSyncH *pSyncH); static int32_t tsdbSyncSendMeta(SSyncH *pSynch); static int32_t tsdbSyncRecvMeta(SSyncH *pSynch); @@ -47,10 +47,10 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2); static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet); static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet); static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch); -static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged); +static int tsdbReload(STsdb *pRepo, bool isMfChanged); int32_t tsdbSyncSend(void *tsdb, SOCKET socketFd) { - STsdbRepo *pRepo = (STsdbRepo *)tsdb; + STsdb *pRepo = (STsdb *)tsdb; SSyncH synch = {0}; tsdbInitSyncH(&synch, pRepo, socketFd); @@ -79,7 +79,7 @@ _err: } int32_t tsdbSyncRecv(void *tsdb, SOCKET socketFd) { - STsdbRepo *pRepo = (STsdbRepo *)tsdb; + STsdb *pRepo = (STsdb *)tsdb; SSyncH synch = {0}; pRepo->state = TSDB_STATE_OK; @@ -114,7 +114,7 @@ _err: return -1; } -static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd) { +static void tsdbInitSyncH(SSyncH *pSyncH, STsdb *pRepo, SOCKET socketFd) { pSyncH->pRepo = pRepo; pSyncH->socketFd = socketFd; tsdbGetRtnSnap(pRepo, &(pSyncH->rtn)); @@ -123,7 +123,7 @@ static void tsdbInitSyncH(SSyncH *pSyncH, STsdbRepo *pRepo, SOCKET socketFd) { static void tsdbDestroySyncH(SSyncH *pSyncH) { taosTZfree(pSyncH->pBuf); } static int32_t tsdbSyncSendMeta(SSyncH *pSynch) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; bool toSendMeta = false; SMFile mf; @@ -174,7 +174,7 @@ static int32_t tsdbSyncSendMeta(SSyncH *pSynch) { } static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; SMFile * pLMFile = pRepo->fs->cstatus->pmf; // Recv meta info from remote @@ -247,7 +247,7 @@ static int32_t tsdbSyncRecvMeta(SSyncH *pSynch) { } static int32_t tsdbSendMetaInfo(SSyncH *pSynch) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; uint32_t tlen = 0; SMFile * pMFile = pRepo->fs->cstatus->pmf; @@ -282,7 +282,7 @@ static int32_t tsdbSendMetaInfo(SSyncH *pSynch) { } static int32_t tsdbRecvMetaInfo(SSyncH *pSynch) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; uint32_t tlen = 0; char buf[64] = {0}; @@ -328,7 +328,7 @@ static int32_t tsdbRecvMetaInfo(SSyncH *pSynch) { } static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; uint8_t decision = toSend; int32_t writeLen = sizeof(uint8_t); @@ -343,7 +343,7 @@ static int32_t tsdbSendDecision(SSyncH *pSynch, bool toSend) { } static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; uint8_t decision = 0; int32_t readLen = sizeof(uint8_t); @@ -359,7 +359,7 @@ static int32_t tsdbRecvDecision(SSyncH *pSynch, bool *toSend) { } static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; STsdbFS * pfs = REPO_FS(pRepo); SFSIter fsiter; SDFileSet *pSet; @@ -385,7 +385,7 @@ static int32_t tsdbSyncSendDFileSetArray(SSyncH *pSynch) { } static int32_t tsdbSyncRecvDFileSetArray(SSyncH *pSynch) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; STsdbFS * pfs = REPO_FS(pRepo); SFSIter fsiter; SDFileSet *pLSet; // Local file set @@ -566,7 +566,7 @@ static bool tsdbIsTowFSetSame(SDFileSet *pSet1, SDFileSet *pSet2) { } static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; bool toSend = false; // skip expired fileset @@ -628,7 +628,7 @@ static int32_t tsdbSyncSendDFileSet(SSyncH *pSynch, SDFileSet *pSet) { } static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; uint32_t tlen = 0; if (pSet) { @@ -660,7 +660,7 @@ static int32_t tsdbSendDFileSetInfo(SSyncH *pSynch, SDFileSet *pSet) { } static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) { - STsdbRepo *pRepo = pSynch->pRepo; + STsdb *pRepo = pSynch->pRepo; uint32_t tlen; char buf[64] = {0}; @@ -703,7 +703,7 @@ static int32_t tsdbRecvDFileSetInfo(SSyncH *pSynch) { return 0; } -static int tsdbReload(STsdbRepo *pRepo, bool isMfChanged) { +static int tsdbReload(STsdb *pRepo, bool isMfChanged) { // TODO: may need to stop and restart stream // if (isMfChanged) { tsdbCloseMeta(pRepo); diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index b7c7408a23..83d9905511 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -335,7 +335,7 @@ int tfsRename(char *orname, char *nrname) { snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname); snprintf(naname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), nrname); - taosRename(oaname, naname); + taosRenameFile(oaname, naname); } }