enh compress
This commit is contained in:
parent
9cab625f0b
commit
697ca8914e
|
@ -250,7 +250,7 @@ bool checkColumnEncodeOrSetDefault(uint8_t type, char encode[TSDB_CL_COMPRESS_OP
|
|||
strncpy(encode, getDefaultEncodeStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
return true;
|
||||
}
|
||||
return checkColumnEncode(encode);
|
||||
return checkColumnEncode(encode) && validColEncode(type, columnEncodeVal(encode));
|
||||
}
|
||||
bool checkColumnCompress(char compress[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(compress)) return true;
|
||||
|
@ -267,7 +267,8 @@ bool checkColumnCompressOrSetDefault(uint8_t type, char compress[TSDB_CL_COMPRES
|
|||
strncpy(compress, getDefaultCompressStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
return true;
|
||||
}
|
||||
return checkColumnCompress(compress);
|
||||
|
||||
return checkColumnCompress(compress) && validColCompress(type, columnCompressVal(compress));
|
||||
}
|
||||
bool checkColumnLevel(char level[TSDB_CL_COMPRESS_OPTION_LEN]) {
|
||||
if (0 == strlen(level)) return true;
|
||||
|
@ -288,7 +289,7 @@ bool checkColumnLevelOrSetDefault(uint8_t type, char level[TSDB_CL_COMPRESS_OPTI
|
|||
strncpy(level, getDefaultLevelStr(type), TSDB_CL_COMPRESS_OPTION_LEN);
|
||||
return true;
|
||||
}
|
||||
return checkColumnLevel(level);
|
||||
return checkColumnLevel(level) && validColCompressLevel(type, columnLevelVal(level));
|
||||
}
|
||||
|
||||
void setColEncode(uint32_t* compress, uint8_t l1) {
|
||||
|
|
|
@ -45,8 +45,8 @@ char tsEncryptAlgorithm[16] = {0};
|
|||
char tsEncryptScope[100] = {0};
|
||||
EEncryptAlgor tsiEncryptAlgorithm = 0;
|
||||
EEncryptScope tsiEncryptScope = 0;
|
||||
//char tsAuthCode[500] = {0};
|
||||
//char tsEncryptKey[17] = {0};
|
||||
// char tsAuthCode[500] = {0};
|
||||
// char tsEncryptKey[17] = {0};
|
||||
char tsEncryptKey[17] = {0};
|
||||
|
||||
// common
|
||||
|
@ -127,7 +127,7 @@ bool tsEnableTelem = true;
|
|||
int32_t tsTelemInterval = 43200;
|
||||
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
|
||||
uint16_t tsTelemPort = 80;
|
||||
char *tsTelemUri = "/report";
|
||||
char * tsTelemUri = "/report";
|
||||
|
||||
#ifdef TD_ENTERPRISE
|
||||
bool tsEnableCrashReport = false;
|
||||
|
@ -240,8 +240,8 @@ int64_t tsTickPerMin[] = {60000L, 60000000L, 60000000000L};
|
|||
int64_t tsTickPerHour[] = {3600000L, 3600000000L, 3600000000000L};
|
||||
|
||||
// lossy compress 7
|
||||
char tsLossyColumns[32] = ""; // "float|double" means all float and double columns can be lossy compressed. set empty
|
||||
// can close lossy compress.
|
||||
char tsLossyColumns[32] = "float|double"; // "float|double" means all float and double columns can be lossy compressed.
|
||||
// set empty can close lossy compress.
|
||||
// below option can take effect when tsLossyColumns not empty
|
||||
float tsFPrecision = 1E-8; // float column precision
|
||||
double tsDPrecision = 1E-16; // double column precision
|
||||
|
@ -563,11 +563,12 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, CFG_SCOPE_BOTH, CFG_DYN_BOTH) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddBool(pCfg, "multiResultFunctionStarReturnTags", tsMultiResultFunctionStarReturnTags, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "multiResultFunctionStarReturnTags", tsMultiResultFunctionStarReturnTags, CFG_SCOPE_CLIENT,
|
||||
CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, CFG_SCOPE_BOTH, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "maxTsmaCalcDelay", tsMaxTsmaCalcDelay, 600, 86400, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) !=
|
||||
0)
|
||||
if (cfgAddInt32(pCfg, "maxTsmaCalcDelay", tsMaxTsmaCalcDelay, 600, 86400, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "tsmaDataDeleteMark", tsmaDataDeleteMark, 60 * 60 * 1000, INT64_MAX, CFG_SCOPE_CLIENT,
|
||||
CFG_DYN_CLIENT) != 0)
|
||||
|
@ -618,11 +619,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
|
||||
tsNumOfSupportVnodes = tsNumOfCores * 2;
|
||||
tsNumOfSupportVnodes = TMAX(tsNumOfSupportVnodes, 2);
|
||||
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "supportVnodes", tsNumOfSupportVnodes, 0, 4096, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddString(pCfg, "encryptAlgorithm", tsEncryptAlgorithm, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "encryptScope", tsEncryptScope, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
//if (cfgAddString(pCfg, "authCode", tsAuthCode, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
// if (cfgAddString(pCfg, "authCode", tsAuthCode, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "statusInterval", tsStatusInterval, 1, 30, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "minSlidingTime", tsMinSlidingTime, 1, 1000000, CFG_SCOPE_CLIENT, CFG_DYN_CLIENT) != 0)
|
||||
|
@ -749,8 +751,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1;
|
||||
|
||||
if (cfgAddInt32(pCfg, "maxTsmaNum", tsMaxTsmaNum, 0, 3, CFG_SCOPE_SERVER, CFG_DYN_SERVER) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
|
||||
|
@ -807,7 +808,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
|
|||
CFG_DYN_ENT_SERVER) != 0)
|
||||
return -1;
|
||||
|
||||
if (cfgAddString(pCfg, "lossyColumns", tsLossyColumns, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "fPrecision", tsFPrecision, 0.0f, 100000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddFloat(pCfg, "dPrecision", tsDPrecision, 0.0f, 1000000.0f, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxRange", tsMaxRange, 0, 65536, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1;
|
||||
|
@ -1181,7 +1181,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
|
||||
tstrncpy(tsEncryptAlgorithm, cfgGetItem(pCfg, "encryptAlgorithm")->str, 16);
|
||||
tstrncpy(tsEncryptScope, cfgGetItem(pCfg, "encryptScope")->str, 100);
|
||||
//tstrncpy(tsAuthCode, cfgGetItem(pCfg, "authCode")->str, 100);
|
||||
// tstrncpy(tsAuthCode, cfgGetItem(pCfg, "authCode")->str, 100);
|
||||
|
||||
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
|
||||
tsNumOfRpcSessions = cfgGetItem(pCfg, "numOfRpcSessions")->i32;
|
||||
|
@ -1268,7 +1268,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
|
|||
|
||||
tsCacheLazyLoadThreshold = cfgGetItem(pCfg, "cacheLazyLoadThreshold")->i32;
|
||||
|
||||
tstrncpy(tsLossyColumns, cfgGetItem(pCfg, "lossyColumns")->str, sizeof(tsLossyColumns));
|
||||
tsFPrecision = cfgGetItem(pCfg, "fPrecision")->fval;
|
||||
tsDPrecision = cfgGetItem(pCfg, "dPrecision")->fval;
|
||||
tsMaxRange = cfgGetItem(pCfg, "maxRange")->i32;
|
||||
|
@ -1453,7 +1452,7 @@ void taosCleanupCfg() {
|
|||
|
||||
typedef struct {
|
||||
const char *optionName;
|
||||
void *optionVar;
|
||||
void * optionVar;
|
||||
} OptionNameAndVar;
|
||||
|
||||
static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize, SConfigItem *pItem, bool isDebugflag) {
|
||||
|
@ -1466,7 +1465,7 @@ static int32_t taosCfgSetOption(OptionNameAndVar *pOptions, int32_t optionSize,
|
|||
switch (pItem->dtype) {
|
||||
case CFG_DTYPE_BOOL: {
|
||||
int32_t flag = pItem->i32;
|
||||
bool *pVar = pOptions[d].optionVar;
|
||||
bool * pVar = pOptions[d].optionVar;
|
||||
uInfo("%s set from %d to %d", optName, *pVar, flag);
|
||||
*pVar = flag;
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
|
@ -1864,7 +1863,7 @@ static void taosSetAllDebugFlag(SConfig *pCfg, int32_t flag) {
|
|||
return;
|
||||
}
|
||||
|
||||
SArray *noNeedToSetVars = NULL;
|
||||
SArray * noNeedToSetVars = NULL;
|
||||
SConfigItem *pItem = cfgGetItem(pCfg, "debugFlag");
|
||||
if (pItem != NULL) {
|
||||
pItem->i32 = flag;
|
||||
|
|
|
@ -18,12 +18,10 @@
|
|||
#include "dmNodes.h"
|
||||
#include "index.h"
|
||||
#include "qworker.h"
|
||||
#include "tstream.h"
|
||||
#ifdef TD_TSZ
|
||||
#include "tcompression.h"
|
||||
#include "tglobal.h"
|
||||
#include "tgrant.h"
|
||||
#endif
|
||||
#include "tstream.h"
|
||||
|
||||
static bool dmRequireNode(SDnode *pDnode, SMgmtWrapper *pWrapper) {
|
||||
SMgmtInputOpt input = dmBuildMgmtInputOpt(pWrapper);
|
||||
|
@ -48,10 +46,8 @@ int32_t dmInitDnode(SDnode *pDnode) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
#ifdef TD_TSZ
|
||||
// compress module init
|
||||
tsCompressInit(tsLossyColumns, tsFPrecision, tsDPrecision, tsMaxRange, tsCurRange, (int)tsIfAdtFse, tsCompressor);
|
||||
#endif
|
||||
|
||||
pDnode->wrappers[DNODE].func = dmGetMgmtFunc();
|
||||
pDnode->wrappers[MNODE].func = mmGetMgmtFunc();
|
||||
|
@ -120,10 +116,8 @@ void dmCleanupDnode(SDnode *pDnode) {
|
|||
indexCleanup();
|
||||
taosConvDestroy();
|
||||
|
||||
#ifdef TD_TSZ
|
||||
// compress destroy
|
||||
tsCompressExit();
|
||||
#endif
|
||||
|
||||
dDebug("dnode is closed, ptr:%p", pDnode);
|
||||
}
|
||||
|
|
|
@ -61,9 +61,7 @@
|
|||
#include "zstd.h"
|
||||
#endif
|
||||
|
||||
#ifdef TD_TSZ
|
||||
#include "td_sz.h"
|
||||
#endif
|
||||
|
||||
int32_t tsCompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
|
||||
int32_t tsDecompressPlain2(const char *const input, const int32_t nelements, char *const output, const char type);
|
||||
|
@ -322,7 +320,6 @@ static const int32_t TEST_NUMBER = 1;
|
|||
|
||||
#define safeInt64Add(a, b) (((a >= 0) && (b <= INT64_MAX - a)) || ((a < 0) && (b >= INT64_MIN - a)))
|
||||
|
||||
#ifdef TD_TSZ
|
||||
bool lossyFloat = false;
|
||||
bool lossyDouble = false;
|
||||
|
||||
|
@ -341,8 +338,6 @@ int32_t tsCompressInit(char *lossyColumns, float fPrecision, double dPrecision,
|
|||
// exit call
|
||||
void tsCompressExit() { tdszExit(); }
|
||||
|
||||
#endif
|
||||
|
||||
/*
|
||||
* Compress Integer (Simple8B).
|
||||
*/
|
||||
|
@ -1075,7 +1070,7 @@ void encodeFloatValue(uint32_t diff, uint8_t flag, char *const output, int32_t *
|
|||
}
|
||||
|
||||
int32_t tsCompressFloatImp(const char *const input, const int32_t nelements, char *const output) {
|
||||
float *istream = (float *)input;
|
||||
float * istream = (float *)input;
|
||||
int32_t byte_limit = nelements * FLOAT_BYTES + 1;
|
||||
int32_t opos = 1;
|
||||
|
||||
|
@ -1214,7 +1209,6 @@ int32_t tsDecompressFloatImp(const char *const input, const int32_t nelements, c
|
|||
return nelements * FLOAT_BYTES;
|
||||
}
|
||||
|
||||
#ifdef TD_TSZ
|
||||
//
|
||||
// ---------- float double lossy -----------
|
||||
//
|
||||
|
@ -1283,7 +1277,6 @@ int32_t tsDecompressDoubleLossyImp(const char *input, int32_t compressedSize, co
|
|||
// decompressed with sz
|
||||
return tdszDecompress(SZ_DOUBLE, input + 1, compressedSize - 1, nelements, output);
|
||||
}
|
||||
#endif
|
||||
|
||||
#ifdef BUILD_NO_CALL
|
||||
/*************************************************************************
|
||||
|
@ -2463,13 +2456,11 @@ int32_t tsDecompressTimestamp(void *pIn, int32_t nIn, int32_t nEle, void *pOut,
|
|||
// Float =====================================================
|
||||
int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
|
||||
int32_t nBuf) {
|
||||
#ifdef TD_TSZ
|
||||
// lossy mode
|
||||
if (lossyFloat) {
|
||||
return tsCompressFloatLossyImp(pIn, nEle, pOut);
|
||||
// lossless mode
|
||||
} else {
|
||||
#endif
|
||||
if (cmprAlg == ONE_STAGE_COMP) {
|
||||
return tsCompressFloatImp(pIn, nEle, pOut);
|
||||
} else if (cmprAlg == TWO_STAGE_COMP) {
|
||||
|
@ -2479,19 +2470,15 @@ int32_t tsCompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_
|
|||
ASSERTS(0, "compress algo invalid");
|
||||
return -1;
|
||||
}
|
||||
#ifdef TD_TSZ
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
|
||||
int32_t nBuf) {
|
||||
#ifdef TD_TSZ
|
||||
if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
|
||||
// decompress lossy
|
||||
return tsDecompressFloatLossyImp(pIn, nIn, nEle, pOut);
|
||||
} else {
|
||||
#endif
|
||||
// decompress lossless
|
||||
if (cmprAlg == ONE_STAGE_COMP) {
|
||||
return tsDecompressFloatImp(pIn, nEle, pOut);
|
||||
|
@ -2502,20 +2489,16 @@ int32_t tsDecompressFloat(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int3
|
|||
ASSERTS(0, "compress algo invalid");
|
||||
return -1;
|
||||
}
|
||||
#ifdef TD_TSZ
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Double =====================================================
|
||||
int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
|
||||
int32_t nBuf) {
|
||||
#ifdef TD_TSZ
|
||||
if (lossyDouble) {
|
||||
// lossy mode
|
||||
return tsCompressDoubleLossyImp(pIn, nEle, pOut);
|
||||
} else {
|
||||
#endif
|
||||
// lossless mode
|
||||
if (cmprAlg == ONE_STAGE_COMP) {
|
||||
return tsCompressDoubleImp(pIn, nEle, pOut);
|
||||
|
@ -2526,19 +2509,15 @@ int32_t tsCompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32
|
|||
ASSERTS(0, "compress algo invalid");
|
||||
return -1;
|
||||
}
|
||||
#ifdef TD_TSZ
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int32_t nOut, uint8_t cmprAlg, void *pBuf,
|
||||
int32_t nBuf) {
|
||||
#ifdef TD_TSZ
|
||||
if (HEAD_ALGO(((uint8_t *)pIn)[0]) == ALGO_SZ_LOSSY) {
|
||||
// decompress lossy
|
||||
return tsDecompressDoubleLossyImp(pIn, nIn, nEle, pOut);
|
||||
} else {
|
||||
#endif
|
||||
// decompress lossless
|
||||
if (cmprAlg == ONE_STAGE_COMP) {
|
||||
return tsDecompressDoubleImp(pIn, nEle, pOut);
|
||||
|
@ -2549,9 +2528,7 @@ int32_t tsDecompressDouble(void *pIn, int32_t nIn, int32_t nEle, void *pOut, int
|
|||
ASSERTS(0, "compress algo invalid");
|
||||
return -1;
|
||||
}
|
||||
#ifdef TD_TSZ
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
// Binary =====================================================
|
||||
|
|
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue