From a04b6dba7a92f6a149c257a62ba89b981355b83e Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 10 Jan 2022 02:55:36 +0000 Subject: [PATCH] more --- include/dnode/vnode/tsdb2/tsdb.h | 433 ----------------------- include/dnode/vnode/vnode.h | 2 +- source/dnode/mgmt/impl/src/dndVnodes.c | 46 +-- source/dnode/vnode/impl/src/vnodeMain.c | 9 +- source/dnode/vnode/tsdb/src/tsdbCommit.c | 2 +- 5 files changed, 30 insertions(+), 462 deletions(-) delete mode 100644 include/dnode/vnode/tsdb2/tsdb.h diff --git a/include/dnode/vnode/tsdb2/tsdb.h b/include/dnode/vnode/tsdb2/tsdb.h deleted file mode 100644 index 49840ae231..0000000000 --- a/include/dnode/vnode/tsdb2/tsdb.h +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ -#ifndef _TD_TSDB_H_ -#define _TD_TSDB_H_ - -#include -#include -#include - -#include "common.h" -#include "taosdef.h" -#include "tarray.h" -#include "tdataformat.h" -#include "thash.h" -#include "tlist.h" -#include "tlockfree.h" -#include "tmsg.h" -#include "tname.h" - -#ifdef __cplusplus -extern "C" { -#endif - -#define TSDB_VERSION_MAJOR 1 -#define TSDB_VERSION_MINOR 0 - -#define TSDB_INVALID_SUPER_TABLE_ID -1 - -#define TSDB_STATUS_COMMIT_START 1 -#define TSDB_STATUS_COMMIT_OVER 2 -#define TSDB_STATUS_COMMIT_NOBLOCK 3 // commit no block, need to be solved - -// TSDB STATE DEFINITION -#define TSDB_STATE_OK 0x0 -#define TSDB_STATE_BAD_META 0x1 -#define TSDB_STATE_BAD_DATA 0x2 - -typedef struct SDataStatis { - int16_t colId; - int64_t sum; - int64_t max; - int64_t min; - int16_t maxIndex; - int16_t minIndex; - int16_t numOfNull; -} SDataStatis; - -// --------- TSDB APPLICATION HANDLE DEFINITION - -// --------- TSDB REPOSITORY CONFIGURATION DEFINITION -typedef struct { - int32_t tsdbId; - int32_t cacheBlockSize; - int32_t totalBlocks; - int32_t daysPerFile; // day per file sharding policy - int32_t keep; // day of data to keep - int32_t keep1; - int32_t keep2; - int32_t lruCacheSize; - int32_t minRowsPerFileBlock; // minimum rows per file block - int32_t maxRowsPerFileBlock; // maximum rows per file block - int8_t precision; - int8_t compression; - int8_t update; - int8_t cacheLastRow; // 0:no cache, 1: cache last row, 2: cache last NULL column 3: 1&2 -} STsdbCfg; - -#define CACHE_NO_LAST(c) ((c)->cacheLastRow == 0) -#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) > 0) -#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) > 0) - -// --------- TSDB REPOSITORY USAGE STATISTICS -typedef struct { - int64_t totalStorage; // total bytes occupie - int64_t compStorage; - int64_t pointsWritten; // total data points written -} STsdbStat; - -typedef struct STsdb STsdb; - -STsdbCfg *tsdbGetCfg(const STsdb *repo); - -// --------- TSDB REPOSITORY DEFINITION -// int32_t tsdbCreateRepo(int repoid); -// int32_t tsdbDropRepo(int repoid); -STsdb * tsdbOpen(STsdbCfg *pCfg, STsdbAppH *pAppH); -int tsdbClose(STsdb *repo, int toCommit); -int32_t tsdbConfigRepo(STsdb *repo, STsdbCfg *pCfg); -int tsdbGetState(STsdb *repo); -int8_t tsdbGetCompactState(STsdb *repo); -// --------- TSDB TABLE DEFINITION -typedef struct { - uint64_t uid; // the unique table ID - int32_t tid; // the table ID in the repository. -} STableId; - -// --------- TSDB TABLE configuration -typedef struct { - ETableType type; - char * name; - STableId tableId; - int32_t sversion; - char * sname; // super table name - uint64_t superUid; - STSchema * schema; - STSchema * tagSchema; - SKVRow tagValues; - char * sql; -} STableCfg; - -void tsdbClearTableCfg(STableCfg *config); - -void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type); -char *tsdbGetTableName(void *pTable); - -#define TSDB_TABLEID(_table) ((STableId *)(_table)) -#define TSDB_PREV_ROW 0x1 -#define TSDB_NEXT_ROW 0x2 - -STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); - -int tsdbCreateTable(STsdb *repo, STableCfg *pCfg); -int tsdbDropTable(STsdb *pRepo, STableId tableId); -int tsdbUpdateTableTagValue(STsdb *repo, SUpdateTableTagValMsg *pMsg); - -uint32_t tsdbGetFileInfo(STsdb *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size); - -// the TSDB repository info -typedef struct STsdbRepoInfo { - STsdbCfg tsdbCfg; - uint64_t version; // version of the repository - int64_t tsdbTotalDataSize; // the original inserted data size - int64_t tsdbTotalDiskSize; // the total disk size taken by this TSDB repository - // TODO: Other informations to add -} STsdbRepoInfo; -STsdbRepoInfo *tsdbGetStatus(STsdb *pRepo); - -// the meter information report structure -typedef struct { - STableCfg tableCfg; - uint64_t version; - int64_t tableTotalDataSize; // In bytes - int64_t tableTotalDiskSize; // In bytes -} STableInfo; - -// -- FOR INSERT DATA -/** - * Insert data to a table in a repository - * @param pRepo the TSDB repository handle - * @param pData the data to insert (will give a more specific description) - * - * @return the number of points inserted, -1 for failure and the error number is set - */ -int32_t tsdbInsertData(STsdb *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp); - -// -- FOR QUERY TIME SERIES DATA - -typedef void *TsdbQueryHandleT; // Use void to hide implementation details - -#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 -#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 -#define BLOCK_LOAD_TABLE_RR_ORDER 3 - -// query condition to build multi-table data block iterator -typedef struct STsdbQueryCond { - STimeWindow twindow; - int32_t order; // desc|asc order to iterate the data block - int64_t offset; // skip offset put down to tsdb - int32_t numOfCols; - SColumnInfo *colList; - bool loadExternalRows; // load external rows or not - int32_t type; // data block load type: -} STsdbQueryCond; - -typedef struct STableData STableData; -typedef struct { - T_REF_DECLARE() - SRWLatch latch; - TSKEY keyFirst; - TSKEY keyLast; - int64_t numOfRows; - int32_t maxTables; - STableData **tData; - SList * actList; - SList * extraBuffList; - SList * bufBlockList; - int64_t pointsAdd; // TODO - int64_t storageAdd; // TODO -} SMemTable; - -typedef struct { - SMemTable *mem; - SMemTable *imem; - SMemTable mtable; - SMemTable *omem; -} SMemSnapshot; - -typedef struct SMemRef { - int32_t ref; - SMemSnapshot snapshot; -} SMemRef; - -#if 0 -typedef struct SFileBlockInfo { - int32_t numBlocksOfStep; -} SFileBlockInfo; - -typedef struct { - void *pTable; - TSKEY lastKey; -} STableKeyInfo; - -typedef struct { - uint32_t numOfTables; - SArray * pGroupList; - SHashObj *map; // speedup acquire the tableQueryInfo by table uid -} STableGroupInfo; - -#define TSDB_BLOCK_DIST_STEP_ROWS 16 -typedef struct { - uint16_t rowSize; - uint16_t numOfFiles; - uint32_t numOfTables; - uint64_t totalSize; - uint64_t totalRows; - int32_t maxRows; - int32_t minRows; - int32_t firstSeekTimeUs; - uint32_t numOfRowsInMemTable; - uint32_t numOfSmallBlocks; - SArray * dataBlockInfos; -} STableBlockDist; - -/** - * Get the data block iterator, starting from position according to the query condition - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfoGroup table object list in the form of set, grouped into different sets according to the - * group by condition - * @param qinfo query info handle from query processor - * @return - */ -TsdbQueryHandleT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, - SMemRef *pRef); - -/** - * Get the last row of the given query time window for all the tables in STableGroupInfo object. - * Note that only one data block with only row will be returned while invoking retrieve data block function for - * all tables in this group. - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfo table list. - * @return - */ -TsdbQueryHandleT tsdbQueryLastRow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, - SMemRef *pRef); - -TsdbQueryHandleT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, - SMemRef *pMemRef); - -bool isTsdbCacheLastRow(TsdbQueryHandleT *pQueryHandle); - -/** - * get the queried table object list - * @param pHandle - * @return - */ -SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle); - -/** - * get the group list according to table id from client - * @param tsdb - * @param pCond - * @param groupList - * @param qinfo - * @return - */ -TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, - uint64_t qId, SMemRef *pRef); - -/** - * get num of rows in mem table - * - * @param pHandle - * @return row size - */ - -int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT *pHandle); - -/** - * move to next block if exists - * - * @param pQueryHandle - * @return - */ -bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle); - -/** - * Get current data block information - * - * @param pQueryHandle - * @param pBlockInfo - * @return - */ -void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo); - -/** - * - * Get the pre-calculated information w.r.t. current data block. - * - * In case of data block in cache, the pBlockStatis will always be NULL. - * If a block is not completed loaded from disk, the pBlockStatis will be NULL. - - * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 - * @return - */ -int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis); - -/** - * - * The query condition with primary timestamp is passed to iterator during its constructor function, - * the returned data block must be satisfied with the time window condition in any cases, - * which means the SData data block is not actually the completed disk data blocks. - * - * @param pQueryHandle query handle - * @param pColumnIdList required data columns id list - * @return - */ -SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList); - -/** - * Get the qualified table id for a super table according to the tag query expression. - * @param stableid. super table sid - * @param pTagCond. tag query condition - */ -int32_t tsdbQuerySTableByTagCond(STsdb *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len, - STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols); - -/** - * destroy the created table group list, which is generated by tag query - * @param pGroupList - */ -void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); - -/** - * create the table group result including only one table, used to handle the normal table query - * - * @param tsdb tsdbHandle - * @param uid table uid - * @param pGroupInfo the generated result - * @return - */ -int32_t tsdbGetOneTableGroup(STsdb *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); - -/** - * - * @param tsdb - * @param pTableIdList - * @param pGroupInfo - * @return - */ -int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); - -/** - * clean up the query handle - * @param queryHandle - */ -void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); - -void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond); - -void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo *groupList); - -int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT *queryHandle, STableBlockDist *pTableBlockInfo); - -// obtain queryHandle attribute -int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle); - -/** - * get the statistics of repo usage - * @param repo. point to the tsdbrepo - * @param totalPoints. total data point written - * @param totalStorage. total bytes took by the tsdb - * @param compStorage. total bytes took by the tsdb after compressed - */ -void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); - -int tsdbInitCommitQueue(); -void tsdbDestroyCommitQueue(); -int tsdbSyncCommit(STsdb *repo); -void tsdbIncCommitRef(int vgId); -void tsdbDecCommitRef(int vgId); -void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle); - -// For TSDB file sync -int tsdbSyncSend(void *pRepo, SOCKET socketFd); -int tsdbSyncRecv(void *pRepo, SOCKET socketFd); - -// // For TSDB Compact -// int tsdbCompact(STsdb *pRepo); - -// For TSDB Health Monitor - -// // no problem return true -// bool tsdbNoProblem(STsdb *pRepo); -// // unit of walSize: MB -// int tsdbCheckWal(STsdb *pRepo, uint32_t walSize); - -// // for json tag -// void *getJsonTagValueElment(void *data, char *key, int32_t keyLen, char *out, int16_t bytes); -// void getJsonTagValueAll(void *data, void *dst, int16_t bytes); -// char *parseTagDatatoJson(void *p); -#endif - -#ifdef __cplusplus -} -#endif - -#endif // _TD_TSDB_H_ diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index af56d69b11..a7e5d50ee0 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -87,7 +87,7 @@ void vnodeClear(); * @param pVnodeCfg options of the vnode * @return SVnode* The vnode object */ -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); /** * @brief Close a VNODE diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 6427ab080a..bd1e08afc0 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -33,14 +33,14 @@ typedef struct { int8_t dropped; int8_t accessState; uint64_t dbUid; - char *db; - char *path; - SVnode *pImpl; + char * db; + char * path; + SVnode * pImpl; STaosQueue *pWriteQ; STaosQueue *pSyncQ; STaosQueue *pApplyQ; STaosQueue *pQueryQ; - STaosQueue* pFetchQ; + STaosQueue *pFetchQ; } SVnodeObj; typedef struct { @@ -49,7 +49,7 @@ typedef struct { int32_t failed; int32_t threadIndex; pthread_t thread; - SDnode *pDnode; + SDnode * pDnode; SWrapperCfg *pCfgs; } SVnodeThread; @@ -81,7 +81,7 @@ void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pE void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg); -static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); +static SVnodeObj * dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl); static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode); @@ -94,7 +94,7 @@ static void dndCloseVnodes(SDnode *pDnode); static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj *pVnode = NULL; + SVnodeObj * pVnode = NULL; int32_t refCount = 0; taosRLockLatch(&pMgmt->latch); @@ -127,7 +127,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); + SVnodeObj * pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -197,7 +197,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { dndFreeVnodeWriteQueue(pDnode, pVnode); dndFreeVnodeApplyQueue(pDnode, pVnode); dndFreeVnodeSyncQueue(pDnode, pVnode); - + vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; @@ -217,7 +217,7 @@ static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes) { void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; - SVnodeObj *pVnode = *ppVnode; + SVnodeObj * pVnode = *ppVnode; if (pVnode && num < size) { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); @@ -239,9 +239,9 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ int32_t code = TSDB_CODE_DND_VNODE_READ_FILE_ERROR; int32_t len = 0; int32_t maxLen = 30000; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - FILE *fp = NULL; + char * content = calloc(1, maxLen + 1); + cJSON * root = NULL; + FILE * fp = NULL; char file[PATH_MAX + 20] = {0}; SWrapperCfg *pCfgs = NULL; @@ -286,7 +286,7 @@ static int32_t dndGetVnodesFromFile(SDnode *pDnode, SWrapperCfg **ppCfgs, int32_ } for (int32_t i = 0; i < vnodesNum; ++i) { - cJSON *vnode = cJSON_GetArrayItem(vnodes, i); + cJSON * vnode = cJSON_GetArrayItem(vnodes, i); SWrapperCfg *pCfg = &pCfgs[i]; cJSON *vgId = cJSON_GetObjectItem(vnode, "vgId"); @@ -356,7 +356,7 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { int32_t len = 0; int32_t maxLen = 65536; - char *content = calloc(1, maxLen + 1); + char * content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"vnodes\": [\n"); @@ -398,8 +398,8 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { static void *dnodeOpenVnodeFunc(void *param) { SVnodeThread *pThread = param; - SDnode *pDnode = pThread->pDnode; - SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SDnode * pDnode = pThread->pDnode; + SVnodesMgmt * pMgmt = &pDnode->vmgmt; dDebug("thread:%d, start to open %d vnodes", pThread->threadIndex, pThread->vnodeNum); setThreadName("open-vnodes"); @@ -412,7 +412,7 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnode *pImpl = vnodeOpen(pCfg->path, NULL); + SVnode *pImpl = vnodeOpen(pCfg->path, NULL, pCfg->vgId); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -610,7 +610,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } - SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/); + SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/, pCreate->vgId); if (pImpl == NULL) { return -1; } @@ -1016,7 +1016,7 @@ static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { } static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SVnodesMgmt * pMgmt = &pDnode->vmgmt; SMWorkerPool *pPool = &pMgmt->writePool; pPool->name = "vnode-write"; pPool->max = pDnode->opt.numOfCores; @@ -1056,7 +1056,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { int32_t maxThreads = pDnode->opt.numOfCores / 2; if (maxThreads < 1) maxThreads = 1; - SVnodesMgmt *pMgmt = &pDnode->vmgmt; + SVnodesMgmt * pMgmt = &pDnode->vmgmt; SMWorkerPool *pPool = &pMgmt->syncPool; pPool->name = "vnode-sync"; pPool->max = maxThreads; @@ -1118,12 +1118,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { pLoads->num = taosHashGetSize(pMgmt->hash); int32_t v = 0; - void *pIter = taosHashIterate(pMgmt->hash, NULL); + void * pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; - SVnodeObj *pVnode = *ppVnode; + SVnodeObj * pVnode = *ppVnode; SVnodeLoad *pLoad = &pLoads->data[v++]; vnodeGetLoad(pVnode->pImpl, pLoad); diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index 30962e726e..085e96e168 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -15,12 +15,12 @@ #include "vnodeDef.h" -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg); +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid); static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { SVnode *pVnode = NULL; // Set default options @@ -35,7 +35,7 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { } // Create the handle - pVnode = vnodeNew(path, pVnodeCfg); + pVnode = vnodeNew(path, pVnodeCfg, vid); if (pVnode == NULL) { // TODO: handle error return NULL; @@ -62,7 +62,7 @@ void vnodeClose(SVnode *pVnode) { void vnodeDestroy(const char *path) { taosRemoveDir(path); } /* ------------------------ STATIC METHODS ------------------------ */ -static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { +static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg, int32_t vid) { SVnode *pVnode = NULL; pVnode = (SVnode *)calloc(1, sizeof(*pVnode)); @@ -71,6 +71,7 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) { return NULL; } + pVnode->vgId = vid; pVnode->path = strdup(path); vnodeOptionsCopy(&(pVnode->config), pVnodeCfg); diff --git a/source/dnode/vnode/tsdb/src/tsdbCommit.c b/source/dnode/vnode/tsdb/src/tsdbCommit.c index a147251c5b..c97554f39b 100644 --- a/source/dnode/vnode/tsdb/src/tsdbCommit.c +++ b/source/dnode/vnode/tsdb/src/tsdbCommit.c @@ -351,7 +351,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) { } // Loop to commit each table data - for (int tid = 1; tid < pCommith->niters; tid++) { + for (int tid = 0; tid < pCommith->niters; tid++) { SCommitIter *pIter = pCommith->iters + tid; if (pIter->pTable == NULL) continue;