feat: show vgroup display num of cache tables
This commit is contained in:
parent
62377619ba
commit
ca39dfcf2d
|
@ -98,7 +98,8 @@ extern char *tsSvrCrashReportUri;
|
||||||
|
|
||||||
// query buffer management
|
// query buffer management
|
||||||
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
|
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
|
||||||
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
|
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
|
||||||
|
extern int32_t tsCacheLazyLoadThreshold; // cost threshold for last/last_row loading cache as much as possible
|
||||||
|
|
||||||
// query client
|
// query client
|
||||||
extern int32_t tsQueryPolicy;
|
extern int32_t tsQueryPolicy;
|
||||||
|
@ -145,10 +146,10 @@ extern char tsUdfdResFuncs[];
|
||||||
extern char tsUdfdLdLibPath[];
|
extern char tsUdfdLdLibPath[];
|
||||||
|
|
||||||
// schemaless
|
// schemaless
|
||||||
extern char tsSmlChildTableName[];
|
extern char tsSmlChildTableName[];
|
||||||
extern char tsSmlTagName[];
|
extern char tsSmlTagName[];
|
||||||
//extern bool tsSmlDataFormat;
|
// extern bool tsSmlDataFormat;
|
||||||
//extern int32_t tsSmlBatchSize;
|
// extern int32_t tsSmlBatchSize;
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
extern int64_t tsWalFsyncDataSizeLimit;
|
extern int64_t tsWalFsyncDataSizeLimit;
|
||||||
|
|
|
@ -1136,7 +1136,7 @@ typedef struct {
|
||||||
int64_t numOfInsertSuccessReqs;
|
int64_t numOfInsertSuccessReqs;
|
||||||
int64_t numOfBatchInsertReqs;
|
int64_t numOfBatchInsertReqs;
|
||||||
int64_t numOfBatchInsertSuccessReqs;
|
int64_t numOfBatchInsertSuccessReqs;
|
||||||
int32_t numOfCacheTables;
|
int32_t numOfCachedTables;
|
||||||
} SVnodeLoad;
|
} SVnodeLoad;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -233,6 +233,7 @@ static const SSysDbTableSchema vgroupsSchema[] = {
|
||||||
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
{.name = "v4_dnode", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true},
|
||||||
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
{.name = "v4_status", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = true},
|
||||||
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
{.name = "cacheload", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||||
|
{.name = "cacheTables", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true},
|
||||||
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
|
{.name = "tsma", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT, .sysInfo = true},
|
||||||
// {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
// {.name = "compact_start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false},
|
||||||
};
|
};
|
||||||
|
|
|
@ -154,6 +154,7 @@ char tsTagFilterCache = 0;
|
||||||
// positive value (in MB)
|
// positive value (in MB)
|
||||||
int32_t tsQueryBufferSize = -1;
|
int32_t tsQueryBufferSize = -1;
|
||||||
int64_t tsQueryBufferSizeBytes = -1;
|
int64_t tsQueryBufferSizeBytes = -1;
|
||||||
|
int32_t tsCacheLazyLoadThreshold = 500;
|
||||||
|
|
||||||
int32_t tsDiskCfgNum = 0;
|
int32_t tsDiskCfgNum = 0;
|
||||||
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
|
SDiskCfg tsDiskCfg[TFS_MAX_DISKS] = {0};
|
||||||
|
@ -497,6 +498,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
|
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1;
|
||||||
|
|
||||||
|
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1;
|
||||||
|
|
||||||
GRANT_CFG_ADD;
|
GRANT_CFG_ADD;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -824,6 +827,8 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
||||||
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32;
|
||||||
|
|
||||||
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
|
tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval;
|
||||||
|
|
||||||
GRANT_CFG_GET;
|
GRANT_CFG_GET;
|
||||||
|
|
|
@ -1070,7 +1070,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1;
|
if (tEncodeI64(&encoder, pload->totalStorage) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
|
if (tEncodeI64(&encoder, pload->compStorage) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
|
if (tEncodeI64(&encoder, pload->pointsWritten) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
if (tEncodeI32(&encoder, pload->numOfCachedTables) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, reserved) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||||
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
if (tEncodeI64(&encoder, reserved) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
@ -1148,7 +1149,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.totalStorage) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, (int32_t*)&reserved) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
||||||
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
||||||
|
|
|
@ -359,6 +359,7 @@ typedef struct {
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
|
||||||
void* pTsma;
|
void* pTsma;
|
||||||
|
int32_t numOfCachedTables;
|
||||||
} SVgObj;
|
} SVgObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -412,6 +412,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
if (pVgroup != NULL) {
|
if (pVgroup != NULL) {
|
||||||
if (pVload->syncState == TAOS_SYNC_STATE_LEADER) {
|
if (pVload->syncState == TAOS_SYNC_STATE_LEADER) {
|
||||||
pVgroup->cacheUsage = pVload->cacheUsage;
|
pVgroup->cacheUsage = pVload->cacheUsage;
|
||||||
|
pVgroup->numOfCachedTables = pVload->numOfCachedTables;
|
||||||
pVgroup->numOfTables = pVload->numOfTables;
|
pVgroup->numOfTables = pVload->numOfTables;
|
||||||
pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
|
pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
|
||||||
pVgroup->totalStorage = pVload->totalStorage;
|
pVgroup->totalStorage = pVload->totalStorage;
|
||||||
|
@ -440,7 +441,8 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
if (roleChanged) {
|
if (roleChanged) {
|
||||||
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
SDbObj *pDb = mndAcquireDb(pMnode, pVgroup->dbName);
|
||||||
if (pDb != NULL && pDb->stateTs != curMs) {
|
if (pDb != NULL && pDb->stateTs != curMs) {
|
||||||
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name, pDb->stateTs, curMs);
|
mInfo("db:%s, stateTs changed by status msg, old stateTs:%" PRId64 " new stateTs:%" PRId64, pDb->name,
|
||||||
|
pDb->stateTs, curMs);
|
||||||
pDb->stateTs = curMs;
|
pDb->stateTs = curMs;
|
||||||
}
|
}
|
||||||
mndReleaseDb(pMnode, pDb);
|
mndReleaseDb(pMnode, pDb);
|
||||||
|
|
|
@ -803,6 +803,9 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
||||||
int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
|
int32_t cacheUsage = (int32_t)pVgroup->cacheUsage;
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&cacheUsage, false);
|
||||||
|
|
||||||
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->numOfCachedTables, false);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
|
colDataSetVal(pColInfo, numOfRows, (const char *)&pVgroup->isTsma, false);
|
||||||
|
|
||||||
|
|
|
@ -706,6 +706,7 @@ typedef struct SMergeTree {
|
||||||
bool destroyLoadInfo;
|
bool destroyLoadInfo;
|
||||||
SSttBlockLoadInfo *pLoadInfo;
|
SSttBlockLoadInfo *pLoadInfo;
|
||||||
const char *idStr;
|
const char *idStr;
|
||||||
|
bool ignoreEarlierTs;
|
||||||
} SMergeTree;
|
} SMergeTree;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -751,6 +752,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
|
bool destroyLoadInfo, const char *idStr, bool strictTimeRange);
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree);
|
bool tMergeTreeNext(SMergeTree *pMTree);
|
||||||
|
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
|
||||||
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
TSDBROW tMergeTreeGetRow(SMergeTree *pMTree);
|
||||||
void tMergeTreeClose(SMergeTree *pMTree);
|
void tMergeTreeClose(SMergeTree *pMTree);
|
||||||
|
|
||||||
|
|
|
@ -637,6 +637,11 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow, bool *pIgnoreEa
|
||||||
state->pMergeTree = &state->mergeTree;
|
state->pMergeTree = &state->mergeTree;
|
||||||
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
bool hasVal = tMergeTreeNext(&state->mergeTree);
|
||||||
if (!hasVal) {
|
if (!hasVal) {
|
||||||
|
if (tMergeTreeIgnoreEarlierTs(&state->mergeTree)) {
|
||||||
|
*pIgnoreEarlierTs = true;
|
||||||
|
*ppRow = NULL;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
state->state = SFSLASTNEXTROW_FILESET;
|
state->state = SFSLASTNEXTROW_FILESET;
|
||||||
goto _next_fileset;
|
goto _next_fileset;
|
||||||
}
|
}
|
||||||
|
|
|
@ -332,6 +332,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
// retrieve the only one last row of all tables in the uid list.
|
// retrieve the only one last row of all tables in the uid list.
|
||||||
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_SINGLE)) {
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
for (int32_t i = 0; i < pr->numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||||
|
|
||||||
|
@ -407,8 +408,10 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
|
|
||||||
if (hasNotNullRow) {
|
if (hasNotNullRow) {
|
||||||
pr->lastTs = minTs;
|
double cost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
tsdbInfo("%p have cache read %d tables", pr, i + 1);
|
if (cost > tsCacheLazyLoadThreshold) {
|
||||||
|
pr->lastTs = minTs;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -418,8 +421,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
if (hasRes) {
|
if (hasRes) {
|
||||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
|
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes, pr->idstr);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbInfo("have cached %d tables", taosLRUCacheGetElems(lruCache));
|
|
||||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||||
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
STableKeyInfo* pKeyInfo = &pr->pTableList[i];
|
||||||
|
|
|
@ -29,6 +29,7 @@ struct SLDataIter {
|
||||||
STimeWindow timeWindow;
|
STimeWindow timeWindow;
|
||||||
SVersionRange verRange;
|
SVersionRange verRange;
|
||||||
SSttBlockLoadInfo *pBlockLoadInfo;
|
SSttBlockLoadInfo *pBlockLoadInfo;
|
||||||
|
bool ignoreEarlierTs;
|
||||||
};
|
};
|
||||||
|
|
||||||
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
|
SSttBlockLoadInfo *tCreateLastBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols,
|
||||||
|
@ -351,6 +352,7 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t
|
||||||
if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
|
if (backward && ((strictTimeRange && (*pIter)->pSttBlk->maxKey <= (*pIter)->timeWindow.skey) ||
|
||||||
(!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
|
(!strictTimeRange && (*pIter)->pSttBlk->maxKey < (*pIter)->timeWindow.skey))) {
|
||||||
(*pIter)->pSttBlk = NULL;
|
(*pIter)->pSttBlk = NULL;
|
||||||
|
(*pIter)->ignoreEarlierTs = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -581,6 +583,7 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
|
|
||||||
pMTree->pLoadInfo = pBlockLoadInfo;
|
pMTree->pLoadInfo = pBlockLoadInfo;
|
||||||
pMTree->destroyLoadInfo = destroyLoadInfo;
|
pMTree->destroyLoadInfo = destroyLoadInfo;
|
||||||
|
pMTree->ignoreEarlierTs = false;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file
|
||||||
struct SLDataIter *pIter = NULL;
|
struct SLDataIter *pIter = NULL;
|
||||||
|
@ -595,6 +598,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead
|
||||||
taosArrayPush(pMTree->pIterList, &pIter);
|
taosArrayPush(pMTree->pIterList, &pIter);
|
||||||
tMergeTreeAddIter(pMTree, pIter);
|
tMergeTreeAddIter(pMTree, pIter);
|
||||||
} else {
|
} else {
|
||||||
|
if (!pMTree->ignoreEarlierTs) {
|
||||||
|
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
|
||||||
|
}
|
||||||
tLDataIterClose(pIter);
|
tLDataIterClose(pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -608,6 +614,8 @@ _end:
|
||||||
|
|
||||||
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter) { tRBTreePut(&pMTree->rbt, (SRBTreeNode *)pIter); }
|
||||||
|
|
||||||
|
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree) { return pMTree->ignoreEarlierTs; }
|
||||||
|
|
||||||
bool tMergeTreeNext(SMergeTree *pMTree) {
|
bool tMergeTreeNext(SMergeTree *pMTree) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (pMTree->pIter) {
|
if (pMTree->pIter) {
|
||||||
|
|
|
@ -382,7 +382,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||||
pLoad->syncRestore = state.restored;
|
pLoad->syncRestore = state.restored;
|
||||||
pLoad->syncCanRead = state.canRead;
|
pLoad->syncCanRead = state.canRead;
|
||||||
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
pLoad->cacheUsage = tsdbCacheGetUsage(pVnode);
|
||||||
pLoad->numOfCacheTables = tsdbCacheGetElems(pVnode);
|
pLoad->numOfCachedTables = tsdbCacheGetElems(pVnode);
|
||||||
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
pLoad->numOfTables = metaGetTbNum(pVnode->pMeta);
|
||||||
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
pLoad->numOfTimeSeries = metaGetTimeSeriesNum(pVnode->pMeta);
|
||||||
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
pLoad->totalStorage = (int64_t)3 * 1073741824;
|
||||||
|
|
Loading…
Reference in New Issue