more code
This commit is contained in:
parent
053df87def
commit
2e8b43d01c
|
@ -167,7 +167,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
|||
pCfg->walCfg.segSize = pCreate->walSegmentSize;
|
||||
pCfg->walCfg.level = pCreate->walLevel;
|
||||
|
||||
pCfg->sstTrigger = pCreate->sstTrigger;
|
||||
pCfg->sttTrigger = pCreate->sstTrigger;
|
||||
pCfg->hashBegin = pCreate->hashBegin;
|
||||
pCfg->hashEnd = pCreate->hashEnd;
|
||||
pCfg->hashMethod = pCreate->hashMethod;
|
||||
|
@ -223,10 +223,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
dInfo("vgId:%d, start to create vnode, tsma:%d standby:%d cacheLast:%d cacheLastSize:%d sstTrigger:%d",
|
||||
createReq.vgId, createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize,
|
||||
createReq.sstTrigger);
|
||||
createReq.vgId, createReq.isTsma, createReq.standby, createReq.cacheLast, createReq.cacheLastSize,
|
||||
createReq.sstTrigger);
|
||||
dInfo("vgId:%d, hashMethod:%d begin:%u end:%u prefix:%d surfix:%d", createReq.vgId, createReq.hashMethod,
|
||||
createReq.hashBegin, createReq.hashEnd, createReq.hashPrefix, createReq.hashSuffix);
|
||||
createReq.hashBegin, createReq.hashEnd, createReq.hashPrefix, createReq.hashSuffix);
|
||||
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
|
||||
|
||||
if (vmTsmaAdjustDays(&vnodeCfg, &createReq) < 0) {
|
||||
|
|
|
@ -125,6 +125,8 @@ int32_t metaTbCursorNext(SMTbCursor *pTbCur);
|
|||
// typedef struct STsdb STsdb;
|
||||
typedef struct STsdbReader STsdbReader;
|
||||
|
||||
#define TSDB_DEFAULT_STT_FILE 8
|
||||
|
||||
#define TIMEWINDOW_RANGE_CONTAINED 1
|
||||
#define TIMEWINDOW_RANGE_EXTERNAL 2
|
||||
|
||||
|
@ -288,7 +290,7 @@ struct SVnodeCfg {
|
|||
SVnodeStats vndStats;
|
||||
uint32_t hashBegin;
|
||||
uint32_t hashEnd;
|
||||
int16_t sstTrigger;
|
||||
int16_t sttTrigger;
|
||||
int16_t hashPrefix;
|
||||
int16_t hashSuffix;
|
||||
};
|
||||
|
|
|
@ -70,7 +70,6 @@ typedef struct SLDataIter SLDataIter;
|
|||
#define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F)
|
||||
#define TSDB_MAX_SUBBLOCKS 8
|
||||
#define TSDB_MAX_STT_FILE 16
|
||||
#define TSDB_DEFAULT_STT_FILE 8
|
||||
#define TSDB_FHDR_SIZE 512
|
||||
#define TSDB_DEFAULT_PAGE_SIZE 4096
|
||||
|
||||
|
@ -644,35 +643,35 @@ typedef struct {
|
|||
} SRowInfo;
|
||||
|
||||
typedef struct SSttBlockLoadInfo {
|
||||
SBlockData blockData[2];
|
||||
SArray *aSttBlk;
|
||||
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||
int32_t currentLoadBlockIndex;
|
||||
int32_t loadBlocks;
|
||||
double elapsedTime;
|
||||
SBlockData blockData[2];
|
||||
SArray *aSttBlk;
|
||||
int32_t blockIndex[2]; // to denote the loaded block in the corresponding position.
|
||||
int32_t currentLoadBlockIndex;
|
||||
int32_t loadBlocks;
|
||||
double elapsedTime;
|
||||
} SSttBlockLoadInfo;
|
||||
|
||||
typedef struct SMergeTree {
|
||||
int8_t backward;
|
||||
SRBTree rbt;
|
||||
SArray *pIterList;
|
||||
SLDataIter *pIter;
|
||||
bool destroyLoadInfo;
|
||||
SSttBlockLoadInfo* pLoadInfo;
|
||||
const char *idStr;
|
||||
int8_t backward;
|
||||
SRBTree rbt;
|
||||
SArray *pIterList;
|
||||
SLDataIter *pIter;
|
||||
bool destroyLoadInfo;
|
||||
SSttBlockLoadInfo *pLoadInfo;
|
||||
const char *idStr;
|
||||
} SMergeTree;
|
||||
|
||||
int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid,
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void* pLoadInfo, const char* idStr);
|
||||
STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr);
|
||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||
void tMergeTreeClose(SMergeTree *pMTree);
|
||||
|
||||
SSttBlockLoadInfo* tCreateLastBlockLoadInfo();
|
||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
|
||||
void getLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo, int64_t* blocks, double* el);
|
||||
void* destroyLastBlockLoadInfo(SSttBlockLoadInfo* pLoadInfo);
|
||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo();
|
||||
void resetLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
void getLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo, int64_t *blocks, double *el);
|
||||
void *destroyLastBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
|
||||
|
||||
// ========== inline functions ==========
|
||||
static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {
|
||||
|
|
|
@ -50,7 +50,7 @@ typedef struct {
|
|||
int32_t minRow;
|
||||
int32_t maxRow;
|
||||
int8_t cmprAlg;
|
||||
int8_t maxLast;
|
||||
int8_t sttTrigger;
|
||||
SArray *aTbDataP; // memory
|
||||
STsdbFS fs; // disk
|
||||
// --------------
|
||||
|
@ -428,7 +428,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) {
|
|||
pCommitter->toLastOnly = 0;
|
||||
SDataFReader *pReader = pCommitter->dReader.pReader;
|
||||
if (pReader) {
|
||||
if (pReader->pSet->nSttF >= pCommitter->maxLast) {
|
||||
if (pReader->pSet->nSttF >= pCommitter->sttTrigger) {
|
||||
int8_t iIter = 0;
|
||||
for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) {
|
||||
pIter = &pCommitter->aDataIter[iIter];
|
||||
|
@ -515,11 +515,11 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
|||
SSttFile fStt = {.commitID = pCommitter->commitID};
|
||||
SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma};
|
||||
if (pRSet) {
|
||||
ASSERT(pRSet->nSttF <= pCommitter->maxLast);
|
||||
ASSERT(pRSet->nSttF <= pCommitter->sttTrigger);
|
||||
fData = *pRSet->pDataF;
|
||||
fSma = *pRSet->pSmaF;
|
||||
wSet.diskId = pRSet->diskId;
|
||||
if (pRSet->nSttF < pCommitter->maxLast) {
|
||||
if (pRSet->nSttF < pCommitter->sttTrigger) {
|
||||
for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) {
|
||||
wSet.aSttF[iStt] = pRSet->aSttF[iStt];
|
||||
}
|
||||
|
@ -757,7 +757,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) {
|
|||
pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows;
|
||||
pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows;
|
||||
pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression;
|
||||
pCommitter->maxLast = TSDB_DEFAULT_STT_FILE; // TODO: make it as a config
|
||||
pCommitter->sttTrigger = pTsdb->pVnode->config.sttTrigger;
|
||||
pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem);
|
||||
if (pCommitter->aTbDataP == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -13,8 +13,8 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "vnd.h"
|
||||
#include "tutil.h"
|
||||
#include "vnd.h"
|
||||
|
||||
const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
||||
.dbname = "",
|
||||
|
@ -48,7 +48,8 @@ const SVnodeCfg vnodeCfgDefault = {.vgId = -1,
|
|||
},
|
||||
.hashBegin = 0,
|
||||
.hashEnd = 0,
|
||||
.hashMethod = 0};
|
||||
.hashMethod = 0,
|
||||
.sttTrigger = TSDB_DEFAULT_STT_FILE};
|
||||
|
||||
int vnodeCheckCfg(const SVnodeCfg *pCfg) {
|
||||
// TODO
|
||||
|
@ -107,7 +108,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
|||
if (tjsonAddIntegerToObject(pJson, "wal.retentionSize", pCfg->walCfg.retentionSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.segSize", pCfg->walCfg.segSize) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "wal.level", pCfg->walCfg.level) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sstTrigger) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "sstTrigger", pCfg->sttTrigger) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashBegin", pCfg->hashBegin) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||
|
@ -209,7 +210,7 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
|||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "wal.level", pCfg->walCfg.level, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "sstTrigger", pCfg->sstTrigger, code);
|
||||
tjsonGetNumberValue(pJson, "sstTrigger", pCfg->sttTrigger, code);
|
||||
if (code < 0) return -1;
|
||||
tjsonGetNumberValue(pJson, "hashBegin", pCfg->hashBegin, code);
|
||||
if (code < 0) return -1;
|
||||
|
|
|
@ -60,6 +60,8 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
|||
|
||||
snprintf(dir, TSDB_FILENAME_LEN, "%s%s%s", tfsGetPrimaryPath(pTfs), TD_DIRSEP, path);
|
||||
|
||||
info.config = vnodeCfgDefault;
|
||||
|
||||
// load vnode info
|
||||
ret = vnodeLoadInfo(dir, &info);
|
||||
if (ret < 0) {
|
||||
|
|
Loading…
Reference in New Issue