merge develop add sz_size_type==4

This commit is contained in:
tickduan 2021-07-04 12:25:02 +08:00
commit 124cbfde9d
78 changed files with 992 additions and 482 deletions

View File

@ -42,13 +42,6 @@ INCLUDE(cmake/env.inc)
INCLUDE(cmake/version.inc)
INCLUDE(cmake/install.inc)
IF (CMAKE_SYSTEM_NAME MATCHES "Linux")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -pipe -Wall -Wshadow -Werror")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pipe -Wall -Wshadow -Werror")
ENDIF ()
MESSAGE(STATUS "CMAKE_C_FLAGS: ${CMAKE_C_FLAGS}")
MESSAGE(STATUS "CMAKE_CXX_FLAGS: ${CMAKE_CXX_FLAGS}")
ADD_SUBDIRECTORY(deps)
ADD_SUBDIRECTORY(src)
ADD_SUBDIRECTORY(tests)

View File

@ -147,7 +147,12 @@ IF (TD_DARWIN_64)
ADD_DEFINITIONS(-D_REENTRANT -D__USE_POSIX -D_LIBC_REENTRANT)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "darwin64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
IF ("${CPUTYPE}" STREQUAL "apple_m1")
SET(COMMON_FLAGS "-Wall -Werror -Wno-missing-braces -fPIC -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ELSE ()
SET(COMMON_FLAGS "-Wall -Werror -Wno-missing-braces -fPIC -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
ENDIF ()
IF (TD_MEMORY_SANITIZER)
SET(DEBUG_FLAGS "-fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -O0 -g3 -DDEBUG")
ELSE ()
@ -178,7 +183,7 @@ IF (TD_WINDOWS)
MESSAGE("memory sanitizer detected as false")
SET(DEBUG_FLAGS "/Zi /W3 /GL")
ENDIF ()
SET(RELEASE_FLAGS "/W0 /O3 /GL")
SET(RELEASE_FLAGS "/W0 /O2 /GL") # MSVC only support O2
ENDIF ()
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/pthread)
@ -201,6 +206,10 @@ IF (TD_WINDOWS_32)
MESSAGE(STATUS "windows32 is defined")
ENDIF ()
IF (TD_LINUX)
SET(COMMON_FLAGS "${COMMON_FLAGS} -pipe -Wshadow")
ENDIF ()
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/os/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)

View File

@ -108,6 +108,10 @@ IF ("${CPUTYPE}" STREQUAL "")
SET(TD_LINUX TRUE)
SET(TD_LINUX_64 FALSE)
SET(TD_MIPS_64 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "arm64")
SET(CPUTYPE "apple_m1")
MESSAGE(STATUS "Set CPUTYPE to apple silicon m1")
SET(TD_ARM_64 TRUE)
ENDIF ()
ELSE ()
@ -153,5 +157,5 @@ ELSEIF (${OSTYPE} MATCHES "Alpine")
MESSAGE(STATUS "input osType: Alpine")
SET(TD_APLHINE TRUE)
ELSE ()
MESSAGE(STATUS "input osType unknown: " ${OSTYPE})
MESSAGE(STATUS "The user specified osType is unknown: " ${OSTYPE})
ENDIF ()

2
deps/CMakeLists.txt vendored
View File

@ -22,7 +22,7 @@ IF (TD_DARWIN AND TD_MQTT)
ENDIF ()
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
MESSAGE("setup dpes/jemalloc, current source dir:" ${CMAKE_CURRENT_SOURCE_DIR})
MESSAGE("setup deps/jemalloc, current source dir:" ${CMAKE_CURRENT_SOURCE_DIR})
MESSAGE("binary dir:" ${CMAKE_BINARY_DIR})
include(ExternalProject)
ExternalProject_Add(jemalloc

View File

@ -27,9 +27,9 @@ typedef struct DoubleValueCompressElement
typedef struct FloatValueCompressElement
{
float data;
int curValue;
unsigned char curBytes[4]; //big_endian
float data; // diffValue + medianValue
int curValue; // diff int value
unsigned char curBytes[4]; // diff bytes value diffValue->iValue big_endian
int reqBytesLength;
int resiBitsLength;
} FloatValueCompressElement;

View File

@ -10,6 +10,7 @@
#ifndef _TightDataPointStorageD_H
#define _TightDataPointStorageD_H
#include <stdbool.h>
#include "pub.h"
#ifdef __cplusplus
@ -88,7 +89,7 @@ void new_TightDataPointStorageD2(TightDataPointStorageD **self,
void convertTDPStoBytes_double(TightDataPointStorageD* tdps, unsigned char* bytes, unsigned char* dsLengthBytes, unsigned char sameByte);
void convertTDPStoBytes_double_reserve(TightDataPointStorageD* tdps, unsigned char* bytes, unsigned char* dsLengthBytes, unsigned char sameByte);
void convertTDPStoFlatBytes_double(TightDataPointStorageD *tdps, unsigned char* bytes, size_t *size);
bool convertTDPStoFlatBytes_double(TightDataPointStorageD *tdps, unsigned char* bytes, size_t *size);
void convertTDPStoFlatBytes_double_args(TightDataPointStorageD *tdps, unsigned char* bytes, size_t *size);
void free_TightDataPointStorageD(TightDataPointStorageD *tdps);

View File

@ -14,7 +14,8 @@
extern "C" {
#endif
#include <stdio.h>
#include <stdio.h>
#include <stdbool.h>
#include "pub.h"
typedef struct TightDataPointStorageF
@ -92,7 +93,7 @@ void new_TightDataPointStorageF2(TightDataPointStorageF **self,
unsigned char* pwrErrBoundBytes, size_t pwrErrBoundBytes_size, unsigned char radExpo);
void convertTDPStoBytes_float(TightDataPointStorageF* tdps, unsigned char* bytes, unsigned char* dsLengthBytes, unsigned char sameByte);
void convertTDPStoFlatBytes_float(TightDataPointStorageF *tdps, unsigned char* bytes, size_t *size);
bool convertTDPStoFlatBytes_float(TightDataPointStorageF *tdps, unsigned char* bytes, size_t *size);
void convertTDPStoFlatBytes_float_args(TightDataPointStorageF *tdps, unsigned char* bytes, size_t *size);
void free_TightDataPointStorageF(TightDataPointStorageF *tdps);

View File

@ -89,6 +89,7 @@
#define SZ_MERR -5 //sz_mode error
#define SZ_BERR -6 //bound-mode error (should be only SZ_ABS, REL, ABS_AND_REL, ABS_OR_REL, or PW_REL)
#define SZ_LITTER_ELEMENT -7
#define SZ_ALGORITHM_ERR -8
#define SZ_MAINTAIN_VAR_DATA 0
#define SZ_DESTROY_WHOLE_VARSET 1

View File

@ -15,6 +15,8 @@ extern "C" {
#endif
#include <stdio.h>
#include <stdbool.h>
unsigned char* SZ_skip_compress_double(double* data, size_t dataLength, size_t* outSize);
void computeReqLength_double(double realPrecision, short radExpo, int* reqLength, double* medianValue);
@ -30,7 +32,7 @@ TightDataPointStorageD* SZ_compress_double_1D_MDQ(double *oriData,
size_t dataLength, double realPrecision, double valueRangeSize, double medianValue_d);
void SZ_compress_args_double_StoreOriData(double* oriData, size_t dataLength, unsigned char* newByteData, size_t *outSize);
char SZ_compress_args_double_NoCkRngeNoGzip_1D( unsigned char* newByteData, double *oriData, size_t dataLength, double realPrecision, size_t *outSize, double valueRangeSize, double medianValue_d);
bool SZ_compress_args_double_NoCkRngeNoGzip_1D( unsigned char* newByteData, double *oriData, size_t dataLength, double realPrecision, size_t *outSize, double valueRangeSize, double medianValue_d);
TightDataPointStorageD* SZ_compress_double_1D_MDQ_MSST19(double *oriData, size_t dataLength, double realPrecision, double valueRangeSize, double medianValue_f);

View File

@ -31,7 +31,7 @@ size_t dataLength, float realPrecision, float valueRangeSize, float medianValue_
void SZ_compress_args_float_StoreOriData(float* oriData, size_t dataLength, unsigned char* newByteData, size_t *outSize);
char SZ_compress_args_float_NoCkRngeNoGzip_1D( unsigned char* newByteData, float *oriData,
bool SZ_compress_args_float_NoCkRngeNoGzip_1D( unsigned char* newByteData, float *oriData,
size_t dataLength, double realPrecision, size_t *outSize, float valueRangeSize, float medianValue_f);
size_t SZ_compress_float_1D_MDQ_RA_block(float * block_ori_data, float * mean, size_t dim_0, size_t block_dim_0, double realPrecision, int * type, float * unpredictable_data);

View File

@ -7,6 +7,8 @@
extern "C" {
#endif
void cost_start();
double cost_end(const char* tag);
//
// compress interface for tdengine; return value is the output length in bytes
//

View File

@ -227,25 +227,28 @@ void updateLossyCompElement_Double(unsigned char* curBytes, unsigned char* preBy
lce->residualMidBits = resiBits;
}
inline void updateLossyCompElement_Float(unsigned char* curBytes, unsigned char* preBytes,
inline void updateLossyCompElement_Float(unsigned char* diffBytes, unsigned char* preDiffBytes,
int reqBytesLength, int resiBitsLength, LossyCompressionElement *lce)
{
int resiIndex, intMidBytes_Length = 0;
int leadingNum = compIdenticalLeadingBytesCount_float(preBytes, curBytes); //in fact, float is enough for both single-precision and double-precisiond ata.
int leadingNum = compIdenticalLeadingBytesCount_float(preDiffBytes, diffBytes); //in fact, float is enough for both single-precision and double-precision data.
int fromByteIndex = leadingNum;
int toByteIndex = reqBytesLength; //later on: should use "< toByteIndex" to traverse....
if(fromByteIndex < toByteIndex)
{
intMidBytes_Length = reqBytesLength - leadingNum;
memcpy(lce->integerMidBytes, &(curBytes[fromByteIndex]), intMidBytes_Length);
// set lce mid data
memcpy(lce->integerMidBytes, &(diffBytes[fromByteIndex]), intMidBytes_Length);
}
int resiBits = 0;
if(resiBitsLength!=0)
{
resiIndex = reqBytesLength;
if(resiIndex < 8)
resiBits = (curBytes[resiIndex] & 0xFF) >> (8-resiBitsLength);
resiBits = (diffBytes[resiIndex] & 0xFF) >> (8-resiBitsLength);
}
// set lce
lce->leadingZeroBytes = leadingNum;
lce->integerMidBytes_Length = intMidBytes_Length;
lce->resMidBitsLength = resiBitsLength;

View File

@ -572,7 +572,7 @@ void convertTDPStoBytes_double_reserve(TightDataPointStorageD* tdps, unsigned ch
}
//Convert TightDataPointStorageD to bytes...
void convertTDPStoFlatBytes_double(TightDataPointStorageD *tdps, unsigned char* bytes, size_t *size)
bool convertTDPStoFlatBytes_double(TightDataPointStorageD *tdps, unsigned char* bytes, size_t *size)
{
size_t i, k = 0;
unsigned char dsLengthBytes[8];
@ -597,6 +597,10 @@ void convertTDPStoFlatBytes_double(TightDataPointStorageD *tdps, unsigned char*
{
size_t totalByteLength = 3 + 1 + MetaDataByteLength_double + exe_params->SZ_SIZE_TYPE + tdps->exactMidBytes_size;
//bytes = (unsigned char *)malloc(sizeof(unsigned char)*totalByteLength); // comment by tickduan
if(totalByteLength >= tdps->dataSeriesLength * sizeof(double))
{
return false;
}
for (i = 0; i < 3; i++)//3
bytes[k++] = versionNumber[i];
@ -636,15 +640,21 @@ void convertTDPStoFlatBytes_double(TightDataPointStorageD *tdps, unsigned char*
totalByteLength += (1+1); // for MSST19
//*bytes = (unsigned char *)malloc(sizeof(unsigned char)*totalByteLength); comment by tickduan
if(totalByteLength >= tdps->dataSeriesLength * sizeof(double))
{
return false;
}
convertTDPStoBytes_double(tdps, bytes, dsLengthBytes, sameByte);
*size = totalByteLength;
}
else //the case with reserved value
{
//TODO
return false;
}
return true;
}
void convertTDPStoFlatBytes_double_args(TightDataPointStorageD *tdps, unsigned char* bytes, size_t *size)

View File

@ -197,7 +197,7 @@ int new_TightDataPointStorageF_fromFlatBytes(TightDataPointStorageF **this, unsi
byteBuf[i] = flatBytes[index++];
(*this)->reservedValue = bytesToFloat(byteBuf);//4
}
// leadNum size calc from exactDataNum
size_t logicLeadNumBitsNum = (*this)->exactDataNum * 2;
if (logicLeadNumBitsNum % 8 == 0)
{
@ -383,73 +383,76 @@ void convertTDPStoBytes_float(TightDataPointStorageF* tdps, unsigned char* bytes
unsigned char pwrErrBoundBytes_sizeBytes[4];
unsigned char max_quant_intervals_Bytes[4];
// 1 version
for(i = 0;i<3;i++)//3 bytes
bytes[k++] = versionNumber[i];
// 2 same
bytes[k++] = sameByte; //1 byte
// 3 meta
convertSZParamsToBytes(confparams_cpr, &(bytes[k]));
k = k + MetaDataByteLength;
// 4 element count
for(i = 0;i<exe_params->SZ_SIZE_TYPE;i++)//ST: 4 or 8 bytes
bytes[k++] = dsLengthBytes[i];
intToBytes_bigEndian(max_quant_intervals_Bytes, confparams_cpr->max_quant_intervals);
// 5 max_quant_intervals length
for(i = 0;i<4;i++)//4
bytes[k++] = max_quant_intervals_Bytes[i];
if(confparams_cpr->errorBoundMode>=PW_REL)
{
// 6 range exponent
bytes[k++] = tdps->radExpo; //1 byte
// 7 segment size
sizeToBytes(segment_sizeBytes, confparams_cpr->segment_size);
for(i = 0;i<exe_params->SZ_SIZE_TYPE;i++)//ST
bytes[k++] = segment_sizeBytes[i];
// 8 pwrErrBoundBytes_size
intToBytes_bigEndian(pwrErrBoundBytes_sizeBytes, tdps->pwrErrBoundBytes_size);
for(i = 0;i<4;i++)//4
bytes[k++] = pwrErrBoundBytes_sizeBytes[i];
}
// 9 intervals
intToBytes_bigEndian(intervalsBytes, tdps->intervals);
for(i = 0;i<4;i++)//4
bytes[k++] = intervalsBytes[i];
// 10 median
floatToBytes(medianValueBytes, tdps->medianValue);
for (i = 0; i < 4; i++)// 4
bytes[k++] = medianValueBytes[i];
// 11 reqLength
bytes[k++] = tdps->reqLength; //1 byte
// 12 plus max
if(confparams_cpr->errorBoundMode == PW_REL && confparams_cpr->accelerate_pw_rel_compression)
{
bytes[k++] = tdps->plus_bits;
bytes[k++] = tdps->max_bits;
}
// 13 realPrecision
doubleToBytes(realPrecisionBytes, tdps->realPrecision);
for (i = 0; i < 8; i++)// 8
bytes[k++] = realPrecisionBytes[i];
// 14 typeArray size
sizeToBytes(typeArrayLengthBytes, tdps->typeArray_size);
for(i = 0;i<exe_params->SZ_SIZE_TYPE;i++)//ST
bytes[k++] = typeArrayLengthBytes[i];
// 15 exactDataNum (leadNum is calculated from this, so leadNum itself is not saved)
sizeToBytes(exactLengthBytes, tdps->exactDataNum);
for(i = 0;i<exe_params->SZ_SIZE_TYPE;i++)//ST
bytes[k++] = exactLengthBytes[i];
// 16 Mid Length
sizeToBytes(exactMidBytesLength, tdps->exactMidBytes_size);
for(i = 0;i<exe_params->SZ_SIZE_TYPE;i++)//ST
bytes[k++] = exactMidBytesLength[i];
if(confparams_cpr->errorBoundMode>=PW_REL)
{
floatToBytes(exactMidBytesLength, tdps->minLogValue);
for(i=0;i<4;i++)
bytes[k++] = exactMidBytesLength[i];
}
// 17 type data
memcpy(&(bytes[k]), tdps->typeArray, tdps->typeArray_size);
k += tdps->typeArray_size;
if(confparams_cpr->errorBoundMode>=PW_REL)
@ -457,12 +460,13 @@ void convertTDPStoBytes_float(TightDataPointStorageF* tdps, unsigned char* bytes
memcpy(&(bytes[k]), tdps->pwrErrBoundBytes, tdps->pwrErrBoundBytes_size);
k += tdps->pwrErrBoundBytes_size;
}
//18 leadNum data
memcpy(&(bytes[k]), tdps->leadNumArray, tdps->leadNumArray_size);
k += tdps->leadNumArray_size;
// 19 mid data
memcpy(&(bytes[k]), tdps->exactMidBytes, tdps->exactMidBytes_size);
k += tdps->exactMidBytes_size;
// 20 residual
if(tdps->residualMidBits!=NULL)
{
memcpy(&(bytes[k]), tdps->residualMidBits, tdps->residualMidBits_size);
@ -471,7 +475,7 @@ void convertTDPStoBytes_float(TightDataPointStorageF* tdps, unsigned char* bytes
}
//convert TightDataPointStorageD to bytes...
void convertTDPStoFlatBytes_float(TightDataPointStorageF *tdps, unsigned char* bytes, size_t *size)
bool convertTDPStoFlatBytes_float(TightDataPointStorageF *tdps, unsigned char* bytes, size_t *size)
{
size_t i, k = 0;
unsigned char dsLengthBytes[8];
@ -494,24 +498,32 @@ void convertTDPStoFlatBytes_float(TightDataPointStorageF *tdps, unsigned char* b
if(confparams_cpr->protectValueRange)
sameByte = (unsigned char) (sameByte | 0x04); //0000,0100
if(tdps->allSameData==1)
if(tdps->allSameData == 1 )
{
//
// same format
//
size_t totalByteLength = 3 + 1 + MetaDataByteLength + exe_params->SZ_SIZE_TYPE + tdps->exactMidBytes_size;
//*bytes = (unsigned char *)malloc(sizeof(unsigned char)*totalByteLength); // not need malloc comment by tickduan
// check the output buffer is large enough
if(totalByteLength >= tdps->dataSeriesLength * sizeof(float) )
{
*size = 0;
return false;
}
// 1 version 3 bytes
for (i = 0; i < 3; i++)//3
bytes[k++] = versionNumber[i];
// 2 same flag 1 bytes
bytes[k++] = sameByte;
// 3 metaData 26 bytes
convertSZParamsToBytes(confparams_cpr, &(bytes[k]));
k = k + MetaDataByteLength;
// 4 data Length 4 or 8 bytes
for (i = 0; i < exe_params->SZ_SIZE_TYPE; i++)
bytes[k++] = dsLengthBytes[i];
// 5 exactMidBytes exactMidBytes_size bytes
for (i = 0; i < tdps->exactMidBytes_size; i++)
bytes[k++] = tdps->exactMidBytes[i];
@ -519,6 +531,9 @@ void convertTDPStoFlatBytes_float(TightDataPointStorageF *tdps, unsigned char* b
}
else if (tdps->rtypeArray == NULL)
{
//
// not same format
//
size_t residualMidBitsLength = tdps->residualMidBits == NULL ? 0 : tdps->residualMidBits_size;
size_t segmentL = 0, radExpoL = 0, pwrBoundArrayL = 0;
int minLogValueSize = 0;
@ -538,15 +553,24 @@ void convertTDPStoFlatBytes_float(TightDataPointStorageF *tdps, unsigned char* b
totalByteLength += (1+1); // for MSST19
//*bytes = (unsigned char *)malloc(sizeof(unsigned char)*totalByteLength); // comment by tickduan
if(totalByteLength >= tdps->dataSeriesLength * sizeof(float))
{
*size = 0;
return false;
}
convertTDPStoBytes_float(tdps, bytes, dsLengthBytes, sameByte);
*size = totalByteLength;
return true;
}
else //the case with reserved value
{
//TODO
*size = 0;
return false;
}
return true;
}
void convertTDPStoFlatBytes_float_args(TightDataPointStorageF *tdps, unsigned char* bytes, size_t *size)

View File

@ -223,7 +223,7 @@ unsigned long zlib_compress5(unsigned char* data, unsigned long dataLength, unsi
size_t p_size = 0, av_in = 0;
uLong estCmpLen = deflateBound(&strm, dataLength);
//*compressBytes = (unsigned char*)malloc(sizeof(unsigned char)*estCmpLen); // comment by tickduan no need malloc
unsigned char* out = *compressBytes;
unsigned char* out = compressBytes;
/* compress until end of file */
do {

View File

@ -94,12 +94,14 @@ int SZ_ReadConf(const char* sz_cfgFile) {
confparams_cpr->plus_bits = 3;
// default option
if(sz_cfgFile == NULL || access(sz_cfgFile, F_OK) != 0)
{
dataEndianType = LITTLE_ENDIAN_DATA;
confparams_cpr->sol_ID = SZ;
confparams_cpr->max_quant_intervals = 65536;
confparams_cpr->max_quant_intervals = 500;
confparams_cpr->maxRangeRadius = confparams_cpr->max_quant_intervals/2;
confparams_cpr->quantization_intervals = 5000;
exe_params->intvCapacity = confparams_cpr->maxRangeRadius*2;
exe_params->intvRadius = confparams_cpr->maxRangeRadius;
@ -118,8 +120,8 @@ int SZ_ReadConf(const char* sz_cfgFile) {
confparams_cpr->errorBoundMode = SZ_ABS;
confparams_cpr->psnr = 90;
confparams_cpr->absErrBound = 1E-10;
confparams_cpr->relBoundRatio = 1E-10;
confparams_cpr->absErrBound = 1E-20;
confparams_cpr->relBoundRatio = 1E-8;
confparams_cpr->accelerate_pw_rel_compression = 1;
confparams_cpr->pw_relBoundRatio = 1E-3;

View File

@ -448,27 +448,26 @@ inline void compressUInt64Value(uint64_t tgtValue, uint64_t minValue, int byteSi
memcpy(bytes, tmpBytes + 8 - byteSize, byteSize);
}
inline void compressSingleFloatValue(FloatValueCompressElement *vce, float tgtValue, float precision, float medianValue,
inline void compressSingleFloatValue(FloatValueCompressElement *vce, float oriValue, float precision, float medianValue,
int reqLength, int reqBytesLength, int resiBitsLength)
{
float normValue = tgtValue - medianValue;
lfloat diffVal;
diffVal.value = oriValue - medianValue;
lfloat lfBuf;
lfBuf.value = normValue;
int ignBytesLength = 32 - reqLength;
if(ignBytesLength<0)
ignBytesLength = 0;
// calc ignore bit count
int ignBitCount = 32 - reqLength;
if(ignBitCount<0)
ignBitCount = 0;
int tmp_int = lfBuf.ivalue;
intToBytes_bigEndian(vce->curBytes, tmp_int);
lfBuf.ivalue = (lfBuf.ivalue >> ignBytesLength) << ignBytesLength;
int tmp_int = diffVal.ivalue;
intToBytes_bigEndian(vce->curBytes, diffVal.ivalue);
//float tmpValue = lfBuf.value;
// truncate the tail bits of the diff value by ignBitCount
diffVal.ivalue = (diffVal.ivalue >> ignBitCount) << ignBitCount;
vce->data = lfBuf.value+medianValue;
vce->curValue = tmp_int;
// save to vce
vce->data = diffVal.value + medianValue;
vce->curValue = diffVal.ivalue;
vce->reqBytesLength = reqBytesLength;
vce->resiBitsLength = resiBitsLength;
}

43
deps/SZ/sz/src/sz.c vendored
View File

@ -26,7 +26,7 @@
//#include "CurveFillingCompressStorage.h"
int versionNumber[4] = {SZ_VER_MAJOR,SZ_VER_MINOR,SZ_VER_BUILD,SZ_VER_REVISION};
//int SZ_SIZE_TYPE = 8;
int SZ_SIZE_TYPE_DEFUALT = 4;
int dataEndianType = LITTLE_ENDIAN_DATA; //*endian type of the data read from disk
int sysEndianType = LITTLE_ENDIAN_SYSTEM ; //*sysEndianType is actually set automatically.
@ -67,8 +67,7 @@ int SZ_Init(const char *configFilePath)
if(loadFileResult==SZ_FAILED)
return SZ_FAILED;
exe_params->SZ_SIZE_TYPE = sizeof(size_t);
exe_params->SZ_SIZE_TYPE = SZ_SIZE_TYPE_DEFUALT;
if(confparams_cpr->szMode == SZ_TEMPORAL_COMPRESSION)
{
initSZ_TSC();
@ -115,13 +114,14 @@ int SZ_Init_Params(sz_params *params)
size_t SZ_compress_args(int dataType, void *data, size_t r1, unsigned char* outData, sz_params* params)
{
size_t outSize = 0;
int status;
if(dataType==SZ_FLOAT)
{
SZ_compress_args_float((float *)data, r1, outData, &outSize, params);
status = SZ_compress_args_float((float *)data, r1, outData, &outSize, params);
}
else if(dataType==SZ_DOUBLE)
{
SZ_compress_args_double((double *)data, r1, outData, &outSize, params);
status = SZ_compress_args_double((double *)data, r1, outData, &outSize, params);
}
else
{
@ -139,7 +139,6 @@ size_t SZ_decompress(int dataType, unsigned char *bytes, size_t byteLength, size
{
sz_exedata de_exe;
memset(&de_exe, 0, sizeof(sz_exedata));
de_exe.SZ_SIZE_TYPE = 8;
sz_params de_params;
memset(&de_params, 0, sizeof(sz_params));
@ -192,3 +191,35 @@ void SZ_Finalize()
}
}
struct timeval startTime;
struct timeval endTime; /* Start and end times */
struct timeval costStart; /*only used for recording the cost*/
double totalCost = 0;
// Begin a timing measurement: capture the current wall-clock time into
// the global costStart and reset the accumulated cost. Paired with
// cost_end(), which reports the elapsed time.
void cost_start()
{
    gettimeofday(&costStart, NULL);
    totalCost = 0;
}
// Finish a timing measurement started by cost_start(): accumulate the
// elapsed seconds into the global totalCost, print the running total in
// milliseconds labelled with `tag`, and return that millisecond value.
double cost_end(const char* tag)
{
    struct timeval now;
    gettimeofday(&now, NULL);
    // Elapsed seconds since costStart, at microsecond resolution.
    double seconds = ((now.tv_sec*1000000+now.tv_usec)-(costStart.tv_sec*1000000+costStart.tv_usec))/1000000.0;
    totalCost += seconds;
    double use_ms = totalCost*1000;
    printf(" timecost %s : %.3f ms\n", tag, use_ms);
    return use_ms;
}
// Print the compression ratio: output length as a percentage of the
// input length. Debug/statistics helper only; no value is returned.
void show_rate(int in_len, int out_len)
{
    float pct = 100*(float)out_len/(float)in_len;
    printf(" in_len=%d out_len=%d compress rate=%.4f%%\n", in_len, out_len, pct);
}

View File

@ -264,20 +264,24 @@ void SZ_compress_args_double_StoreOriData(double* oriData, size_t dataLength, un
}
char SZ_compress_args_double_NoCkRngeNoGzip_1D(unsigned char* newByteData, double *oriData,
bool SZ_compress_args_double_NoCkRngeNoGzip_1D(unsigned char* newByteData, double *oriData,
size_t dataLength, double realPrecision, size_t *outSize, double valueRangeSize, double medianValue_d)
{
char compressionType = 0;
TightDataPointStorageD* tdps = NULL;
tdps = SZ_compress_double_1D_MDQ(oriData, dataLength, realPrecision, valueRangeSize, medianValue_d);
convertTDPStoFlatBytes_double(tdps, newByteData, outSize);
if(!convertTDPStoFlatBytes_double(tdps, newByteData, outSize))
{
free_TightDataPointStorageD(tdps);
return false;
}
if(*outSize>3 + MetaDataByteLength_double + exe_params->SZ_SIZE_TYPE + 1 + sizeof(double)*dataLength)
SZ_compress_args_double_StoreOriData(oriData, dataLength, newByteData, outSize);
//if(*outSize>3 + MetaDataByteLength_double + exe_params->SZ_SIZE_TYPE + 1 + sizeof(double)*dataLength)
// SZ_compress_args_double_StoreOriData(oriData, dataLength, newByteData, outSize);
free_TightDataPointStorageD(tdps);
return compressionType;
return true;
}
/*MSST19*/
@ -482,7 +486,6 @@ int SZ_compress_args_double(double *oriData, size_t r1, unsigned char* newByteDa
printf("error, double input elements count=%d less than %d, so need not do compress.\n", dataLength, MIN_NUM_OF_ELEMENTS);
return SZ_LITTER_ELEMENT;
}
if(params->errorBoundMode == PW_REL && params->accelerate_pw_rel_compression == 1)
{
@ -524,7 +527,8 @@ int SZ_compress_args_double(double *oriData, size_t r1, unsigned char* newByteDa
{
size_t tmpOutSize = 0;
unsigned char* tmpByteData = newByteData;
if(params->szMode != SZ_BEST_SPEED)
bool twoStage = params->szMode != SZ_BEST_SPEED;
if(twoStage)
{
tmpByteData = (unsigned char*)malloc(r1*sizeof(double)*1.2);
}
@ -538,28 +542,28 @@ int SZ_compress_args_double(double *oriData, size_t r1, unsigned char* newByteDa
}
else
{
SZ_compress_args_double_NoCkRngeNoGzip_1D(tmpByteData, oriData, r1, realPrecision, &tmpOutSize, valueRangeSize, medianValue);
if(tmpOutSize>=dataLength*sizeof(double) + 3 + MetaDataByteLength_double + exe_params->SZ_SIZE_TYPE + 1)
SZ_compress_args_double_StoreOriData(oriData, dataLength, tmpByteData, &tmpOutSize);
if(!SZ_compress_args_double_NoCkRngeNoGzip_1D(tmpByteData, oriData, r1, realPrecision, &tmpOutSize, valueRangeSize, medianValue))
{
if(twoStage)
free(tmpByteData);
return SZ_ALGORITHM_ERR;
}
//if(tmpOutSize>=dataLength*sizeof(double) + 3 + MetaDataByteLength_double + exe_params->SZ_SIZE_TYPE + 1)
// SZ_compress_args_double_StoreOriData(oriData, dataLength, tmpByteData, &tmpOutSize);
}
//
//Call Gzip to do the further compression.
//
if(params->szMode==SZ_BEST_SPEED)
{
*outSize = tmpOutSize;
//*newByteData = tmpByteData;
}
else if(params->szMode==SZ_BEST_COMPRESSION || params->szMode==SZ_DEFAULT_COMPRESSION || params->szMode==SZ_TEMPORAL_COMPRESSION)
if(twoStage)
{
*outSize = sz_lossless_compress(params->losslessCompressor, params->gzipMode, tmpByteData, tmpOutSize, newByteData);
free(tmpByteData);
}
else
{
printf("Error: Wrong setting of params->szMode in the double compression.\n");
status = SZ_MERR;
*outSize = tmpOutSize;
}
}

View File

@ -40,10 +40,10 @@ unsigned char* SZ_skip_compress_float(float* data, size_t dataLength, size_t* ou
return out;
}
void computeReqLength_float(double realPrecision, short radExpo, int* reqLength, float* medianValue)
void computeReqLength_float(double realPrecision, short rangeExpo, int* reqLength, float* medianValue)
{
short reqExpo = getPrecisionReqLength_double(realPrecision);
*reqLength = 9+radExpo - reqExpo+1; //radExpo-reqExpo == reqMantiLength
short realPrecExpo = getPrecisionReqLength_double(realPrecision);
*reqLength = 9 + rangeExpo - realPrecExpo + 1; //radExpo-reqExpo == reqMantiLength
if(*reqLength<9)
*reqLength = 9;
if(*reqLength>32)
@ -111,31 +111,39 @@ size_t dataLength, float realPrecision, float valueRangeSize, float medianValue_
else
quantization_intervals = exe_params->intvCapacity;
//updateQuantizationInfo(quantization_intervals);
int intvRadius = quantization_intervals/2;
int half_interval = quantization_intervals/2;
//
// calc reqlength and need set medianValue to zero.
//
size_t i;
int reqLength;
int reqLength; // number of bits needed to save one float; value range 9~32
float medianValue = medianValue_f;
short radExpo = getExponent_float(valueRangeSize/2);
short rangeExpo = getExponent_float(valueRangeSize/2);
computeReqLength_float(realPrecision, radExpo, &reqLength, &medianValue);
computeReqLength_float(realPrecision, rangeExpo, &reqLength, &medianValue);
int* type = (int*) malloc(dataLength*sizeof(int));
float* spaceFillingValue = oriData; //
//
// malloc all
//
// 1 type
int* type = (int*) malloc(dataLength*sizeof(int));
float* spaceFillingValue = oriData; //
// 2 lead
DynamicIntArray *exactLeadNumArray;
new_DIA(&exactLeadNumArray, DynArrayInitLen);
// 3 mid
DynamicByteArray *exactMidByteArray;
new_DBA(&exactMidByteArray, DynArrayInitLen);
// 4 residual bit
DynamicIntArray *resiBitArray;
new_DIA(&resiBitArray, DynArrayInitLen);
unsigned char preDataBytes[4];
intToBytes_bigEndian(preDataBytes, 0);
unsigned char preDiffBytes[4];
intToBytes_bigEndian(preDiffBytes, 0);
// calc save byte length and bit lengths with reqLength
int reqBytesLength = reqLength/8;
int resiBitsLength = reqLength%8;
float last3CmprsData[3] = {0};
@ -146,56 +154,57 @@ size_t dataLength, float realPrecision, float valueRangeSize, float medianValue_
//add the first data
type[0] = 0;
compressSingleFloatValue(vce, spaceFillingValue[0], realPrecision, medianValue, reqLength, reqBytesLength, resiBitsLength);
updateLossyCompElement_Float(vce->curBytes, preDataBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDataBytes,vce->curBytes,4);
// set lce
updateLossyCompElement_Float(vce->curBytes, preDiffBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDiffBytes, vce->curBytes, 4);
// lce to arrays
addExactData(exactMidByteArray, exactLeadNumArray, resiBitArray, lce);
listAdd_float(last3CmprsData, vce->data);
//add the second data
type[1] = 0;
compressSingleFloatValue(vce, spaceFillingValue[1], realPrecision, medianValue, reqLength, reqBytesLength, resiBitsLength);
updateLossyCompElement_Float(vce->curBytes, preDataBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDataBytes,vce->curBytes,4);
updateLossyCompElement_Float(vce->curBytes, preDiffBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDiffBytes, vce->curBytes, 4);
addExactData(exactMidByteArray, exactLeadNumArray, resiBitArray, lce);
listAdd_float(last3CmprsData, vce->data);
int state;
float checkRadius;
float curData;
float oriFloat;
float pred = last3CmprsData[0];
float predAbsErr;
float diff;
checkRadius = (quantization_intervals-1)*realPrecision;
float interval = 2*realPrecision;
float double_realpreci = 2*realPrecision;
float recip_precision = 1/realPrecision;
for(i=2;i<dataLength;i++)
for(i=2; i < dataLength; i++)
{
curData = spaceFillingValue[i];
oriFloat = spaceFillingValue[i];
//pred = 2*last3CmprsData[0] - last3CmprsData[1];
//pred = last3CmprsData[0];
predAbsErr = fabsf(curData - pred);
if(predAbsErr<checkRadius)
diff = fabsf(oriFloat - pred);
if(diff < checkRadius)
{
state = ((int)(predAbsErr*recip_precision+1))>>1;
if(curData>=pred)
state = ((int)( diff * recip_precision + 1))>>1;
if(oriFloat >= pred)
{
type[i] = intvRadius+state;
pred = pred + state*interval;
type[i] = half_interval + state;
pred = pred + state * double_realpreci;
}
else //curData<pred
{
type[i] = intvRadius-state;
pred = pred - state*interval;
type[i] = half_interval - state;
pred = pred - state * double_realpreci;
}
//double-check the prediction error in case of machine-epsilon impact
if(fabs(curData-pred)>realPrecision)
if(fabs(oriFloat - pred) > realPrecision)
{
type[i] = 0;
compressSingleFloatValue(vce, curData, realPrecision, medianValue, reqLength, reqBytesLength, resiBitsLength);
updateLossyCompElement_Float(vce->curBytes, preDataBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDataBytes,vce->curBytes,4);
compressSingleFloatValue(vce, oriFloat, realPrecision, medianValue, reqLength, reqBytesLength, resiBitsLength);
updateLossyCompElement_Float(vce->curBytes, preDiffBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDiffBytes, vce->curBytes, 4);
addExactData(exactMidByteArray, exactLeadNumArray, resiBitArray, lce);
//listAdd_float(last3CmprsData, vce->data);
@ -205,14 +214,15 @@ size_t dataLength, float realPrecision, float valueRangeSize, float medianValue_
{
//listAdd_float(last3CmprsData, pred);
}
// go next
continue;
}
//unpredictable data processing
type[i] = 0;
compressSingleFloatValue(vce, curData, realPrecision, medianValue, reqLength, reqBytesLength, resiBitsLength);
updateLossyCompElement_Float(vce->curBytes, preDataBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDataBytes,vce->curBytes,4);
compressSingleFloatValue(vce, oriFloat, realPrecision, medianValue, reqLength, reqBytesLength, resiBitsLength);
updateLossyCompElement_Float(vce->curBytes, preDiffBytes, reqBytesLength, resiBitsLength, lce);
memcpy(preDiffBytes, vce->curBytes, 4);
addExactData(exactMidByteArray, exactLeadNumArray, resiBitArray, lce);
//listAdd_float(last3CmprsData, vce->data);
@ -224,8 +234,7 @@ size_t dataLength, float realPrecision, float valueRangeSize, float medianValue_
// int expSegmentsInBytes_size = convertESCToBytes(esc, &expSegmentsInBytes);
size_t exactDataNum = exactLeadNumArray->size;
TightDataPointStorageF* tdps;
TightDataPointStorageF* tdps = NULL;
new_TightDataPointStorageF(&tdps, dataLength, exactDataNum,
type, exactMidByteArray->array, exactMidByteArray->size,
exactLeadNumArray->array,
@ -285,20 +294,32 @@ void SZ_compress_args_float_StoreOriData(float* oriData, size_t dataLength, unsi
*outSize = totalByteLength;
}
char SZ_compress_args_float_NoCkRngeNoGzip_1D( unsigned char* newByteData, float *oriData,
size_t dataLength, double realPrecision, size_t *outSize, float valueRangeSize, float medianValue_f)
// core compress algorithm; returns true on success, false on failure
bool SZ_compress_args_float_NoCkRngeNoGzip_1D( unsigned char* newByteData, float *oriData,
size_t dataLength, double realPrecision, size_t *outSize, float valueRangeSize, float medianValue_f)
{
char compressionType = 0;
// get tdps
TightDataPointStorageF* tdps = NULL;
tdps = SZ_compress_float_1D_MDQ(oriData, dataLength, realPrecision, valueRangeSize, medianValue_f);
convertTDPStoFlatBytes_float(tdps, newByteData, outSize);
if(tdps == NULL)
return false;
// serialize
if(!convertTDPStoFlatBytes_float(tdps, newByteData, outSize))
{
free_TightDataPointStorageF(tdps);
return false;
}
// check compressed size large than original
if(*outSize > 3 + MetaDataByteLength + exe_params->SZ_SIZE_TYPE + 1 + sizeof(float)*dataLength)
SZ_compress_args_float_StoreOriData(oriData, dataLength, newByteData, outSize);
{
//SZ_compress_args_float_StoreOriData(oriData, dataLength, newByteData, outSize);
return false;
}
free_TightDataPointStorageF(tdps);
return compressionType;
return true;
}
/*MSST19*/
@ -486,11 +507,16 @@ void SZ_compress_args_float_withinRange(unsigned char* newByteData, float *oriDa
free_TightDataPointStorageF(tdps);
}
void cost_start();
double cost_end(const char* tag);
void show_rate( int in_len, int out_len);
int SZ_compress_args_float(float *oriData, size_t r1, unsigned char* newByteData, size_t *outSize, sz_params* params)
{
int status = SZ_SUCCESS;
size_t dataLength = r1;
//cost_start();
// check at least elements count
if(dataLength <= MIN_NUM_OF_ELEMENTS)
{
@ -502,7 +528,6 @@ int SZ_compress_args_float(float *oriData, size_t r1, unsigned char* newByteData
params->accelerate_pw_rel_compression = 0;
float valueRangeSize = 0, medianValue = 0;
unsigned char * signs = NULL;
bool positive = true;
float nearZero = 0.0;
@ -525,8 +550,8 @@ int SZ_compress_args_float(float *oriData, size_t r1, unsigned char* newByteData
params->fmin = min;
params->fmax = max;
// calc precision
double realPrecision = 0;
if(params->errorBoundMode==PSNR)
{
params->errorBoundMode = SZ_ABS;
@ -545,51 +570,69 @@ int SZ_compress_args_float(float *oriData, size_t r1, unsigned char* newByteData
params->absErrBound = realPrecision;
}
// range
//cost_end(" sz_pre_calc");
//
// do compress
//
if(valueRangeSize <= realPrecision)
{
// special deal with same data
SZ_compress_args_float_withinRange(newByteData, oriData, dataLength, outSize);
return SZ_SUCCESS;
}
//
// first compress with sz
//
size_t tmpOutSize = 0;
unsigned char* tmpByteData = newByteData;
bool twoStage = params->szMode != SZ_BEST_SPEED;
if(twoStage)
{
tmpByteData = (unsigned char*)malloc(r1*sizeof(float) + 1024);
}
if(params->errorBoundMode>=PW_REL)
{
if(params->accelerate_pw_rel_compression && params->maxRangeRadius <= 32768)
SZ_compress_args_float_NoCkRngeNoGzip_1D_pwr_pre_log_MSST19(tmpByteData, oriData, params->pw_relBoundRatio, r1, &tmpOutSize, valueRangeSize, medianValue, signs, &positive, min, max, nearZero);
else
SZ_compress_args_float_NoCkRngeNoGzip_1D_pwr_pre_log(tmpByteData, oriData, params->pw_relBoundRatio, r1, &tmpOutSize, min, max);
}
else
{
// compress core algorithm
if(!SZ_compress_args_float_NoCkRngeNoGzip_1D(tmpByteData, oriData, r1, realPrecision, &tmpOutSize, valueRangeSize, medianValue))
{
*outSize = 0;
if(twoStage)
free(tmpByteData);
return SZ_ALGORITHM_ERR;
}
//if(tmpOutSize >= dataLength*sizeof(float) + 3 + MetaDataByteLength + exe_params->SZ_SIZE_TYPE + 1)
// SZ_compress_args_float_StoreOriData(oriData, dataLength, tmpByteData, &tmpOutSize);
}
//cost_end(" sz_first_compress");
//show_rate(r1*sizeof(float), *outSize);
//
// second compress with Call Zstd or Gzip
//
//cost_start();
if(!twoStage)
{
*outSize = tmpOutSize;
}
else
{
size_t tmpOutSize = 0;
unsigned char* tmpByteData = newByteData;
bool twoStage = params->szMode != SZ_BEST_SPEED;
if(twoStage)
{
tmpByteData = (unsigned char*)malloc(r1*sizeof(float) + 1024);
}
if(params->errorBoundMode>=PW_REL)
{
if(params->accelerate_pw_rel_compression && params->maxRangeRadius <= 32768)
SZ_compress_args_float_NoCkRngeNoGzip_1D_pwr_pre_log_MSST19(tmpByteData, oriData, params->pw_relBoundRatio, r1, &tmpOutSize, valueRangeSize, medianValue, signs, &positive, min, max, nearZero);
else
SZ_compress_args_float_NoCkRngeNoGzip_1D_pwr_pre_log(tmpByteData, oriData, params->pw_relBoundRatio, r1, &tmpOutSize, min, max);
}
else
{
SZ_compress_args_float_NoCkRngeNoGzip_1D( tmpByteData, oriData, r1, realPrecision, &tmpOutSize, valueRangeSize, medianValue);
if(tmpOutSize >= dataLength*sizeof(float) + 3 + MetaDataByteLength + exe_params->SZ_SIZE_TYPE + 1)
SZ_compress_args_float_StoreOriData(oriData, dataLength, tmpByteData, &tmpOutSize);
}
//
//Call Zstd or Gzip to do the further compression.
//
if(!twoStage)
{
*outSize = tmpOutSize;
//*newByteData = tmpByteData;
}
else
{
*outSize = sz_lossless_compress(params->losslessCompressor, params->gzipMode, tmpByteData, tmpOutSize, newByteData);
free(tmpByteData);
}
*outSize = sz_lossless_compress(params->losslessCompressor, params->gzipMode, tmpByteData, tmpOutSize, newByteData);
free(tmpByteData);
}
return status;
//cost_end(" sz_second_compress");
//show_rate(r1*sizeof(float), *outSize);
return SZ_SUCCESS;
}

View File

@ -106,6 +106,7 @@ TDengine是一个高效的存储、查询、分析时序大数据的平台
* [数据导入](/administrator#import):可按脚本文件导入,也可按数据文件导入
* [数据导出](/administrator#export)从shell按表导出也可用taosdump工具做各种导出
* [系统监控](/administrator#status):检查系统现有的连接、查询、流式计算,日志和事件等
* [性能优化](/administrator#optimize):对长期运行的系统进行维护优化,保障性能表现
* [文件目录结构](/administrator#directories)TDengine数据文件、配置文件等所在目录
* [参数限制与保留关键字](/administrator#keywords)TDengine的参数限制与保留关键字列表

View File

@ -533,7 +533,7 @@ Query OK, 1 row(s) in set (0.000141s)
| taos-jdbcdriver 版本 | TDengine 版本 | JDK 版本 |
| -------------------- | ----------------- | -------- |
| 2.0.31 | 2.1.3.0 及以上 | 1.8.x |
| 2.0.22 - 20.0.30 | 2.0.18.0 - 2.1.2.x | 1.8.x |
| 2.0.22 - 2.0.30 | 2.0.18.0 - 2.1.2.x | 1.8.x |
| 2.0.12 - 2.0.21 | 2.0.8.0 - 2.0.17.x | 1.8.x |
| 2.0.4 - 2.0.11 | 2.0.0.0 - 2.0.7.x | 1.8.x |
| 1.0.3 | 1.6.1.x 及以上 | 1.8.x |

View File

@ -259,7 +259,7 @@ typedef struct taosField {
获取最近一次API调用失败的原因,返回值为字符串。
- `char *taos_errno(TAOS_RES *res)`
- `int taos_errno(TAOS_RES *res)`
获取最近一次API调用失败的原因返回值为错误代码。
@ -557,7 +557,7 @@ c1.close()
conn.close()
```
#### 关于纳秒 (nanosecon) 在 Python 连接器中的说明
#### 关于纳秒 (nanosecond) 在 Python 连接器中的说明
由于目前 Python 对 nanosecond 支持的不完善(参见链接 1. 2. ),目前的实现方式是在 nanosecond 精度时返回整数,而不是 ms 和 us 返回的 datetime 类型,应用开发者需要自行处理,建议使用 pandas 的 to_datetime()。未来如果 Python 正式完整支持了纳秒,涛思数据可能会修改相关接口。
@ -907,6 +907,10 @@ go env -w GOPROXY=https://goproxy.io,direct
sql.Open内置的方法Close closes the statement.
### 其他代码示例
[Consume Messages from Kafka](https://github.com/taosdata/go-demo-kafka) 是一个通过 Go 语言实现消费 Kafka 队列写入 TDengine 的示例程序,也可以作为通过 Go 连接 TDengine 的写法参考。
## <a class="anchor" id="nodejs"></a>Node.js Connector
Node.js连接器支持的系统有

View File

@ -418,6 +418,19 @@ TDengine启动后会自动创建一个监测数据库log并自动将服务
这些监测信息的采集缺省是打开的但可以修改配置文件里的选项enableMonitor将其关闭或打开。
<a class="anchor" id="optimize"></a>
## 性能优化
因数据行 [update](https://www.taosdata.com/cn/documentation/faq#update)、表删除、数据过期等原因TDengine 的磁盘存储文件有可能出现数据碎片,影响查询操作的性能表现。从 2.1.3.0 版本开始,新增 SQL 指令 COMPACT 来启动碎片重整过程:
```mysql
COMPACT VNODES IN (vg_id1, vg_id2, ...)
```
COMPACT 命令对指定的一个或多个 VGroup 启动碎片重整系统会通过任务队列尽快安排重整操作的具体执行。COMPACT 指令所需的 VGroup id可以通过 `SHOW VGROUPS;` 指令的输出结果获取;而且在 `SHOW VGROUPS;` 中会有一个 compacting 列,值为 1 时表示对应的 VGroup 正在进行碎片重整,为 0 时则表示并没有处于重整状态。
需要注意的是,碎片重整操作会大幅消耗磁盘 I/O。因此在重整进行期间有可能会影响节点的写入和查询性能甚至在极端情况下导致短时间的阻写。
## <a class="anchor" id="directories"></a>文件目录结构
安装TDengine后默认会在操作系统中生成下列目录或文件

View File

@ -279,11 +279,11 @@ TDengine 缺省的时间戳是毫秒精度,但通过在 CREATE DATABASE 时传
说明:
1) TAGS 列的数据类型不能是 timestamp 类型;
1) TAGS 列的数据类型不能是 timestamp 类型;(从 2.1.3.0 版本开始TAGS 列中支持使用 timestamp 类型,但需注意在 TAGS 中的 timestamp 列写入数据时需要提供给定值,而暂不支持四则运算,例如 `NOW + 10s` 这类表达式)
2) TAGS 列名不能与其他列名相同;
3) TAGS 列名不能为预留关键字;
3) TAGS 列名不能为预留关键字(参见:[参数限制与保留关键字](https://www.taosdata.com/cn/documentation/administrator#keywords) 章节)
4) TAGS 最多允许 128 个,至少 1 个,总长度不超过 16 KB。

View File

@ -87,6 +87,7 @@
TDengine还没有一组专用的validation queries。然而建议你使用系统监测的数据库”log"来做。
<a class="anchor" id="update"></a>
## 9. 我可以删除或更新一条记录吗?
TDengine 目前尚不支持删除功能,未来根据用户需求可能会支持。

View File

@ -515,6 +515,13 @@ c1.close()
conn.close()
```
#### Using nanosecond in Python connector
So far Python still does not completely support the nanosecond type (please refer to links 1 and 2 below). The Python connector therefore returns an integer for nanosecond values, rather than the datetime type returned for ms and us precision; developers need to handle the conversion themselves, and we recommend using the pandas to_datetime() function. If Python officially supports nanoseconds in the future, TAOS Data may change the interface accordingly, which would require application changes as well.
1. https://stackoverflow.com/questions/10611328/parsing-datetime-strings-containing-nanoseconds
2. https://www.python.org/dev/peps/pep-0564/
#### Helper
Users can directly view the usage information of the module through Python's helper, or refer to the sample program in tests/examples/Python. The following are some common classes and methods:

View File

@ -637,6 +637,19 @@ int32_t bnDropDnode(SDnodeObj *pDnode) {
return TSDB_CODE_SUCCESS;
}
// Decide whether a new mnode may be hosted on the given dnode.
// Returns 1 when eligible, 0 otherwise. A dnode is ruled out when it is
// NULL, already carries management duties, is pinned to the vnode-only
// role, or is in the DROPPING/OFFLINE state.
int32_t bnDnodeCanCreateMnode(struct SDnodeObj *pDnode) {
  if (pDnode == NULL) return 0;

  // collapse the disqualifying conditions into a single flag
  int ruledOut = pDnode->isMgmt
              || pDnode->alternativeRole == TAOS_DN_ALTERNATIVE_ROLE_VNODE
              || pDnode->status == TAOS_DN_STATUS_DROPPING
              || pDnode->status == TAOS_DN_STATUS_OFFLINE;

  return ruledOut ? 0 : 1;
}
static void bnMonitorDnodeModule() {
int32_t numOfMnodes = mnodeGetMnodesNum();
if (numOfMnodes >= tsNumOfMnodes) return;
@ -645,13 +658,7 @@ static void bnMonitorDnodeModule() {
SDnodeObj *pDnode = tsBnDnodes.list[i];
if (pDnode == NULL) break;
if (pDnode->isMgmt || pDnode->status == TAOS_DN_STATUS_DROPPING || pDnode->status == TAOS_DN_STATUS_OFFLINE) {
continue;
}
if (pDnode->alternativeRole == TAOS_DN_ALTERNATIVE_ROLE_VNODE) {
continue;
}
if (!bnDnodeCanCreateMnode(pDnode)) continue;
mLInfo("dnode:%d, numOfMnodes:%d expect:%d, create mnode in this dnode", pDnode->dnodeId, numOfMnodes, tsNumOfMnodes);
mnodeCreateMnode(pDnode->dnodeId, pDnode->dnodeEp, true);

View File

@ -217,7 +217,7 @@ void tscColumnListDestroy(SArray* pColList);
void tscColumnListCopy(SArray* dst, const SArray* src, uint64_t tableUid);
void tscColumnListCopyAll(SArray* dst, const SArray* src);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId);
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar);
void tscDequoteAndTrimToken(SStrToken* pToken);
int32_t tscValidateName(SStrToken* pToken);

View File

@ -319,7 +319,7 @@ void tscRestoreFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock);
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar);
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam);

View File

@ -745,19 +745,23 @@ static int doBindParam(STableDataBlocks* pBlock, char* data, SParamInfo* param,
switch(param->type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT:
size = 1;
break;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
size = 2;
break;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
case TSDB_DATA_TYPE_FLOAT:
size = 4;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT:
case TSDB_DATA_TYPE_DOUBLE:
case TSDB_DATA_TYPE_TIMESTAMP:
size = 8;

View File

@ -73,7 +73,6 @@ static bool validateTableColumnInfo(SArray* pFieldList, SSqlCmd* pCmd);
static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pCmd);
static int32_t setObjFullName(char* fullName, const char* account, SStrToken* pDB, SStrToken* tableName, int32_t* len);
static void getColumnName(tSqlExprItem* pItem, char* resultFieldName, char* rawName, int32_t nameLength);
static int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t colIndex, tSqlExprItem* pItem, bool finalResult);
@ -916,8 +915,8 @@ static int32_t checkInvalidExprForTimeWindow(SSqlCmd* pCmd, SQueryInfo* pQueryIn
}
int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) {
const char* msg1 = "sliding cannot be used without interval";
const char* msg2 = "interval cannot be less than 10 ms";
const char* msg3 = "sliding cannot be used without interval";
SSqlCmd* pCmd = &pSql->cmd;
@ -926,7 +925,7 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
if (!TPARSER_HAS_TOKEN(pSqlNode->interval.interval)) {
if (TPARSER_HAS_TOKEN(pSqlNode->sliding)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
return TSDB_CODE_SUCCESS;
@ -947,7 +946,7 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
if (pQueryInfo->interval.intervalUnit != 'n' && pQueryInfo->interval.intervalUnit != 'y') {
// interval cannot be less than 10 milliseconds
if (convertTimePrecision(pQueryInfo->interval.interval, tinfo.precision, TSDB_TIME_PRECISION_MILLI) < tsMinIntervalTime) {
if (convertTimePrecision(pQueryInfo->interval.interval, tinfo.precision, TSDB_TIME_PRECISION_MICRO) < tsMinIntervalTime) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
}
@ -1311,15 +1310,8 @@ static bool validateTagParams(SArray* pTagsList, SArray* pFieldList, SSqlCmd* pC
return false;
}
/* timestamp in tag is not allowed */
for (int32_t i = 0; i < numOfTags; ++i) {
TAOS_FIELD* p = taosArrayGet(pTagsList, i);
//if (p->type == TSDB_DATA_TYPE_TIMESTAMP) {
// invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
// return false;
//}
if (!isValidDataType(p->type)) {
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return false;
@ -2176,7 +2168,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg3 = "illegal column name";
const char* msg4 = "invalid table name";
const char* msg5 = "parameter is out of range [0, 100]";
const char* msg6 = "function applied to tags not allowed";
const char* msg6 = "functions applied to tags are not allowed";
const char* msg7 = "normal table can not apply this function";
const char* msg8 = "multi-columns selection does not support alias column name";
const char* msg9 = "diff/derivative can no be applied to unsigned numeric type";
@ -3089,9 +3081,9 @@ void tscRestoreFuncForSTableQuery(SQueryInfo* pQueryInfo) {
}
bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
const char* msg1 = "TWA/Diff/Derivative/Irate not allowed to apply to super table directly";
const char* msg1 = "TWA/Diff/Derivative/Irate are not allowed to apply to super table directly";
const char* msg2 = "TWA/Diff/Derivative/Irate only support group by tbname for super table query";
const char* msg3 = "function not support for super table query";
const char* msg3 = "functions not support for super table query";
// filter sql function not supported by metric query yet.
size_t size = tscNumOfExprs(pQueryInfo);
@ -3196,34 +3188,42 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
const char* msg2 = "invalid column name in group by clause";
const char* msg3 = "columns from one table allowed as group by columns";
const char* msg4 = "join query does not support group by";
const char* msg5 = "not allowed column type for group by";
const char* msg6 = "tags not allowed for table query";
const char* msg7 = "not support group by expression";
const char* msg8 = "not allowed column type for group by";
const char* msg9 = "tags not allowed for table query";
const char* msg8 = "normal column can only locate at the end of group by clause";
// todo : handle two tables situation
STableMetaInfo* pTableMetaInfo = NULL;
if (pList == NULL) {
return TSDB_CODE_SUCCESS;
}
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
pQueryInfo->groupbyExpr.numOfGroupCols = (int16_t)taosArrayGetSize(pList);
if (pQueryInfo->groupbyExpr.numOfGroupCols > TSDB_MAX_TAGS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
if (pQueryInfo->numOfTables > 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
STableMeta* pTableMeta = NULL;
SSchema* pSchema = NULL;
SGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr;
if (pGroupExpr->columnInfo == NULL) {
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
}
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
if (pGroupExpr->columnInfo == NULL || pQueryInfo->colList == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
pGroupExpr->numOfGroupCols = (int16_t)taosArrayGetSize(pList);
if (pGroupExpr->numOfGroupCols > TSDB_MAX_TAGS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
SSchema *pSchema = NULL;
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
int32_t numOfGroupCols = 0;
size_t num = taosArrayGetSize(pList);
for (int32_t i = 0; i < num; ++i) {
@ -3244,28 +3244,20 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
}
pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
pTableMeta = pTableMetaInfo->pTableMeta;
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pSchema = tGetTbnameColumnSchema();
} else {
pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
}
bool groupTag = false;
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols) {
groupTag = true;
}
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
bool groupTag = (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols);
SGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr;
if (pGroupExpr->columnInfo == NULL) {
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
}
if (groupTag) {
if (!UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
int32_t relIndex = index.columnIndex;
@ -3282,7 +3274,7 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
} else {
// check if the column type is valid, here only support the bool/tinyint/smallint/bigint group by
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
@ -3292,10 +3284,20 @@ int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
numOfGroupCols++;
}
}
if (i == 0 && num > 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
// 1. only one normal column allowed in the group by clause
// 2. the normal column in the group by clause can only located in the end position
if (numOfGroupCols > 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
for(int32_t i = 0; i < num; ++i) {
SColIndex* pIndex = taosArrayGet(pGroupExpr->columnInfo, i);
if (TSDB_COL_IS_NORMAL_COL(pIndex->flag) && i != num - 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
}
}
@ -3341,6 +3343,7 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
if (pRight->flags & (1 << EXPR_FLAG_NS_TIMESTAMP)) {
pRight->value.i64 =
convertTimePrecision(pRight->value.i64, TSDB_TIME_PRECISION_NANO, timePrecision);
pRight->flags &= ~(1 << EXPR_FLAG_NS_TIMESTAMP);
}
}
@ -4905,6 +4908,7 @@ int32_t getTimeRange(STimeWindow* win, tSqlExpr* pRight, int32_t optr, int16_t t
*/
if (pRight->flags & (1 << EXPR_FLAG_NS_TIMESTAMP)) {
pRight->value.i64 = convertTimePrecision(pRight->value.i64, TSDB_TIME_PRECISION_NANO, timePrecision);
pRight->flags &= ~(1 << EXPR_FLAG_NS_TIMESTAMP);
}
tVariantDump(&pRight->value, (char*)&val, TSDB_DATA_TYPE_BIGINT, true);
@ -5980,8 +5984,8 @@ int32_t validateLimitNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlN
if (tscOrderedProjectionQueryOnSTable(pQueryInfo, 0)) {
/*
* the offset value should be removed during retrieve data from virtual node, since the
* global order are done in client side, so the offset is applied at the client side
* The offset value should be removed during retrieve data from virtual node, since the
* global order are done at the client side, so the offset is applied at the client side.
* However, note that the maximum allowed number of result for each table should be less
* than or equal to the value of limit.
*/
@ -6310,7 +6314,7 @@ static void updateTagPrjFunction(SQueryInfo* pQueryInfo) {
*/
static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
const char* msg1 = "only one selectivity function allowed in presence of tags function";
const char* msg3 = "aggregation function should not be mixed up with projection";
const char* msg2 = "aggregation function should not be mixed up with projection";
bool tagTsColExists = false;
int16_t numOfSelectivity = 0;
@ -6389,7 +6393,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
} else {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) {
if (numOfAggregation > 0 && pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
return invalidOperationMsg(msg, msg3);
return invalidOperationMsg(msg, msg2);
}
if (numOfAggregation > 0 || numOfSelectivity > 0) {
@ -7841,18 +7845,19 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->numOfTables = 0;
// parse the subquery in the first place
int32_t numOfSub = (int32_t) taosArrayGetSize(pSqlNode->from->list);
for(int32_t i = 0; i < numOfSub; ++i) {
int32_t numOfSub = (int32_t)taosArrayGetSize(pSqlNode->from->list);
for (int32_t i = 0; i < numOfSub; ++i) {
code = doValidateSubquery(pSqlNode, i, pSql, pQueryInfo, tscGetErrorMsgPayload(pCmd));
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
int32_t timeWindowQuery =
(TPARSER_HAS_TOKEN(pSqlNode->interval.interval) || TPARSER_HAS_TOKEN(pSqlNode->sessionVal.gap));
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) != TSDB_CODE_SUCCESS) {
if (validateSelectNodeList(pCmd, pQueryInfo, pSqlNode->pSelNodeList, false, false, timeWindowQuery) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
@ -7862,12 +7867,12 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
// todo NOT support yet
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
for (int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT || f == TSDB_FUNC_INTERP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
}
if ((timeWindowQuery || pQueryInfo->stateWindow) && f == TSDB_FUNC_LAST) {
pExpr->base.numOfParams = 1;
@ -7876,22 +7881,19 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
}
// todo derivative function requires ts column exists in subquery
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0);
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, 0);
int32_t numOfExprs = (int32_t) tscNumOfExprs(pQueryInfo);
if (numOfExprs == 1) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
} else {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 1);
int32_t f = pExpr->base.functionId;
if ((f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) && pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
if (pSchema->type != TSDB_DATA_TYPE_TIMESTAMP) {
int32_t numOfExprs = (int32_t)tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < numOfExprs; ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_DERIVATIVE || f == TSDB_FUNC_TWA || f == TSDB_FUNC_IRATE) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
}
}
}
@ -7913,6 +7915,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (isTimeWindowQuery(pQueryInfo)) {
// check if the first column of the nest query result is timestamp column
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, 0);
@ -7926,6 +7929,13 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
}
// parse the having clause in the first place
int32_t joinQuery = (pSqlNode->from != NULL && taosArrayGetSize(pSqlNode->from->list) > 1);
if (validateHavingClause(pQueryInfo, pSqlNode->pHaving, pCmd, pSqlNode->pSelNodeList, joinQuery, timeWindowQuery) !=
TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// set order by info
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMeta)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;

View File

@ -484,8 +484,8 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
}
if (shouldFree) { // in case of table-meta/vgrouplist query, automatically free it
taosRemoveRef(tscObjRef, handle);
tscDebug("0x%"PRIx64" sqlObj is automatically freed", pSql->self);
taosRemoveRef(tscObjRef, handle);
}
taosReleaseRef(tscObjRef, handle);
@ -1703,7 +1703,7 @@ int tscProcessRetrieveGlobalMergeRsp(SSqlObj *pSql) {
uint64_t localQueryId = pSql->self;
qTableQuery(pQueryInfo->pQInfo, &localQueryId);
convertQueryResult(pRes, pQueryInfo, pSql->self);
convertQueryResult(pRes, pQueryInfo, pSql->self, true);
code = pRes->code;
if (pRes->code == TSDB_CODE_SUCCESS) {

View File

@ -645,7 +645,7 @@ void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
}
}
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock) {
void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBlock, bool convertNchar) {
assert(pRes->numOfCols > 0);
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
@ -678,7 +678,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
}
}
} else if (pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
} else if (convertNchar && pInfo->field.type == TSDB_DATA_TYPE_NCHAR) {
// convert unicode to native code in a temporary buffer extra one byte for terminated symbol
pRes->buffer[i] = realloc(pRes->buffer[i], pInfo->field.bytes * pRes->numOfRows);
@ -1075,14 +1075,14 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUp
return pOperator;
}
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId) {
void convertQueryResult(SSqlRes* pRes, SQueryInfo* pQueryInfo, uint64_t objId, bool convertNchar) {
// set the correct result
SSDataBlock* p = pQueryInfo->pQInfo->runtimeEnv.outputBuf;
pRes->numOfRows = (p != NULL)? p->info.rows: 0;
if (pRes->code == TSDB_CODE_SUCCESS && pRes->numOfRows > 0) {
tscCreateResPointerInfo(pRes, pQueryInfo);
tscSetResRawPtrRv(pRes, pQueryInfo, p);
tscSetResRawPtrRv(pRes, pQueryInfo, p, convertNchar);
}
tscDebug("0x%"PRIx64" retrieve result in pRes, numOfRows:%d", objId, pRes->numOfRows);
@ -1202,7 +1202,7 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
uint64_t qId = pSql->self;
qTableQuery(px->pQInfo, &qId);
convertQueryResult(pOutput, px, pSql->self);
convertQueryResult(pOutput, px, pSql->self, false);
}
static void tscDestroyResPointerInfo(SSqlRes* pRes) {
@ -2941,6 +2941,7 @@ int32_t tscQueryInfoCopy(SQueryInfo* pQueryInfo, const SQueryInfo* pSrc) {
pQueryInfo->pTableMetaInfo = NULL;
pQueryInfo->bufLen = pSrc->bufLen;
pQueryInfo->orderProjectQuery = pSrc->orderProjectQuery;
pQueryInfo->buf = malloc(pSrc->bufLen);
if (pQueryInfo->buf == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
@ -3526,6 +3527,7 @@ void executeQuery(SSqlObj* pSql, SQueryInfo* pQueryInfo) {
pSql->pSubs = calloc(pSql->subState.numOfSub, POINTER_BYTES);
pSql->subState.states = calloc(pSql->subState.numOfSub, sizeof(int8_t));
pthread_mutex_init(&pSql->subState.mutex, NULL);
for(int32_t i = 0; i < pSql->subState.numOfSub; ++i) {
SQueryInfo* pSub = taosArrayGetP(pQueryInfo->pUpstream, i);

View File

@ -1,5 +1,6 @@
#include <gtest/gtest.h>
#include <iostream>
#include <inttypes.h>
#include "taos.h"
#include "tglobal.h"
@ -132,7 +133,7 @@ void validateResultFields() {
taos_free_result(res);
char sql[512] = {0};
sprintf(sql, "insert into t1 values(%ld, 99, 'abc', 'test')", start_ts);
sprintf(sql, "insert into t1 values(%" PRId64 ", 99, 'abc', 'test')", start_ts);
res = taos_query(conn, sql);
ASSERT_EQ(taos_errno(res), 0);

View File

@ -1,8 +1,10 @@
#include "os.h"
#include <gtest/gtest.h>
#include <cassert>
#include <iostream>
#include <inttypes.h>
#include "os.h"
#include "taos.h"
#include "ttoken.h"
#include "tutil.h"
@ -15,10 +17,10 @@ int main(int argc, char** argv) {
extern void deltaToUtcInitOnce();
/* test parse time function */
TEST(testCase, parse_time) {
taos_options(TSDB_OPTION_TIMEZONE, "GMT-8");
deltaToUtcInitOnce();
char t1[] = "2018-1-1 1:1:1.952798";
char t13[] = "1970-1-1 0:0:0";
@ -77,15 +79,15 @@ TEST(testCase, parse_time) {
taosParseTime(t12, &time1, strlen(t12), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, time1);
taos_options(TSDB_OPTION_TIMEZONE, "UTC");
taos_options(TSDB_OPTION_TIMEZONE, "UTC");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 0);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Shanghai");
deltaToUtcInitOnce();
char t14[] = "1970-1-1T0:0:0Z";
taosParseTime(t14, &time, strlen(t14), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 0);
@ -135,7 +137,7 @@ TEST(testCase, parse_time) {
//======================== add some case ============================//
char b1[] = "9999-12-31 23:59:59.999";
taosParseTime(b1, &time, strlen(b1), TSDB_TIME_PRECISION_MILLI,0);
EXPECT_EQ(time, 253402271999999);
@ -145,27 +147,27 @@ TEST(testCase, parse_time) {
taosParseTime(b2, &time, strlen(b2), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 1577811661321);
taos_options(TSDB_OPTION_TIMEZONE, "America/New_York");
taos_options(TSDB_OPTION_TIMEZONE, "America/New_York");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, 18000 * MILLISECOND_PER_SECOND);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Tokyo");
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Tokyo");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, -32400 * MILLISECOND_PER_SECOND);
taos_options(TSDB_OPTION_TIMEZONE, "Asia/Shanghai");
deltaToUtcInitOnce();
taosParseTime(t13, &time, strlen(t13), TSDB_TIME_PRECISION_MILLI, 0);
EXPECT_EQ(time, -28800 * MILLISECOND_PER_SECOND);
char t[] = "2021-01-08T02:11:40.000+00:00";
taosParseTime(t, &time, strlen(t), TSDB_TIME_PRECISION_MILLI, 0);
printf("%ld\n", time);
printf("%" PRId64 "\n", time);
}

View File

@ -155,7 +155,7 @@ extern char tsMnodeTmpDir[];
extern char tsDataDir[];
extern char tsLogDir[];
extern char tsScriptDir[];
extern int64_t tsMsPerDay[3];
extern int64_t tsTickPerDay[3];
// system info
extern char tsOsName[];

View File

@ -84,8 +84,8 @@ int32_t tsMaxNumOfOrderedResults = 100000;
// 10 ms for sliding time, the value will changed in case of time precision changed
int32_t tsMinSlidingTime = 10;
// 10 ms for interval time range, changed accordingly
int32_t tsMinIntervalTime = 10;
// 1 us for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
@ -204,7 +204,7 @@ SDiskCfg tsDiskCfg[TSDB_MAX_DISKS];
* TSDB_TIME_PRECISION_MICRO: 86400000000L
* TSDB_TIME_PRECISION_NANO: 86400000000000L
*/
int64_t tsMsPerDay[] = {86400000L, 86400000000L, 86400000000000L};
int64_t tsTickPerDay[] = {86400000L, 86400000000L, 86400000000000L};
// system info
char tsOsName[10] = "Linux";

View File

@ -126,28 +126,18 @@ function convertDouble(data, num_of_rows, nbytes = 0, offset = 0, micro = false)
}
return res;
}
function convertBinary(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let currOffset = 0;
while (currOffset < data.length) {
let dataEntry = data.slice(currOffset, currOffset + nbytes);
if (dataEntry[0] == FieldTypes.C_BINARY_NULL) {
res.push(null);
}
else {
res.push(ref.readCString(dataEntry));
}
currOffset += nbytes;
}
return res;
}
function convertNchar(data, num_of_rows, nbytes = 0, offset = 0, micro = false) {
data = ref.reinterpret(data.deref(), nbytes * num_of_rows, offset);
let res = [];
let dataEntry = data.slice(0, nbytes); //one entry in a row under a column;
//TODO: should use the correct character encoding
res.push(dataEntry.toString("utf-8"));
let currOffset = 0;
while (currOffset < data.length) {
let len = data.readIntLE(currOffset, 2);
let dataEntry = data.slice(currOffset + 2, currOffset + len + 2); //one entry in a row under a column;
res.push(dataEntry.toString("utf-8"));
currOffset += nbytes;
}
return res;
}
@ -160,7 +150,7 @@ let convertFunctions = {
[FieldTypes.C_BIGINT]: convertBigint,
[FieldTypes.C_FLOAT]: convertFloat,
[FieldTypes.C_DOUBLE]: convertDouble,
[FieldTypes.C_BINARY]: convertBinary,
[FieldTypes.C_BINARY]: convertNchar,
[FieldTypes.C_TIMESTAMP]: convertTimestamp,
[FieldTypes.C_NCHAR]: convertNchar
}

View File

@ -1,6 +1,6 @@
{
"name": "td2.0-connector",
"version": "2.0.7",
"version": "2.0.8",
"description": "A Node.js connector for TDengine.",
"main": "tdengine.js",
"directories": {

View File

@ -31,6 +31,7 @@ void bnReset();
int32_t bnAllocVnodes(struct SVgObj *pVgroup);
int32_t bnAlterDnode(struct SDnodeObj *pDnode, int32_t vnodeId, int32_t dnodeId);
int32_t bnDropDnode(struct SDnodeObj *pDnode);
int32_t bnDnodeCanCreateMnode(struct SDnodeObj *pDnode);
#ifdef __cplusplus
}

View File

@ -636,6 +636,9 @@ static FILE * g_fpOfInsertResult = NULL;
#define errorPrint(fmt, ...) \
do { fprintf(stderr, "ERROR: "fmt, __VA_ARGS__); } while(0)
// for strncpy buffer overflow
#define min(a, b) (((a) < (b)) ? (a) : (b))
///////////////////////////////////////////////////
@ -1221,7 +1224,7 @@ static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) {
//printf("query result:%s\n", temp);
memcpy(databuf + totalLen, temp, len);
totalLen += len;
debugPrint("totalLen: %"PRId64"\n", totalLen);
verbosePrint("%s() LN%d, totalLen: %"PRId64"\n", __func__, __LINE__, totalLen);
}
verbosePrint("%s() LN%d, databuf=%s resultFile=%s\n",
@ -2574,7 +2577,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->tags[tagIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
superTbls->tags[tagIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->tags[tagIndex].note,
@ -2587,7 +2590,7 @@ static int getSuperTableFromServer(TAOS * taos, char* dbName,
fields[TSDB_DESCRIBE_METRIC_FIELD_INDEX].bytes);
tstrncpy(superTbls->columns[columnIndex].dataType,
(char *)row[TSDB_DESCRIBE_METRIC_TYPE_INDEX],
fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes);
min(15, fields[TSDB_DESCRIBE_METRIC_TYPE_INDEX].bytes));
superTbls->columns[columnIndex].dataLen =
*((int *)row[TSDB_DESCRIBE_METRIC_LENGTH_INDEX]);
tstrncpy(superTbls->columns[columnIndex].note,
@ -7280,7 +7283,7 @@ static void *superSubscribe(void *sarg) {
pThreadInfo->threadID);
}
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
verbosePrint("%s() LN%d, [%d] subSqlstr: %s\n",
__func__, __LINE__, pThreadInfo->threadID, subSqlstr);
tsub[tsubSeq] = subscribeImpl(
STABLE_CLASS,
@ -7472,7 +7475,6 @@ static void *specifiedSubscribe(void *sarg) {
}
}
taos_free_result(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]);
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->querySeq], 0);
taos_close(pThreadInfo->taos);
return NULL;

View File

@ -28,13 +28,14 @@
#include <stdio.h>
#include <stdlib.h>
struct timeval startTime;
struct timeval endTime; /* Start and end times */
struct timeval costStart; /*only used for recording the cost*/
double totalCost = 0;
void cost_start();
double cost_end(const char* tag);
int notsame_cnt = 0;
/*
struct timeval startTime;
struct timeval endTime;
struct timeval costStart;
double totalCost = 0;
void cost_start()
{
@ -53,6 +54,7 @@ double cost_end(const char* tag)
printf(" timecost %s : %.3f ms\n", tag, use_ms);
return use_ms;
}
*/
float toFloat(char* buf){
return (float)atoll(buf);
@ -107,9 +109,9 @@ float check_same(float* ft1, float* ft2, int count){
same_cnt ++;
}
if(i < 20){
printf(" i=%d ft1=%.50f diff=%.50f \n", i, ft1[i], ft1[i] - ft2[i]);
printf(" i=%d ft2=%.50f \n", i, ft2[i]);
if(i < 5){
printf(" i=%d ft1=%.40f diff=%.40f \n", i, ft1[i], ft1[i] - ft2[i]);
printf(" i=%d ft2=%.40f \n", i, ft2[i]);
}
}
@ -181,8 +183,8 @@ bool testFile(const char* inFile, char algorithm){
// compare same
float same_rate = check_same(floats, ft2, cnt);
printf("\n ------------------ count:%d TD <%s> ---------------- \n", cnt, algorithm == 2?"TD":"SZ");
printf(" Compress Rate ......... [%.0f%%] \n", rate);
printf("\n ------------------ count:%d <%s> ---------------- \n", cnt, algorithm == 2?"TD":"SZ");
printf(" Compress Rate ......... [%.2f%%] \n", rate);
double speed1 = (cnt*sizeof(float)*1000/1024/1024)/use_ms1;
printf(" Compress Time ......... [%.4fms] speed=%.1f MB/s\n", use_ms1, speed1);
double speed2 = (cnt*sizeof(float)*1000/1024/1024)/use_ms2;
@ -581,7 +583,7 @@ void unitTestFloat() {
// ----------------- main ----------------------
//
int main(int argc, char *argv[]) {
printf("welcome to use taospack tools v1.2 \n");
printf("welcome to use taospack tools v1.2 SZ_SIZE_TYPE=%ld\n", sizeof(size_t));
gOpenLossy = false;
tsLossyInit();

View File

@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tgrant.h"
#include "tbn.h"
#include "tglobal.h"
#include "tconfig.h"
#include "tutil.h"
@ -632,7 +631,8 @@ static int32_t mnodeProcessDnodeStatusMsg(SMnodeMsg *pMsg) {
}
int32_t numOfMnodes = mnodeGetMnodesNum();
if (numOfMnodes < tsNumOfMnodes && numOfMnodes < mnodeGetOnlineDnodesNum() && !pDnode->isMgmt) {
if (numOfMnodes < tsNumOfMnodes && numOfMnodes < mnodeGetOnlineDnodesNum()
&& bnDnodeCanCreateMnode(pDnode)) {
bnNotify();
}

View File

@ -2075,7 +2075,9 @@ static int32_t mnodeDoCreateChildTable(SMnodeMsg *pMsg, int32_t tid) {
} else {
if (pTable->info.type == TSDB_SUPER_TABLE) {
int64_t us = taosGetTimestampUs();
pTable->uid = (us << 24) + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
uint64_t x = (us&0x000000FFFFFFFFFF);
x = x<<24;
pTable->uid = x + ((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));
} else {
pTable->uid = (((uint64_t)pTable->vgId) << 48) + ((((uint64_t)pTable->tid) & ((1ul << 24) - 1ul)) << 24) +
((sdbGetVersion() & ((1ul << 16) - 1ul)) << 8) + (taosRand() & ((1ul << 8) - 1ul));

View File

@ -74,13 +74,14 @@ bool taosGetProcMemory(float *memoryUsedMB) {
return false;
}
ssize_t _bytes = 0;
size_t len;
char * line = NULL;
while (!feof(fp)) {
tfree(line);
len = 0;
getline(&line, &len, fp);
if (line == NULL) {
_bytes = getline(&line, &len, fp);
if ((_bytes < 0) || (line == NULL)) {
break;
}
if (strstr(line, "VmRSS:") != NULL) {
@ -113,8 +114,8 @@ static bool taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
size_t len;
char * line = NULL;
getline(&line, &len, fp);
if (line == NULL) {
ssize_t _bytes = getline(&line, &len, fp);
if ((_bytes < 0) || (line == NULL)) {
uError("read file:%s failed", tsSysCpuFile);
fclose(fp);
return false;
@ -138,8 +139,8 @@ static bool taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
size_t len = 0;
char * line = NULL;
getline(&line, &len, fp);
if (line == NULL) {
ssize_t _bytes = getline(&line, &len, fp);
if ((_bytes < 0) || (line == NULL)) {
uError("read file:%s failed", tsProcCpuFile);
fclose(fp);
return false;
@ -339,6 +340,7 @@ static bool taosGetCardInfo(int64_t *bytes) {
return false;
}
ssize_t _bytes = 0;
size_t len = 2048;
char * line = calloc(1, len);
@ -357,7 +359,12 @@ static bool taosGetCardInfo(int64_t *bytes) {
int64_t nouse6 = 0;
char nouse0[200] = {0};
getline(&line, &len, fp);
_bytes = getline(&line, &len, fp);
if (_bytes < 0)
{
break;
}
line[len - 1] = 0;
if (strstr(line, "lo:") != NULL) {
@ -420,6 +427,7 @@ static bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
return false;
}
ssize_t _bytes = 0;
size_t len;
char * line = NULL;
char tmp[10];
@ -428,8 +436,8 @@ static bool taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
while (!feof(fp)) {
tfree(line);
len = 0;
getline(&line, &len, fp);
if (line == NULL) {
_bytes = getline(&line, &len, fp);
if ((_bytes < 0) || (line == NULL)) {
break;
}
if (strstr(line, "rchar:") != NULL) {

View File

@ -348,6 +348,7 @@ int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrec
{1.0 / 1000000, 1.0 / 1000, 1.} };
return (int64_t)((double)time * factors[fromPrecision][toPrecision]);
}
static int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision) {
switch (unit) {

View File

@ -297,6 +297,7 @@ void httpJsonTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
}
default:
fractionLen = 0;
assert(false);
}
@ -342,6 +343,7 @@ void httpJsonUtcTimestamp(JsonBuf* buf, int64_t t, int32_t timePrecision) {
}
default:
fractionLen = 0;
assert(false);
}

View File

@ -118,7 +118,6 @@ typedef struct SQueryInfo {
int64_t vgroupLimit; // table limit in case of super table projection query + global order + limit
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not
int32_t round; // 0/1/....
int32_t bufLen;

View File

@ -1575,7 +1575,7 @@ static void last_function(SQLFunctionCtx *pCtx) {
memcpy(pCtx->pOutput, data, pCtx->inputBytes);
TSKEY ts = GET_TS_DATA(pCtx, i);
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
DO_UPDATE_TAG_COLUMNS(pCtx, ts);
pResInfo->hasResult = DATA_SET_FLAG;
@ -1590,7 +1590,7 @@ static void last_function(SQLFunctionCtx *pCtx) {
continue;
}
TSKEY ts = GET_TS_DATA(pCtx, i);
TSKEY ts = pCtx->ptsList ? GET_TS_DATA(pCtx, i) : 0;
char* buf = GET_ROWCELL_INTERBUF(pResInfo);
if (pResInfo->hasResult != DATA_SET_FLAG || (*(TSKEY*)buf) < ts) {

View File

@ -3847,14 +3847,17 @@ int32_t doFillTimeIntervalGapsInResults(SFillInfo* pFillInfo, SSDataBlock *pOutp
}
void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType) {
SQueryProfEvent event;
event.eventType = eventType;
event.eventTime = taosGetTimestampUs();
SQueryProfEvent event = {0};
event.eventType = eventType;
event.eventTime = taosGetTimestampUs();
event.operatorType = operatorInfo->operatorType;
SQInfo* qInfo = operatorInfo->pRuntimeEnv->qinfo;
if (qInfo->summary.queryProfEvents) {
taosArrayPush(qInfo->summary.queryProfEvents, &event);
if (operatorInfo->pRuntimeEnv) {
SQInfo* pQInfo = operatorInfo->pRuntimeEnv->qinfo;
if (pQInfo->summary.queryProfEvents) {
taosArrayPush(pQInfo->summary.queryProfEvents, &event);
}
}
}
@ -4314,8 +4317,8 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
return pFillCol;
}
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner,
SArray* pOperator, void* param) {
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator,
void* param) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;

View File

@ -350,8 +350,8 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
}
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) {
*minKey = fid * days * tsMsPerDay[precision];
*maxKey = *minKey + days * tsMsPerDay[precision] - 1;
*minKey = fid * days * tsTickPerDay[precision];
*maxKey = *minKey + days * tsTickPerDay[precision] - 1;
}
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {

View File

@ -17,9 +17,9 @@
#define TSDB_MAX_SUBBLOCKS 8
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
if (key < 0) {
return (int)((key + 1) / tsMsPerDay[precision] / days - 1);
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
} else {
return (int)((key / tsMsPerDay[precision] / days));
return (int)((key / tsTickPerDay[precision] / days));
}
}
@ -363,9 +363,9 @@ void tsdbGetRtnSnap(STsdbRepo *pRepo, SRtn *pRtn) {
TSKEY minKey, midKey, maxKey, now;
now = taosGetTimestamp(pCfg->precision);
minKey = now - pCfg->keep * tsMsPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsMsPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsMsPerDay[pCfg->precision];
minKey = now - pCfg->keep * tsTickPerDay[pCfg->precision];
midKey = now - pCfg->keep2 * tsTickPerDay[pCfg->precision];
maxKey = now - pCfg->keep1 * tsTickPerDay[pCfg->precision];
pRtn->minKey = minKey;
pRtn->minFid = (int)(TSDB_KEY_FID(minKey, pCfg->daysPerFile, pCfg->precision));

View File

@ -632,8 +632,8 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
SSubmitBlkIter blkIter = {0};
SDataRow row = NULL;
TSKEY now = taosGetTimestamp(pRepo->config.precision);
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
TSKEY minKey = now - tsTickPerDay[pRepo->config.precision] * pRepo->config.keep;
TSKEY maxKey = now + tsTickPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
terrno = TSDB_CODE_SUCCESS;
pMsg->length = htonl(pMsg->length);

View File

@ -39,6 +39,12 @@ enum {
TSDB_QUERY_TYPE_LAST = 2,
};
enum {
TSDB_CACHED_TYPE_NONE = 0,
TSDB_CACHED_TYPE_LASTROW = 1,
TSDB_CACHED_TYPE_LAST = 2,
};
typedef struct SQueryFilePos {
int32_t fid;
int32_t slot;
@ -280,9 +286,13 @@ static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STa
info.tableId.uid = info.pTableObj->tableId.uid;
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
assert(info.lastKey >= pQueryHandle->window.skey);
if (info.lastKey == INT64_MIN || info.lastKey < pQueryHandle->window.skey) {
info.lastKey = pQueryHandle->window.skey;
}
assert(info.lastKey >= pQueryHandle->window.skey && info.lastKey <= pQueryHandle->window.ekey);
} else {
assert(info.lastKey <= pQueryHandle->window.skey);
assert(info.lastKey >= pQueryHandle->window.ekey && info.lastKey <= pQueryHandle->window.skey);
}
taosArrayPush(pTableCheckInfo, &info);
@ -339,14 +349,57 @@ static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY s
return pNew;
}
static bool emptyQueryTimewindow(STsdbQueryHandle* pQueryHandle) {
assert(pQueryHandle != NULL);
STimeWindow* w = &pQueryHandle->window;
bool asc = ASCENDING_TRAVERSE(pQueryHandle->order);
return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
}
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
// the expired data to client, even it is queried already.
static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) {
STsdbCfg* pCfg = &pTsdb->config;
int64_t now = taosGetTimestamp(pCfg->precision);
return now - (tsTickPerDay[pCfg->precision] * pCfg->keep);
}
static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) {
pQueryHandle->window = pCond->twindow;
bool updateTs = false;
int64_t startTs = getEarliestValidTimestamp(pQueryHandle->pTsdb);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
if (startTs > pQueryHandle->window.skey) {
pQueryHandle->window.skey = startTs;
pCond->twindow.skey = startTs;
updateTs = true;
}
} else {
if (startTs > pQueryHandle->window.ekey) {
pQueryHandle->window.ekey = startTs;
pCond->twindow.ekey = startTs;
updateTs = true;
}
}
if (updateTs) {
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64
", 0x%" PRIx64, pQueryHandle, pCond->twindow.skey, pCond->twindow.ekey, pQueryHandle->window.skey,
pQueryHandle->window.ekey, pQueryHandle->qId);
}
}
static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) {
STsdbQueryHandle* pQueryHandle = calloc(1, sizeof(STsdbQueryHandle));
if (pQueryHandle == NULL) {
goto out_of_memory;
goto _end;
}
pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->cur.fid = INT32_MIN;
@ -354,36 +407,33 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
pQueryHandle->checkFiles = true;
pQueryHandle->activeIndex = 0; // current active table index
pQueryHandle->qId = qId;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
pQueryHandle->allocSize = 0;
pQueryHandle->locateStart = false;
pQueryHandle->pMemRef = pMemRef;
pQueryHandle->loadType = pCond->type;
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
pQueryHandle->loadExternalRow = pCond->loadExternalRows;
pQueryHandle->currentLoadExternalRows = pCond->loadExternalRows;
pQueryHandle->loadType = pCond->type;
if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) {
goto out_of_memory;
goto _end;
}
assert(pCond != NULL && pMemRef != NULL);
if (ASCENDING_TRAVERSE(pCond->order)) {
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
} else {
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
}
setQueryTimewindow(pQueryHandle, pCond);
if (pCond->numOfCols > 0) {
// allocate buffer in order to load data blocks from file
pQueryHandle->statis = calloc(pCond->numOfCols, sizeof(SDataStatis));
if (pQueryHandle->statis == NULL) {
goto out_of_memory;
goto _end;
}
pQueryHandle->pColumns =
taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData)); // todo: use list instead of array?
// todo: use list instead of array?
pQueryHandle->pColumns = taosArrayInit(pCond->numOfCols, sizeof(SColumnInfoData));
if (pQueryHandle->pColumns == NULL) {
goto out_of_memory;
goto _end;
}
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
@ -392,14 +442,16 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
colInfo.info = pCond->colList[i];
colInfo.pData = calloc(1, EXTRA_BYTES + pQueryHandle->outputCapacity * pCond->colList[i].bytes);
if (colInfo.pData == NULL) {
goto out_of_memory;
goto _end;
}
taosArrayPush(pQueryHandle->pColumns, &colInfo);
pQueryHandle->statis[i].colId = colInfo.info.colId;
}
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
}
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
@ -407,7 +459,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
if (pQueryHandle->pDataCols == NULL) {
tsdbError("%p failed to malloc buf for pDataCols, %"PRIu64, pQueryHandle, pQueryHandle->qId);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
goto out_of_memory;
goto _end;
}
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
@ -415,7 +467,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
return (TsdbQueryHandleT) pQueryHandle;
out_of_memory:
_end:
tsdbCleanupQueryHandle(pQueryHandle);
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return NULL;
@ -423,6 +475,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) {
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef);
if (emptyQueryTimewindow(pQueryHandle)) {
return (TsdbQueryHandleT*) pQueryHandle;
}
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
assert(pMeta != NULL);
@ -446,6 +501,15 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
STsdbQueryHandle* pQueryHandle = queryHandle;
if (emptyQueryTimewindow(pQueryHandle)) {
if (pCond->order != pQueryHandle->order) {
pQueryHandle->order = pCond->order;
SWAP(pQueryHandle->window.skey, pQueryHandle->window.ekey, int64_t);
}
return;
}
pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
@ -864,10 +928,10 @@ static int32_t getFileIdFromKey(TSKEY key, int32_t daysPerFile, int32_t precisio
}
if (key < 0) {
key -= (daysPerFile * tsMsPerDay[precision]);
key -= (daysPerFile * tsTickPerDay[precision]);
}
int64_t fid = (int64_t)(key / (daysPerFile * tsMsPerDay[precision])); // set the starting fileId
int64_t fid = (int64_t)(key / (daysPerFile * tsTickPerDay[precision])); // set the starting fileId
if (fid < 0L && llabs(fid) > INT32_MAX) { // data value overflow for INT32
fid = INT32_MIN;
}
@ -1171,8 +1235,9 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
SQueryFilePos* cur = &pQueryHandle->cur;
int32_t code = TSDB_CODE_SUCCESS;
bool asc = ASCENDING_TRAVERSE(pQueryHandle->order);
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
if (asc) {
// query ended in/started from current block
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
@ -1193,7 +1258,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
assert(pCheckInfo->lastKey <= pBlock->keyLast);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else { // the whole block is loaded in to buffer
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows - 1);
cur->pos = asc? 0:(pBlock->numOfRows - 1);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
} else { //desc order, query ended in current block
@ -1213,7 +1278,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
} else {
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows-1);
cur->pos = asc? 0:(pBlock->numOfRows-1);
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
}
}
@ -2684,13 +2749,19 @@ static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) {
bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
if (emptyQueryTimewindow(pQueryHandle)) {
tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId);
return false;
}
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
// TODO refactor: remove "type"
if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST) {
if (pQueryHandle->cachelastrow == 1) {
if (pQueryHandle->cachelastrow == TSDB_CACHED_TYPE_LASTROW) {
return loadCachedLastRow(pQueryHandle);
} else if (pQueryHandle->cachelastrow == 2) {
} else if (pQueryHandle->cachelastrow == TSDB_CACHED_TYPE_LAST) {
return loadCachedLast(pQueryHandle);
}
}
@ -2896,7 +2967,7 @@ out:
}
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle) {
return ((STsdbQueryHandle *)pQueryHandle)->cachelastrow > 0;
return ((STsdbQueryHandle *)pQueryHandle)->cachelastrow > TSDB_CACHED_TYPE_NONE;
}
int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *groupList) {
@ -2914,9 +2985,9 @@ int32_t checkForCachedLastRow(STsdbQueryHandle* pQueryHandle, STableGroupInfo *g
if (((STable*)pInfo->pTable)->lastRow) {
code = tsdbGetCachedLastRow(pInfo->pTable, NULL, &key);
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->cachelastrow = 0;
pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_NONE;
} else {
pQueryHandle->cachelastrow = 1;
pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LASTROW;
}
}
@ -2936,12 +3007,11 @@ int32_t checkForCachedLast(STsdbQueryHandle* pQueryHandle) {
int32_t code = 0;
if (pQueryHandle->pTsdb && atomic_load_8(&pQueryHandle->pTsdb->hasCachedLastColumn)){
pQueryHandle->cachelastrow = 2;
pQueryHandle->cachelastrow = TSDB_CACHED_TYPE_LAST;
}
// update the tsdb query time range
if (pQueryHandle->cachelastrow) {
pQueryHandle->window = TSWINDOW_INITIALIZER;
pQueryHandle->checkFiles = false;
pQueryHandle->activeIndex = -1; // start from -1
}
@ -3548,7 +3618,6 @@ int32_t tsdbGetOneTableGroup(STsdbRepo* tsdb, uint64_t uid, TSKEY startKey, STab
taosArrayPush(group, &info);
taosArrayPush(pGroupInfo->pGroupList, &group);
return TSDB_CODE_SUCCESS;
_error:
@ -3637,15 +3706,21 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
return;
}
pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo);
pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns);
taosArrayDestroy(pQueryHandle->defaultLoadColumn);
tfree(pQueryHandle->pDataBlockInfo);
tfree(pQueryHandle->statis);
// todo check error
tsdbMayUnTakeMemSnapshot(pQueryHandle);
if (!emptyQueryTimewindow(pQueryHandle)) {
tsdbMayUnTakeMemSnapshot(pQueryHandle);
} else {
assert(pQueryHandle->pTableCheckInfo == NULL);
}
if (pQueryHandle->pTableCheckInfo != NULL) {
pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo);
}
tsdbDestroyReadH(&pQueryHandle->rhelper);

View File

@ -75,7 +75,9 @@ int tsDecompressDoubleLossyImp(const char * input, int compressedSize, const int
// init
bool tsLossyInit();
void cost_start();
double cost_end(const char* tag);
void show_rate( int in_len, int out_len);
static FORCE_INLINE int tsCompressTinyint(const char *const input, int inputSize, const int nelements, char *const output, int outputSize, char algorithm,
@ -225,11 +227,19 @@ static FORCE_INLINE int tsCompressFloat(const char *const input, int inputSize,
return tsCompressFloatLossyImp(input, nelements, output);
// lossless mode
} else {
if (algorithm == ONE_STAGE_COMP) {
return tsCompressFloatImp(input, nelements, output);
} else if (algorithm == TWO_STAGE_COMP) {
//cost_start();
int len = tsCompressFloatImp(input, nelements, buffer);
return tsCompressStringImp(buffer, len, output, outputSize);
//cost_end(" td_first_compress");
//show_rate(inputSize, len);
//cost_start();
int ret = tsCompressStringImp(buffer, len, output, outputSize);
//cost_end(" td_second_compress");
//show_rate(inputSize, ret);
return ret;
} else {
assert(0);
return -1;
@ -301,11 +311,26 @@ static FORCE_INLINE int tsDecompressDouble(const char *const input, int compress
static FORCE_INLINE int tsCompressFloatLossy(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,
char algorithm, char *const buffer, int bufferSize) {
return tsCompressFloatLossyImp(input, nelements, output);
/*
cost_start();
int len = tsCompressFloatLossyImp(input, nelements, buffer);
cost_end(" sz1_first_compress");
show_rate(inputSize, len);
cost_start();
int ret = tsCompressStringImp(buffer, len, output, outputSize);
cost_end(" sz1_second_compress");
show_rate(inputSize, ret);
return ret;
*/
}
static FORCE_INLINE int tsDecompressFloatLossy(const char *const input, int compressedSize, const int nelements, char *const output,
int outputSize, char algorithm, char *const buffer, int bufferSize){
return tsDecompressFloatLossyImp(input, compressedSize, nelements, output);
//int outSize = tsDecompressStringImp(input, compressedSize, buffer, bufferSize);
//return tsDecompressFloatLossyImp(buffer, outSize, nelements, output);
}
static FORCE_INLINE int tsCompressDoubleLossy(const char *const input, int inputSize, const int nelements, char *const output, int outputSize,

View File

@ -18,6 +18,24 @@
#include "exception.h"
#include "taoserror.h"
typedef union Un4B {
uint32_t ui;
float f;
} Un4B;
#if __STDC_VERSION__ >= 201112L
static_assert(sizeof(Un4B) == sizeof(uint32_t), "sizeof(Un4B) must equal to sizeof(uint32_t)");
static_assert(sizeof(Un4B) == sizeof(float), "sizeof(Un4B) must equal to sizeof(float)");
#endif
typedef union Un8B {
uint64_t ull;
double d;
} Un8B;
#if __STDC_VERSION__ >= 201112L
static_assert(sizeof(Un8B) == sizeof(uint64_t), "sizeof(Un8B) must equal to sizeof(uint64_t)");
static_assert(sizeof(Un8B) == sizeof(double), "sizeof(Un8B) must equal to sizeof(double)");
#endif
////////////////////////////////////////////////////////////////////////////////
// reader functions
@ -175,13 +193,21 @@ uint64_t tbufReadUint64( SBufferReader* buf ) {
}
float tbufReadFloat( SBufferReader* buf ) {
uint32_t ret = tbufReadUint32( buf );
return *(float*)( &ret );
Un4B _un;
tbufReadToBuffer( buf, &_un, sizeof(_un) );
if( buf->endian ) {
_un.ui = ntohl( _un.ui );
}
return _un.f;
}
double tbufReadDouble(SBufferReader* buf) {
uint64_t ret = tbufReadUint64( buf );
return *(double*)( &ret );
Un8B _un;
tbufReadToBuffer( buf, &_un, sizeof(_un) );
if( buf->endian ) {
_un.ull = htobe64( _un.ull );
}
return _un.d;
}
////////////////////////////////////////////////////////////////////////////////
@ -381,17 +407,37 @@ void tbufWriteUint64At( SBufferWriter* buf, size_t pos, uint64_t data ) {
}
void tbufWriteFloat( SBufferWriter* buf, float data ) {
tbufWriteUint32( buf, *(uint32_t*)(&data) );
Un4B _un;
_un.f = data;
if( buf->endian ) {
_un.ui = htonl( _un.ui );
}
tbufWrite( buf, &_un, sizeof(_un) );
}
void tbufWriteFloatAt( SBufferWriter* buf, size_t pos, float data ) {
tbufWriteUint32At( buf, pos, *(uint32_t*)(&data) );
Un4B _un;
_un.f = data;
if( buf->endian ) {
_un.ui = htonl( _un.ui );
}
tbufWriteAt( buf, pos, &_un, sizeof(_un) );
}
void tbufWriteDouble( SBufferWriter* buf, double data ) {
tbufWriteUint64( buf, *(uint64_t*)(&data) );
Un8B _un;
_un.d = data;
if( buf->endian ) {
_un.ull = htobe64( _un.ull );
}
tbufWrite( buf, &_un, sizeof(_un) );
}
void tbufWriteDoubleAt( SBufferWriter* buf, size_t pos, double data ) {
tbufWriteUint64At( buf, pos, *(uint64_t*)(&data) );
Un8B _un;
_un.d = data;
if( buf->endian ) {
_un.ull = htobe64( _un.ull );
}
tbufWriteAt( buf, pos, &_un, sizeof(_un) );
}

View File

@ -327,7 +327,8 @@ void taosReadGlobalLogCfg() {
printf("\nconfig file:%s not found, all variables are set to default\n", fileName);
return;
}
ssize_t _bytes = 0;
size_t len = 1024;
line = calloc(1, len);
@ -337,7 +338,12 @@ void taosReadGlobalLogCfg() {
option = value = NULL;
olen = vlen = 0;
tgetline(&line, &len, fp);
_bytes = tgetline(&line, &len, fp);
if (_bytes < 0)
{
break;
}
line[len - 1] = 0;
paGetToken(line, &option, &olen);
@ -373,7 +379,8 @@ bool taosReadGlobalCfg() {
return false;
}
}
ssize_t _bytes = 0;
size_t len = 1024;
line = calloc(1, len);
@ -383,7 +390,12 @@ bool taosReadGlobalCfg() {
option = value = value2 = value3 = NULL;
olen = vlen = vlen2 = vlen3 = 0;
tgetline(&line, &len, fp);
_bytes = tgetline(&line, &len, fp);
if (_bytes < 0)
{
break;
}
line[len - 1] = 0;
paGetToken(line, &option, &olen);

View File

@ -427,13 +427,23 @@ char *taosIpStr(uint32_t ipInt) {
}
FORCE_INLINE float taos_align_get_float(const char* pBuf) {
float fv = 0;
*(int32_t*)(&fv) = *(int32_t*)pBuf;
#if __STDC_VERSION__ >= 201112L
static_assert(sizeof(float) == sizeof(uint32_t), "sizeof(float) must equal to sizeof(uint32_t)");
#else
assert(sizeof(float) == sizeof(uint32_t));
#endif
float fv = 0;
memcpy(&fv, pBuf, sizeof(fv)); // in ARM, return *((const float*)(pBuf)) may cause problem
return fv;
}
FORCE_INLINE double taos_align_get_double(const char* pBuf) {
double dv = 0;
*(int64_t*)(&dv) = *(int64_t*)pBuf;
#if __STDC_VERSION__ >= 201112L
static_assert(sizeof(double) == sizeof(uint64_t), "sizeof(double) must equal to sizeof(uint64_t)");
#else
assert(sizeof(double) == sizeof(uint64_t));
#endif
double dv = 0;
memcpy(&dv, pBuf, sizeof(dv)); // in ARM, return *((const double*)(pBuf)) may cause problem
return dv;
}

View File

@ -19,6 +19,7 @@
#include "tglobal.h"
#include "tlog.h"
#include "twal.h"
#include "tfile.h"
int64_t ver = 0;
void *pWal = NULL;
@ -36,7 +37,7 @@ int writeToQueue(void *pVnode, void *data, int type, void *pMsg) {
}
int main(int argc, char *argv[]) {
char path[128] = "/home/jhtao/test/wal";
char path[128] = "/tmp/wal";
int level = 2;
int total = 5;
int rows = 10000;
@ -72,9 +73,11 @@ int main(int argc, char *argv[]) {
printf(" [-h help]: print out this help\n\n");
exit(0);
}
}
}
taosInitLog("wal.log", 100000, 10);
tfInit();
walInit();
SWalCfg walCfg = {0};
walCfg.walLevel = level;
@ -122,13 +125,13 @@ int main(int argc, char *argv[]) {
printf("index:%" PRId64 " wal:%s\n", index, name);
if (code == 0) break;
index++;
}
getchar();
walClose(pWal);
walCleanUp();
tfCleanup();
return 0;
}

12
tests/Jenkinsfile vendored
View File

@ -110,16 +110,8 @@ pipeline {
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
mvn clean package assembly:single -DskipTests >/dev/null
java -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {
sh '''
cd ${WKC}/src/connector/jdbc
mvn clean package -Dmaven.test.skip=true >/dev/null
cd ${WKC}/tests/examples/JDBC/JDBCDemo/
java --class-path=../../../../src/connector/jdbc/target:$JAVA_HOME/jre/lib/ext -jar target/JDBCDemo-SNAPSHOT-jar-with-dependencies.jar -host 127.0.0.1
mvn clean package >/dev/null
java -jar target/JdbcRestfulDemo-jar-with-dependencies.jar
'''
}
catchError(buildResult: 'SUCCESS', stageResult: 'FAILURE') {

View File

@ -173,8 +173,9 @@ class TDTestCase:
tdSql.checkData(0,7,'10,10,10')
tdSql.error('insert into tb values (now-15d, 10)')
tdSql.query('select * from tb')
tdSql.checkRows(rowNum)
tdSql.checkRows(2)
rowNum = 2
tdLog.notice('testing keep will be altered if sudden change from small to big')
for i in range(30):
tdSql.execute('alter database db keep 14,14,14')
@ -182,14 +183,19 @@ class TDTestCase:
tdSql.execute('insert into tb values (now-15d, 10)')
tdSql.query('select * from tb')
rowNum += 1
tdSql.checkRows(rowNum )
tdSql.checkRows(rowNum)
tdLog.notice('testing keep will be altered if sudden change from big to small')
tdSql.execute('alter database db keep 16,16,16')
tdSql.execute('alter database db keep 14,14,14')
tdSql.error('insert into tb values (now-15d, 10)')
tdSql.query('select * from tb')
tdSql.checkRows(rowNum)
tdSql.checkRows(2)
tdLog.notice('testing data will show up again when keep is being changed to large value')
tdSql.execute('alter database db keep 40,40,40')
tdSql.query('select * from tb')
tdSql.checkRows(63)

View File

@ -17722,4 +17722,24 @@
fun:_PyEval_EvalCodeWithName
fun:_PyFunction_Vectorcall
fun:_PyEval_EvalFrameDefault
}
{
<insert_a_suppression_name_here>
Memcheck:Leak
match-leak-kinds: definite
fun:malloc
fun:__libc_alloc_buffer_allocate
fun:alloc_buffer_allocate
fun:__resolv_conf_allocate
fun:__resolv_conf_load
fun:__resolv_conf_get_current
fun:__res_vinit
fun:maybe_init
fun:context_get
fun:__resolv_context_get
fun:gaih_inet.constprop.7
fun:getaddrinfo
fun:taosGetFqdn
fun:taosCheckGlobalCfg
fun:taos_init_imp
}

View File

@ -12,9 +12,6 @@
# -*- coding: utf-8 -*-
from basic import *
from util.sql import tdSql
class TDTestCase:
@ -36,4 +33,6 @@ td = TDTestCase()
td.init()
## usage: python3 OneMnodeMultipleVnodesTest.py

View File

@ -44,7 +44,16 @@ class BuildDockerCluser:
"jnidebugFlag":"135",
"qdebugFlag":"135",
"maxSQLLength":"1048576"
}
}
cmd = "mkdir -p %s" % self.dockerDir
self.execCmd(cmd)
cmd = "cp *.yml %s" % self.dockerDir
self.execCmd(cmd)
cmd = "cp Dockerfile %s" % self.dockerDir
self.execCmd(cmd)
# execute command, and return the output
# ref: https://blog.csdn.net/wowocpp/article/details/80775650
@ -81,7 +90,7 @@ class BuildDockerCluser:
def removeFile(self, rootDir, index, dir):
cmd = "rm -rf %s/node%d/%s/*" % (rootDir, index, dir)
self.execCmd(cmd)
def clearEnv(self):
cmd = "cd %s && docker-compose down --remove-orphans" % self.dockerDir
self.execCmd(cmd)
@ -108,10 +117,14 @@ class BuildDockerCluser:
self.execCmd(cmd)
def updateLocalhosts(self):
cmd = "grep '172.27.0.7 *tdnode1' /etc/hosts"
cmd = "grep '172.27.0.7 *tdnode1' /etc/hosts | sed 's: ::g'"
result = self.execCmdAndGetOutput(cmd)
if result and not result.isspace():
print(result)
if result is None or result.isspace():
print("==========")
cmd = "echo '172.27.0.7 tdnode1' >> /etc/hosts"
display = "echo %s" % cmd
self.execCmd(display)
self.execCmd(cmd)
def deploy(self):
@ -138,13 +151,13 @@ class BuildDockerCluser:
if self.numOfNodes < 2 or self.numOfNodes > 10:
print("the number of nodes must be between 2 and 10")
exit(0)
self.clearEnv()
self.createDirs()
self.updateLocalhosts()
self.deploy()
def run(self):
cmd = "./buildClusterEnv.sh -n %d -v %s -d %s" % (self.numOfNodes, self.getTaosdVersion(), self.dockerDir)
cmd = "./buildClusterEnv.sh -n %d -v %s -d %s" % (self.numOfNodes, self.getTaosdVersion(), self.dockerDir)
display = "echo %s" % cmd
self.execCmd(display)
self.execCmd(cmd)
self.getConnection()
self.createDondes()

View File

@ -334,6 +334,7 @@ python3 ./test.py -f insert/unsignedInt.py
python3 ./test.py -f insert/unsignedBigint.py
python3 ./test.py -f insert/unsignedSmallint.py
python3 ./test.py -f insert/unsignedTinyint.py
python3 ./test.py -f insert/insertFromCSV.py
python3 ./test.py -f query/filterAllUnsignedIntTypes.py
python3 ./test.py -f tag_lite/unsignedInt.py

View File

@ -28,16 +28,15 @@ class TDTestCase:
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1500074556514
self.ts = 1500074556514
self.csvfile = "/tmp/csvfile.csv"
self.rows = 100000
def writeCSV(self):
with open('test3.csv','w', encoding='utf-8', newline='') as csvFile:
with open(self.csvfile, 'w', encoding='utf-8', newline='') as csvFile:
writer = csv.writer(csvFile, dialect='excel')
for i in range(1000000):
newTimestamp = self.ts + random.randint(10000000, 10000000000) + random.randint(1000, 10000000) + random.randint(1, 1000)
d = datetime.datetime.fromtimestamp(newTimestamp / 1000)
dt = str(d.strftime("%Y-%m-%d %H:%M:%S.%f"))
writer.writerow(["'%s'" % dt, random.randint(1, 100), random.uniform(1, 100), random.randint(1, 100), random.randint(1, 100)])
for i in range(self.rows):
writer.writerow([self.ts + i, random.randint(1, 100), random.uniform(1, 100), random.randint(1, 100), random.randint(1, 100)])
def removCSVHeader(self):
data = pd.read_csv("ordered.csv")
@ -45,23 +44,25 @@ class TDTestCase:
data.to_csv("ordered.csv", header = False, index = False)
def run(self):
self.writeCSV()
tdSql.prepare()
tdSql.execute("create table t1(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
tdSql.execute("insert into t1 file 'outoforder.csv'")
tdSql.execute("insert into t1 file '%s'" % self.csvfile)
duration = time.time() - startTime
print("Out of Order - Insert time: %d" % duration)
tdSql.query("select count(*) from t1")
rows = tdSql.getData(0, 0)
print("Insert time: %d" % duration)
tdSql.query("select * from t1")
tdSql.checkRows(self.rows)
tdSql.execute("create table t2(ts timestamp, c1 int, c2 float, c3 int, c4 int)")
startTime = time.time()
tdSql.execute("insert into t2 file 'ordered.csv'")
duration = time.time() - startTime
print("Ordered - Insert time: %d" % duration)
tdSql.query("select count(*) from t2")
tdSql.checkData(0,0, rows)
tdSql.execute("create table stb(ts timestamp, c1 int, c2 float, c3 int, c4 int) tags(t1 int, t2 binary(20))")
tdSql.execute("insert into t2 using stb(t1) tags(1) file '%s'" % self.csvfile)
tdSql.query("select * from stb")
tdSql.checkRows(self.rows)
tdSql.execute("insert into t3 using stb tags(1, 'test') file '%s'" % self.csvfile)
tdSql.query("select * from stb")
tdSql.checkRows(self.rows * 2)
def stop(self):
tdSql.close()

View File

@ -71,13 +71,10 @@ class TDTestRetetion:
tdDnodes.start(1)
tdLog.info(cmd)
ttime = datetime.datetime.now()
tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test')
if self.queryRows==4:
self.checkRows(4,cmd)
return 0
else:
self.checkRows(5,cmd)
self.checkRows(3,cmd)
tdLog.info("=============== step3")
tdDnodes.stop(1)
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=48)))
@ -92,7 +89,7 @@ class TDTestRetetion:
tdLog.info(cmd)
tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test')
self.checkRows(6,cmd)
self.checkRows(3,cmd)
tdLog.info("=============== step4")
tdDnodes.stop(1)
tdDnodes.start(1)
@ -100,7 +97,7 @@ class TDTestRetetion:
tdLog.info(cmd)
tdSql.execute(cmd)
self.queryRows=tdSql.query('select * from test')
self.checkRows(5,cmd)
self.checkRows(4,cmd)
tdLog.info("=============== step5")
tdDnodes.stop(1)
@ -109,6 +106,22 @@ class TDTestRetetion:
self.queryRows=tdSql.query('select * from test where ts > now-1d')
self.checkRows(2,cmd)
tdLog.info("=============== step6")
tdDnodes.stop(1)
os.system("date -s '%s'"%(ttime + datetime.timedelta(seconds=(72*60*60-7))))
tdDnodes.start(1)
while datetime.datetime.now() < (ttime + datetime.timedelta(seconds=(72*60*60-1))):
time.sleep(0.001)
cmd = 'select * from test'
self.queryRows=tdSql.query(cmd)
self.checkRows(4,cmd)
while datetime.datetime.now() <= (ttime + datetime.timedelta(hours=72)):
time.sleep(0.001)
cmd = 'select * from test'
self.queryRows=tdSql.query(cmd)
print(tdSql.queryResult)
self.checkRows(3,cmd)
def stop(self):
os.system("sudo timedatectl set-ntp true")
os.system("date -s '%s'"%(datetime.datetime.now()+datetime.timedelta(hours=1)))

View File

@ -127,6 +127,7 @@ class TDDnode:
"anyIp":"0",
"tsEnableTelemetryReporting":"0",
"dDebugFlag":"135",
"tsdbDebugFlag":"135",
"mDebugFlag":"135",
"sdbDebugFlag":"135",
"rpcDebugFlag":"135",

View File

@ -959,14 +959,14 @@ endi
if $data31 != 9.000000000 then
return -1
endi
if $data41 != null then
if $data41 != NULL then
print ===== $data41
return -1
endi
if $data51 != 16.000000000 then
return -1
endi
if $data61 != null then
if $data61 != NULL then
print ===== $data61
return -1
endi

View File

@ -781,4 +781,14 @@ if $data11 != 2 then
return -1
endi
sql_error select count(*) from m1 group by tbname,k,f1;
sql_error select count(*) from m1 group by tbname,k,a;
sql_error select count(*) from m1 group by k, tbname;
sql_error select count(*) from m1 group by k,f1;
sql_error select count(*) from tm0 group by tbname;
sql_error select count(*) from tm0 group by a;
sql_error select count(*) from tm0 group by k,f1;
sql_error select count(*),f1 from m1 group by tbname,k;
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -1,6 +1,6 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 4
system sh/exec.sh -n dnode1 -s start

View File

@ -82,5 +82,9 @@ endi
if $data01 != NULL then
return -1
endi
sql select last_row(*) from (select f from lr_nested)
if $rows != 1 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -179,14 +179,6 @@ if $data21 != 49.500000000 then
return -1
endi
#define TSDB_FUNC_APERCT 7
#define TSDB_FUNC_TWA 14
#define TSDB_FUNC_LEASTSQR 15
#define TSDB_FUNC_DIFF 24
#define TSDB_FUNC_INTERP 28
#define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_DERIVATIVE 32
sql_error select stddev(c1) from (select c1 from nest_tb0);
sql_error select percentile(c1, 20) from (select * from nest_tb0);
sql_error select interp(c1) from (select * from nest_tb0);
@ -197,9 +189,90 @@ sql_error select diff(c1), twa(c1) from (select * from nest_tb0);
sql_error select irate(c1), interp(c1), twa(c1) from (select * from nest_tb0);
sql select apercentile(c1, 50) from (select * from nest_tb0) interval(1d)
if $rows != 7 then
return -1
endi
if $data00 != @20-09-15 00:00:00.000@ then
return -1
endi
if $data01 != 47.571428571 then
return -1
endi
if $data10 != @20-09-16 00:00:00.000@ then
return -1
endi
if $data11 != 49.666666667 then
return -1
endi
if $data20 != @20-09-17 00:00:00.000@ then
return -1
endi
if $data21 != 49.000000000 then
return -1
endi
if $data30 != @20-09-18 00:00:00.000@ then
return -1
endi
if $data31 != 48.333333333 then
return -1
endi
sql select twa(c1) from (select * from nest_tb0);
if $rows != 1 then
return -1
endi
if $data00 != 49.500000000 then
return -1
endi
sql select leastsquares(c1, 1, 1) from (select * from nest_tb0);
if $rows != 1 then
return -1
endi
if $data00 != @{slop:0.000100, intercept:49.000000}@ then
return -1
endi
sql select irate(c1) from (select * from nest_tb0);
if $data00 != 0.016666667 then
return -1
endi
sql select derivative(c1, 1s, 0) from (select * from nest_tb0);
if $rows != 9999 then
return -1
endi
if $data00 != @20-09-15 00:01:00.000@ then
return -1
endi
if $data01 != 0.016666667 then
return -1
endi
if $data10 != @20-09-15 00:02:00.000@ then
return -1
endi
if $data11 != 0.016666667 then
return -1
endi
sql select diff(c1) from (select * from nest_tb0);
if $rows != 9999 then
return -1
endi
sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d);
if $rows != 7 then
@ -351,4 +424,6 @@ if $data01 != 1 then
return -1
endi
sql_error select last_row(*) from (select * from nest_tb0) having c1 > 0
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -193,3 +193,7 @@ endi
if $data04 != 1 then
return -1
endi
print ===============>safty check TD-4927
sql select first(ts, c1) from sr_stb where ts<1 group by t1;
sql select first(ts, c1) from sr_stb where ts>0 and ts<1;

View File

@ -12,7 +12,7 @@ run general/parser/create_tb.sim
run general/parser/dbtbnameValidate.sim
run general/parser/fill.sim
run general/parser/fill_stb.sim
#run general/parser/fill_us.sim #
run general/parser/fill_us.sim
run general/parser/first_last.sim
run general/parser/import_commit1.sim
run general/parser/import_commit2.sim

View File

@ -352,18 +352,24 @@ if $rows != 0 then
return -1
endi
print ==========================> td-4783
print ==========================> td-4783,td-4792
sql create table where_ts(ts timestamp, f int)
sql insert into where_ts values('2021-06-19 16:22:00', 1);
sql insert into where_ts values('2021-06-19 16:23:00', 2);
sql insert into where_ts values('2021-06-19 16:24:00', 3);
sql insert into where_ts values('2021-06-19 16:25:00', 1);
sql select * from (select * from where_ts) where ts<'2021-06-19 16:25:00' and ts>'2021-06-19 16:22:00'
if $row != 2 then
if $rows != 2 then
return -1
endi
print $data00, $data01
if $data01 != 2 then
return -1
endi
sql insert into where_ts values(now, 5);
sleep 10
sql select * from (select * from where_ts) where ts<now;
if $rows != 5 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -34,6 +34,7 @@ print =============== step1 - login
system_content curl 127.0.0.1:7111/admin/
print 1-> $system_content
if $system_content != @{"status":"error","code":4357,"desc":"no auth info input"}@ then
print actual: $system_content
return -1
endi
@ -149,6 +150,8 @@ endi
system_content curl -H 'Authorization: Taosd /KfeAzX/f9na8qdtNZmtONryp201ma04bEl8LcvLUd7a8qdtNZmtONryp201ma04' -d 'select * from d1.table_admin' 127.0.0.1:7111/admin/all
print curl 127.0.0.1:7111/admin/all -----> $system_content
if $system_content != @{"status":"succ","head":["ts","i"],"data":[["2017-12-25 21:28:41.022",1],["2017-12-25 21:28:42.022",2],["2017-12-25 21:28:43.022",3],["2017-12-25 21:28:44.022",4],["2017-12-25 21:28:45.022",5],["2017-12-25 21:28:46.022",6],["2017-12-25 21:28:47.022",7],["2017-12-25 21:28:48.022",8],["2017-12-25 21:28:49.022",9],["2017-12-25 21:28:50.022",10],["2017-12-25 21:28:51.022",11]],"rows":11}@ then
print actual: $system_content
print expect =======> {"status":"succ","head":["ts","i"],"data":[["2017-12-25 21:28:41.022",1],["2017-12-25 21:28:42.022",2],["2017-12-25 21:28:43.022",3],["2017-12-25 21:28:44.022",4],["2017-12-25 21:28:45.022",5],["2017-12-25 21:28:46.022",6],["2017-12-25 21:28:47.022",7],["2017-12-25 21:28:48.022",8],["2017-12-25 21:28:49.022",9],["2017-12-25 21:28:50.022",10],["2017-12-25 21:28:51.022",11]],"rows":11}
return -1
endi