fix(query): fix ins_usage error on multi storage (#30134)
Fix the disk-usage calculation reported through information_schema.ins_disk_usage for databases on multi-level storage; add a test case (0-others/show_disk_usage_multilevel.py) and refactor the size-statistics code.

Parent: 9ec153e145
Commit: 0806cca09e

@@ -960,7 +960,8 @@ int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
 int32_t tsdbCacheDeleteLast(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);
 int32_t tsdbCacheDelete(SLRUCache *pCache, tb_uid_t uid, TSKEY eKey);

 int32_t tsdbGetS3Size(STsdb *tsdb, int64_t *size);
+int32_t tsdbGetFsSize(STsdb *tsdb, SDbSizeStatisInfo *pInfo);

 // ========== inline functions ==========
 static FORCE_INLINE int32_t tsdbKeyCmprFn(const void *p1, const void *p2) {

@@ -1413,4 +1413,62 @@ void tsdbFileSetReaderClose(struct SFileSetReader **ppReader) {

   *ppReader = NULL;
   return;
 }
+
+static FORCE_INLINE void getLevelSize(const STFileObj *fObj, int64_t szArr[TFS_MAX_TIERS]) {
+  if (fObj == NULL) return;
+
+  int64_t sz = fObj->f->size;
+  // level == 0, primary storage
+  // level == 1, second storage
+  // level == 2, third storage
+  int32_t level = fObj->f->did.level;
+  if (level >= 0 && level < TFS_MAX_TIERS) {
+    szArr[level] += sz;
+  }
+}
+
+static FORCE_INLINE int32_t tsdbGetFsSizeImpl(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
+  int32_t code = 0;
+  int64_t levelSize[TFS_MAX_TIERS] = {0};
+  int64_t s3Size = 0;
+
+  const STFileSet *fset;
+  const SSttLvl   *stt = NULL;
+  const STFileObj *fObj = NULL;
+
+  SVnodeCfg *pCfg = &tsdb->pVnode->config;
+  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
+
+  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
+    for (int32_t t = TSDB_FTYPE_MIN; t < TSDB_FTYPE_MAX; ++t) {
+      getLevelSize(fset->farr[t], levelSize);
+    }
+
+    TARRAY2_FOREACH(fset->lvlArr, stt) {
+      TARRAY2_FOREACH(stt->fobjArr, fObj) { getLevelSize(fObj, levelSize); }
+    }
+
+    fObj = fset->farr[TSDB_FTYPE_DATA];
+    if (fObj) {
+      int32_t lcn = fObj->f->lcn;
+      if (lcn > 1) {
+        s3Size += ((lcn - 1) * chunksize);
+      }
+    }
+  }
+
+  pInfo->l1Size = levelSize[0];
+  pInfo->l2Size = levelSize[1];
+  pInfo->l3Size = levelSize[2];
+  pInfo->s3Size = s3Size;
+  return code;
+}
+int32_t tsdbGetFsSize(STsdb *tsdb, SDbSizeStatisInfo *pInfo) {
+  int32_t code = 0;
+
+  (void)taosThreadMutexLock(&tsdb->mutex);
+  code = tsdbGetFsSizeImpl(tsdb, pInfo);
+  (void)taosThreadMutexUnlock(&tsdb->mutex);
+  return code;
+}
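
Note: getLevelSize() is the heart of the multi-storage fix: per-tier usage is
accumulated straight from each file's disk id (did.level) instead of being
inferred from directory sizes. Below is a minimal, self-contained sketch of
the same bucketing; DemoFile is a hypothetical stand-in exposing only the two
fields the logic reads, not the real STFileObj:

    #include <stdio.h>
    #include <stdint.h>

    #define TFS_MAX_TIERS 3

    /* Hypothetical stand-in for STFileObj: tier level and on-disk size. */
    typedef struct {
      int32_t level; /* 0 = primary, 1 = second, 2 = third tier */
      int64_t size;  /* file size in bytes */
    } DemoFile;

    /* Same shape as getLevelSize() above: skip NULL files and
     * out-of-range levels, add the size to the tier's bucket. */
    static void demoLevelSize(const DemoFile *f, int64_t szArr[TFS_MAX_TIERS]) {
      if (f == NULL) return;
      if (f->level >= 0 && f->level < TFS_MAX_TIERS) {
        szArr[f->level] += f->size;
      }
    }

    int main(void) {
      DemoFile files[] = {{0, 100}, {1, 200}, {0, 50}, {2, 300}};
      int64_t  szArr[TFS_MAX_TIERS] = {0};
      for (int i = 0; i < (int)(sizeof(files) / sizeof(files[0])); ++i) {
        demoLevelSize(&files[i], szArr);
      }
      /* prints: l1=150 l2=200 l3=300 */
      printf("l1=%lld l2=%lld l3=%lld\n",
             (long long)szArr[0], (long long)szArr[1], (long long)szArr[2]);
      return 0;
    }
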
@@ -747,29 +747,4 @@ int32_t tsdbAsyncS3Migrate(STsdb *tsdb, int64_t now) {
   return code;
 }
-
-static int32_t tsdbGetS3SizeImpl(STsdb *tsdb, int64_t *size) {
-  int32_t code = 0;
-
-  SVnodeCfg *pCfg = &tsdb->pVnode->config;
-  int64_t    chunksize = (int64_t)pCfg->tsdbPageSize * pCfg->s3ChunkSize;
-
-  STFileSet *fset;
-  TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
-    STFileObj *fobj = fset->farr[TSDB_FTYPE_DATA];
-    if (fobj) {
-      int32_t lcn = fobj->f->lcn;
-      if (lcn > 1) {
-        *size += ((lcn - 1) * chunksize);
-      }
-    }
-  }
-
-  return code;
-}
-int32_t tsdbGetS3Size(STsdb *tsdb, int64_t *size) {
-  int32_t code = 0;
-  (void)taosThreadMutexLock(&tsdb->mutex);
-  code = tsdbGetS3SizeImpl(tsdb, size);
-  (void)taosThreadMutexUnlock(&tsdb->mutex);
-  return code;
-}
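
Note: the S3 term that both the removed tsdbGetS3SizeImpl and the new
tsdbGetFsSizeImpl compute follows one rule: a data file whose last chunk
number (lcn) is greater than 1 keeps chunks 1..lcn-1 on S3, each chunk being
tsdbPageSize * s3ChunkSize bytes. A worked example of the arithmetic, with
invented config values (the real ones come from SVnodeCfg):

    #include <stdio.h>
    #include <stdint.h>

    int main(void) {
      int64_t tsdbPageSize = 4096;   /* bytes per tsdb page (example value) */
      int64_t s3ChunkSize  = 131072; /* pages per S3 chunk (example value) */
      int64_t chunksize = tsdbPageSize * s3ChunkSize;

      int32_t lcn = 3; /* last chunk number of one data file */
      int64_t s3Size = 0;
      if (lcn > 1) {
        /* chunks 1..lcn-1 are full chunks resident on S3 */
        s3Size += (int64_t)(lcn - 1) * chunksize;
      }
      /* 2 * 4096 * 131072 = 1073741824 bytes (1 GiB) */
      printf("estimated s3Size = %lld bytes\n", (long long)s3Size);
      return 0;
    }
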
@@ -14,6 +14,7 @@
 */

 #include "tsdb.h"
+#include "tutil.h"
 #include "vnd.h"

 #define VNODE_GET_LOAD_RESET_VALS(pVar, oVal, vType, tags) \

@@ -49,7 +50,7 @@ int32_t fillTableColCmpr(SMetaReader *reader, SSchemaExt *pExt, int32_t numOfCol
   return 0;
 }

-void vnodePrintTableMeta(STableMetaRsp* pMeta) {
+void vnodePrintTableMeta(STableMetaRsp *pMeta) {
   if (!(qDebugFlag & DEBUG_DEBUG)) {
     return;
   }

@@ -70,14 +71,13 @@ void vnodePrintTableMeta(STableMetaRsp* pMeta) {
   qDebug("sysInfo:%d", pMeta->sysInfo);
   if (pMeta->pSchemas) {
     for (int32_t i = 0; i < (pMeta->numOfColumns + pMeta->numOfTags); ++i) {
-      SSchema* pSchema = pMeta->pSchemas + i;
-      qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags, pSchema->colId, pSchema->bytes, pSchema->name);
+      SSchema *pSchema = pMeta->pSchemas + i;
+      qDebug("%d col/tag: type:%d, flags:%d, colId:%d, bytes:%d, name:%s", i, pSchema->type, pSchema->flags,
+             pSchema->colId, pSchema->bytes, pSchema->name);
     }
   }
-
 }

 int32_t vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
   STableInfoReq infoReq = {0};
   STableMetaRsp metaRsp = {0};

@@ -907,18 +907,14 @@ int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64
   return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid);
 }

-int32_t vnodeGetDBSize(void *pVnode, SDbSizeStatisInfo *pInfo) {
-  SVnode *pVnodeObj = pVnode;
-  if (pVnodeObj == NULL) {
-    return TSDB_CODE_VND_NOT_EXIST;
-  }
+static FORCE_INLINE int32_t vnodeGetDBPrimaryInfo(SVnode *pVnode, SDbSizeStatisInfo *pInfo) {
   int32_t code = 0;
   char    path[TSDB_FILENAME_LEN] = {0};

   char   *dirName[] = {VNODE_TSDB_DIR, VNODE_WAL_DIR, VNODE_META_DIR, VNODE_TSDB_CACHE_DIR};
   int64_t dirSize[4];

-  vnodeGetPrimaryDir(pVnodeObj->path, pVnodeObj->diskPrimary, pVnodeObj->pTfs, path, TSDB_FILENAME_LEN);
+  vnodeGetPrimaryDir(pVnode->path, pVnode->diskPrimary, pVnode->pTfs, path, TSDB_FILENAME_LEN);
   int32_t offset = strlen(path);

   for (int i = 0; i < sizeof(dirName) / sizeof(dirName[0]); i++) {

@@ -932,13 +928,24 @@ int32_t vnodeGetDBSize(void *pVnode, SDbSizeStatisInfo *pInfo) {
     dirSize[i] = size;
   }

-  pInfo->l1Size = dirSize[0] - dirSize[3];
+  pInfo->l1Size = 0;
   pInfo->walSize = dirSize[1];
   pInfo->metaSize = dirSize[2];
   pInfo->cacheSize = dirSize[3];
   return code;
 }
+int32_t vnodeGetDBSize(void *pVnode, SDbSizeStatisInfo *pInfo) {
+  int32_t code = 0;
+  int32_t lino = 0;
+  SVnode *pVnodeObj = pVnode;
+  if (pVnodeObj == NULL) {
+    return TSDB_CODE_VND_NOT_EXIST;
+  }
+  code = vnodeGetDBPrimaryInfo(pVnode, pInfo);
+  if (code != 0) goto _exit;
+
+  code = tsdbGetS3Size(pVnodeObj->pTsdb, &pInfo->s3Size);
+
+  code = tsdbGetFsSize(pVnodeObj->pTsdb, pInfo);
+_exit:
+  return code;
+}
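
Note: the shape of the fix is clearest here. The old code estimated l1Size
from a directory scan of the primary mount (the tsdb directory minus its
cache subdirectory), which cannot see data files placed on other tiers. The
new code zeroes l1Size in vnodeGetDBPrimaryInfo and lets tsdbGetFsSize fill
l1Size/l2Size/l3Size and s3Size from file-set metadata. A compact sketch of
that two-phase fill, using a stand-in struct instead of the real
SDbSizeStatisInfo and made-up numbers:

    #include <stdio.h>
    #include <stdint.h>

    /* Stand-in with only the fields used in this sketch. */
    typedef struct {
      int64_t l1Size, l2Size, l3Size, s3Size;
      int64_t walSize, metaSize, cacheSize;
    } DemoInfo;

    /* Phase 1: primary-directory scan; tier sizes are left at zero. */
    static void demoPrimaryInfo(DemoInfo *p) {
      p->l1Size = 0; /* no longer derived from directory sizes */
      p->walSize = 300;
      p->metaSize = 200;
      p->cacheSize = 100;
    }

    /* Phase 2: file-set walk supplies the per-tier and S3 numbers. */
    static void demoFsSize(DemoInfo *p) {
      p->l1Size = 1500;
      p->l2Size = 400;
      p->l3Size = 50;
      p->s3Size = 2048;
    }

    int main(void) {
      DemoInfo info = {0};
      demoPrimaryInfo(&info);
      demoFsSize(&info);
      printf("l1=%lld l2=%lld l3=%lld s3=%lld wal=%lld\n",
             (long long)info.l1Size, (long long)info.l2Size,
             (long long)info.l3Size, (long long)info.s3Size,
             (long long)info.walSize);
      return 0;
    }
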
|
@ -531,6 +531,8 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/view/non_marterial_view/test_view.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_show_table_distributed.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/test_show_disk_usage.py
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show_disk_usage_multilevel.py
|
||||
|
||||
,,n,system-test,python3 ./test.py -f 0-others/compatibility.py
|
||||
,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py
|
||||
,,n,system-test,python3 ./test.py -f 0-others/udfpy_main.py
|
||||
|
|
|
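
Note: the new case can also be launched on its own with the command from its
task entry, ./pytest.sh python3 ./test.py -f 0-others/show_disk_usage_multilevel.py,
run from the same directory as the neighbouring system-test cases (an
assumption based on how the other entries are invoked, not stated in the
commit).
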
@@ -0,0 +1,220 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-

import os
import time
import subprocess
from datetime import datetime, timedelta

from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *
from util.dnodes import *


def get_disk_usage(path):
    """Return the size of `path` in bytes via `du -sb`, or None on failure."""
    try:
        result = subprocess.run(['du', '-sb', path], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
        if result.returncode == 0:
            # The output is in the format "size\tpath".
            size = int(result.stdout.split()[0])
            return size
        else:
            print(f"Error: {result.stderr}")
            return None
    except Exception as e:
        print(f"Exception occurred: {e}")
        return None


class TDTestCase:
    def _prepare_env1(self):
        # Single-tier layout: one level-0 dataDir.
        tdLog.info("============== prepare environment 1 ===============")

        level_0_path = f'{self.dnode_path}/data00'
        cfg = {
            level_0_path: 'dataDir',
        }
        tdSql.createDir(level_0_path)
        tdDnodes.stop(1)
        tdDnodes.deploy(1, cfg)
        tdDnodes.start(1)

    def _prepare_env2(self):
        # Multi-tier layout: keep the level-0 dataDir and add a level-1 tier.
        tdLog.info("============== prepare environment 2 ===============")

        level_0_path = f'{self.dnode_path}/data00'
        level_1_path = f'{self.dnode_path}/data01'
        cfg = {
            f'{level_0_path}': 'dataDir',
            f'{level_1_path} 1 0': 'dataDir',
        }
        tdSql.createDir(level_1_path)
        tdDnodes.stop(1)
        tdDnodes.deploy(1, cfg)
        tdDnodes.start(1)

    def _write_bulk_data(self):
        tdLog.info("============== write bulk data ===============")
        json_content = f"""
        {{
            "filetype": "insert",
            "cfgdir": "{self.cfg_path}",
            "host": "localhost",
            "port": 6030,
            "user": "root",
            "password": "taosdata",
            "connection_pool_size": 8,
            "thread_count": 16,
            "create_table_thread_count": 10,
            "result_file": "./insert_res.txt",
            "confirm_parameter_prompt": "no",
            "insert_interval": 0,
            "interlace_rows": 5,
            "num_of_records_per_req": 1540,
            "prepared_rand": 10000,
            "chinese": "no",
            "databases": [
                {{
                    "dbinfo": {{
                        "name": "{self.db_name}",
                        "drop": "yes",
                        "vgroups": {self.vgroups},
                        "duration": "1d",
                        "keep": "3d,6d",
                        "wal_retention_period": 0,
                        "stt_trigger": 1
                    }},
                    "super_tables": [
                        {{
                            "name": "stb",
                            "child_table_exists": "no",
                            "childtable_count": 1000,
                            "childtable_prefix": "ctb",
                            "escape_character": "yes",
                            "auto_create_table": "no",
                            "batch_create_tbl_num": 500,
                            "data_source": "rand",
                            "insert_mode": "taosc",
                            "non_stop_mode": "no",
                            "line_protocol": "line",
                            "insert_rows": 10000,
                            "childtable_limit": 10,
                            "childtable_offset": 100,
                            "interlace_rows": 0,
                            "insert_interval": 0,
                            "partial_col_num": 0,
                            "disorder_ratio": 0,
                            "disorder_range": 1000,
                            "timestamp_step": 40000,
                            "start_timestamp": "{(datetime.now() - timedelta(days=5)).strftime('%Y-%m-%d %H:%M:%S')}",
                            "use_sample_ts": "no",
                            "tags_file": "",
                            "columns": [
                                {{
                                    "type": "bigint",
                                    "count": 10
                                }}
                            ],
                            "tags": [
                                {{
                                    "type": "TINYINT",
                                    "name": "groupid",
                                    "max": 10,
                                    "min": 1
                                }},
                                {{
                                    "name": "location",
                                    "type": "BINARY",
                                    "len": 16,
                                    "values": [
                                        "beijing",
                                        "shanghai"
                                    ]
                                }}
                            ]
                        }}
                    ]
                }}
            ]
        }}
        """
        json_file = '/tmp/test.json'
        with open(json_file, 'w') as f:
            f.write(json_content)
        # Use subprocess.run() to wait for the command to finish.
        subprocess.run(f'taosBenchmark -f {json_file}', shell=True, check=True)

    def _check_retention(self):
        # After trimming, every vgroup should have files on the level-1 tier.
        for vgid in range(2, 2 + self.vgroups):
            tsdb_path = self.dnode_path + f'/data01/vnode/vnode{vgid}/tsdb'
            # The path should not be empty.
            if not os.listdir(tsdb_path):
                tdLog.error(f'{tsdb_path} is empty')
                assert False

    def _calculate_disk_usage(self, path):
        # Sum du -sb over every vgroup's tsdb directory on the given tier,
        # converted to KB to match the units reported by ins_disk_usage.
        size = 0
        for vgid in range(2, 2 + self.vgroups):
            tsdb_path = self.dnode_path + f'/{path}/vnode/vnode{vgid}/tsdb'
            size += get_disk_usage(tsdb_path)
        return int(size / 1024)

    def _value_check(self, size1, size2, threshold=1000):
        if abs(size1 - size2) < threshold:
            tdLog.info(f"checkEqual success, base_value={size1}, check_value={size2}")
        else:
            tdLog.exit(f"checkEqual error, base_value={size1}, check_value={size2}")

    def run(self):
        # Write on a single tier, then flush so data reaches the tsdb files.
        self._prepare_env1()
        self._write_bulk_data()
        tdSql.execute(f'flush database {self.db_name}')
        tdDnodes.stop(1)

        # Restart with a level-1 tier and trim so older file sets migrate.
        self._prepare_env2()
        tdSql.execute(f'trim database {self.db_name}')

        time.sleep(10)

        self._check_retention()

        size1 = self._calculate_disk_usage('data00')
        size2 = self._calculate_disk_usage('data01')

        # Cross-check the filesystem totals against ins_disk_usage.
        tdSql.query(f'select sum(data1), sum(data2) from information_schema.ins_disk_usage where db_name="{self.db_name}"')
        data1 = int(tdSql.queryResult[0][0])
        data2 = int(tdSql.queryResult[0][1])

        self._value_check(size1, data1)
        self._value_check(size2, data2)

    def init(self, conn, logSql, replicaVar=1):
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())

        self.dnode_path = tdCom.getTaosdPath()
        self.cfg_path = f'{self.dnode_path}/cfg'
        self.log_path = f'{self.dnode_path}/log'
        self.db_name = 'test'
        self.vgroups = 10

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)


tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())