Merge pull request #25669 from taosdata/enh/TD-29953

Enh/TD-29953
This commit is contained in:
Hongze Cheng 2024-05-15 17:26:26 +08:00 committed by GitHub
commit f4a9c5e3c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 2359 additions and 415 deletions

View File

@ -250,7 +250,7 @@ bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OP
strncpy(encode, getDefaultEncodeStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
return true;
}
return checkColumnEncode(encode);
return checkColumnEncode(encode) && validColEncode(type, columnEncodeVal(encode));
}
bool checkColumnCompress(char compress[TSDB_CL_COMPRESS_OPTION_LEN]) {
if (0 == strlen(compress)) return true;
@ -267,7 +267,8 @@ bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRES
strncpy(compress, getDefaultCompressStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
return true;
}
return checkColumnCompress(compress);
return checkColumnCompress(compress) && validColCompress(type, columnCompressVal(compress));
}
bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]) {
if (0 == strlen(level)) return true;
@ -288,7 +289,7 @@ bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTI
strncpy(level, getDefaultLevelStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
return true;
}
return checkColumnLevel(level);
return checkColumnLevel(level) && validColCompressLevel(type, columnLevelVal(level));
}
void setColEncode(uint32_t* compress, uint8_t l1) {

View File

@ -240,8 +240,8 @@ int64_t tsTickPerMin[] = {60000L, 60000000L, 60000000000L};
int64_t tsTickPerHour[] = {3600000L, 3600000000L, 3600000000000L};
// lossy compress 7
char tsLossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty
// can close lossy compress.
char tsLossyColumns[32] = "float|double"; // "float|double" means all float and double columns can be lossy compressed.
// set empty can close lossy compress.
// below option can take effect when tsLossyColumns not empty
float tsFPrecision = 1E-8; // float column precision
double tsDPrecision = 1E-16; // double column precision
@ -815,7 +815,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddFloat(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
if (cfgAddInt32(pCfg, "maxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
@ -1276,7 +1275,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32;
tstrncpy(tsLossyColumns, cfgGetItem(pCfg, "lossyColumns")->str, sizeof(tsLossyColumns));
tsFPrecision = cfgGetItem(pCfg, "fPrecision")->fval;
tsDPrecision = cfgGetItem(pCfg, "dPrecision")->fval;
tsMaxRange = cfgGetItem(pCfg, "maxRange")->i32;

View File

@ -18,12 +18,10 @@
#include "dmNodes.h"
#include "index.h"
#include "qworker.h"
#include "tstream.h"
#ifdef TD_TSZ
#include "tcompression.h"
#include "tglobal.h"
#include "tgrant.h"
#endif
#include "tstream.h"
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
@ -48,10 +46,8 @@ int32_t dmInitDnode(SDnode *pDnode) {
goto _OVER;
}
#ifdef TD_TSZ
// compress module init
tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);
#endif
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
@ -120,10 +116,8 @@ void dmCleanupDnode(SDnode *pDnode) {
indexCleanup();
taosConvDestroy();
#ifdef TD_TSZ
// compress destroy
tsCompressExit();
#endif
dDebug("dnode is closed, ptr:%p", pDnode);
}

View File

@ -42,7 +42,6 @@ int32_t buildQueryAfterParse(SQuery** pQuery, SNode* pRootNode, int16_t placehol
return TSDB_CODE_SUCCESS;
}
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery) {
SAstCreateContext cxt;
initAstCreateContext(pParseCxt, &cxt);
@ -635,8 +634,8 @@ static int32_t collectMetaKeyFromShowCompacts(SCollectMetaKeyCxt* pCxt, SShowStm
}
static int32_t collectMetaKeyFromShowCompactDetails(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB, TSDB_INS_TABLE_COMPACT_DETAILS,
pCxt->pMetaCache);
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_INFORMATION_SCHEMA_DB,
TSDB_INS_TABLE_COMPACT_DETAILS, pCxt->pMetaCache);
return code;
}
@ -700,7 +699,6 @@ static int32_t collectMetaKeyFromShowCreateView(SCollectMetaKeyCxt* pCxt, SShowC
return code;
}
static int32_t collectMetaKeyFromShowApps(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) {
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, TSDB_PERFORMANCE_SCHEMA_DB, TSDB_PERFS_TABLE_APPS,
pCxt->pMetaCache);
@ -759,24 +757,22 @@ static int32_t collectMetaKeyFromRevoke(SCollectMetaKeyCxt* pCxt, SRevokeStmt* p
return reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->objName, pStmt->tabName, pCxt->pMetaCache);
}
static int32_t collectMetaKeyFromCreateViewStmt(SCollectMetaKeyCxt* pCxt, SCreateViewStmt* pStmt) {
int32_t code =
reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
int32_t code = reserveTableMetaInCache(pCxt->pParseCxt->acctId, pStmt->dbName, pStmt->viewName, pCxt->pMetaCache);
if (TSDB_CODE_SUCCESS == code) {
code = reserveUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, NULL, AUTH_TYPE_WRITE,
pCxt->pMetaCache);
}
if (TSDB_CODE_SUCCESS == code) {
code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_ALTER,
pCxt->pMetaCache);
code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName,
AUTH_TYPE_ALTER, pCxt->pMetaCache);
}
return code;
}
static int32_t collectMetaKeyFromDropViewStmt(SCollectMetaKeyCxt* pCxt, SDropViewStmt* pStmt) {
int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName, AUTH_TYPE_ALTER,
pCxt->pMetaCache);
int32_t code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName,
pStmt->viewName, AUTH_TYPE_ALTER, pCxt->pMetaCache);
return code;
}

View File

@ -61,9 +61,7 @@
#include "zstd.h"
#endif
#ifdef TD_TSZ
#include "td_sz.h"
#endif
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
@ -322,7 +320,6 @@ static const int32_t TEST_NUMBER = 1;
#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
#ifdef TD_TSZ
bool lossyFloat = false;
bool lossyDouble = false;
@ -341,8 +338,6 @@ int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision,
// exit call
void tsCompressExit() { tdszExit(); }
#endif
/*
* Compress Integer (Simple8B).
*/
@ -1214,7 +1209,6 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c
return nelements * FLOAT_BYTES;
}
#ifdef TD_TSZ
//
// ---------- float double lossy -----------
//
@ -1283,7 +1277,6 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
// decompressed with sz
return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
}
#endif
#ifdef BUILD_NO_CALL
/*************************************************************************
@ -2463,13 +2456,11 @@ int32_t tsDecompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut,
// Float =====================================================
int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf) {
#ifdef TD_TSZ
// lossy mode
if (lossyFloat) {
return tsCompressFloatLossyImp(pIn, nEle, pOut);
// lossless mode
} else {
#endif
if (cmprAlg == ONE_STAGE_COMP) {
return tsCompressFloatImp(pIn, nEle, pOut);
} else if (cmprAlg == TWO_STAGE_COMP) {
@ -2479,19 +2470,15 @@ int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_
ASSERTS(0, "compress algo invalid");
return -1;
}
#ifdef TD_TSZ
}
#endif
}
int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf) {
#ifdef TD_TSZ
if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
// decompress lossy
return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
} else {
#endif
// decompress lossless
if (cmprAlg == ONE_STAGE_COMP) {
return tsDecompressFloatImp(pIn, nEle, pOut);
@ -2502,20 +2489,16 @@ int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3
ASSERTS(0, "compress algo invalid");
return -1;
}
#ifdef TD_TSZ
}
#endif
}
// Double =====================================================
int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf) {
#ifdef TD_TSZ
if (lossyDouble) {
// lossy mode
return tsCompressDoubleLossyImp(pIn, nEle, pOut);
} else {
#endif
// lossless mode
if (cmprAlg == ONE_STAGE_COMP) {
return tsCompressDoubleImp(pIn, nEle, pOut);
@ -2526,19 +2509,15 @@ int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32
ASSERTS(0, "compress algo invalid");
return -1;
}
#ifdef TD_TSZ
}
#endif
}
int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
int32_t nBuf) {
#ifdef TD_TSZ
if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
// decompress lossy
return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
} else {
#endif
// decompress lossless
if (cmprAlg == ONE_STAGE_COMP) {
return tsDecompressDoubleImp(pIn, nEle, pOut);
@ -2549,9 +2528,7 @@ int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
ASSERTS(0, "compress algo invalid");
return -1;
}
#ifdef TD_TSZ
}
#endif
}
// Binary =====================================================

View File

@ -48,13 +48,19 @@ class TDTestCase(TBase):
"bigint","bigint unsigned","timestamp","bool","float","double","binary(16)","nchar(16)",
"varchar(16)","varbinary(16)"]
def combineValid(self, datatype, encode, compress):
if datatype != "float" and datatype != "double":
if compress == "tsz":
return False
return True
def genAllSqls(self, stbName, max):
# encode
encodes = [
[["tinyint","tinyint unsigned","smallint","smallint unsigned","int","int unsigned","bigint","bigint unsigned"], ["simple8b"]],
[["timestamp","bigint","bigint unsigned"], ["delta-i"]],
[["bool"], ["bit-packing"]],
[["float","double"], ["delta-d"]]
[["tinyint","tinyint unsigned","smallint","smallint unsigned","int","int unsigned","bigint","bigint unsigned"], ["simple8B"]],
[["timestamp","bigint","bigint unsigned"], ["Delta-i"]],
[["bool"], ["Bit-packing"]],
[["float","double"], ["Delta-d"]]
]
c = 0 # column number
@ -65,7 +71,6 @@ class TDTestCase(TBase):
# loop append sqls
for lines in encodes:
print(lines)
for datatype in lines[0]:
for encode in lines[1]:
for compress in self.compresses:
@ -74,6 +79,7 @@ class TDTestCase(TBase):
# first
sql = f"create table {self.db}.st{t} (ts timestamp"
else:
if self.combineValid(datatype, encode, compress):
sql += f", c{c} {datatype} ENCODE '{encode}' COMPRESS '{compress}' LEVEL '{level}'"
c += 1
@ -97,9 +103,11 @@ class TDTestCase(TBase):
# check error create
def errorCreate(self):
sqls = [
f"create table terr(ts timestamp, c0 int ENCODE 'abc') ",
f""
f"create table terr(ts timestamp, c0 int ENCODE 'simple8B' COMPRESS 'tsz' LEVEL 'high') ",
f"create table terr(ts timestamp, bi bigint encode 'bit-packing') tags (area int);"
f"create table terr(ts timestamp, ic int encode 'delta-d') tags (area int);"
]
tdSql.errors(sqls)
for dtype in self.dtypes:
# encode
@ -112,6 +120,11 @@ class TDTestCase(TBase):
sql = f"create table terr(ts timestamp, c0 {dtype} LEVEL 'hig') "
tdSql.error(sql)
# tsz check
if dtype != "float" and dtype != "double":
sql = f"create table terr(ts timestamp, c0 {dtype} COMPRESS 'tsz') "
tdSql.error(sql)
# default value correct
def defaultCorrect(self):
# get default encode compress level

View File

@ -72,7 +72,12 @@ python_error=$(cat ${LOG_DIR}/*.info | grep -w "stack" | wc -l)
#0 0x7f2d64f5a808 in __interceptor_malloc ../../../../src/libsanitizer/asan/asan_malloc_linux.cc:144
#1 0x7f2d63fcf459 in strerror /build/glibc-SzIz7B/glibc-2.31/string/strerror.c:38
runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cpp" | grep -v "sclvector.c" | wc -l)
# TD-29953
#/home/TDinternal/community/utils/TSZ/sz/src/sz_double.c:388:59: runtime error: 2.64021e+25 is outside the range of representable values of type 'long unsigned int'
#/home/TDinternal/community/utils/TSZ/sz/src/sz_float.c:407:59: runtime error: 5.76041e+19 is outside the range of representable values of type 'long unsigned int'
#/home/TDinternal/community/source/libs/scalar/src/sclfunc.c:808:11: runtime error: -3.40401e+18 is outside the range of representable values of type 'int'
runtime_error=$(cat ${LOG_DIR}/*.asan | grep "runtime error" | grep -v "trees.c:873" | grep -v "sclfunc.c.*outside the range of representable values of type" | grep -v "signed integer overflow" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cc" | grep -v "strerror.c" | grep -v "asan_malloc_linux.cpp" | grep -v "sclvector.c" | grep -v "sclfunc.c:808"| grep -v "sz_double.c:388" | grep -v "sz_float.c:407:59"| wc -l)
echo -e "\033[44;32;1m"asan error_num: $error_num"\033[0m"
echo -e "\033[44;32;1m"asan memory_leak: $memory_leak"\033[0m"

File diff suppressed because it is too large Load Diff