Merge branch 'fix/TD-20317' of https://github.com/taosdata/TDengine into fix/TD-20317

This commit is contained in:
chenhaoran 2022-11-11 12:52:17 +08:00
commit fb06e73b91
54 changed files with 2994 additions and 2623 deletions

View File

@ -102,6 +102,12 @@ IF ("${CPUTYPE}" STREQUAL "")
SET(TD_ARM_64 TRUE) SET(TD_ARM_64 TRUE)
ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_64") ADD_DEFINITIONS("-D_TD_ARM_64")
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "loongarch64")
MESSAGE(STATUS "The current platform is loongarch64")
SET(PLATFORM_ARCH_STR "loongarch64")
SET(TD_LOONGARCH_64 TRUE)
ADD_DEFINITIONS("-D_TD_LOONGARCH_")
ADD_DEFINITIONS("-D_TD_LOONGARCH_64")
ENDIF () ENDIF ()
ELSE () ELSE ()
# if generate ARM version: # if generate ARM version:
@ -118,6 +124,12 @@ ELSE ()
ADD_DEFINITIONS("-D_TD_ARM_") ADD_DEFINITIONS("-D_TD_ARM_")
ADD_DEFINITIONS("-D_TD_ARM_64") ADD_DEFINITIONS("-D_TD_ARM_64")
SET(TD_ARM_64 TRUE) SET(TD_ARM_64 TRUE)
ELSEIF (${CPUTYPE} MATCHES "loongarch64")
SET(PLATFORM_ARCH_STR "loongarch64")
MESSAGE(STATUS "input cpuType: loongarch64")
SET(TD_LOONGARCH_64 TRUE)
ADD_DEFINITIONS("-D_TD_LOONGARCH_")
ADD_DEFINITIONS("-D_TD_LOONGARCH_64")
ELSEIF (${CPUTYPE} MATCHES "mips64") ELSEIF (${CPUTYPE} MATCHES "mips64")
SET(PLATFORM_ARCH_STR "mips") SET(PLATFORM_ARCH_STR "mips")
MESSAGE(STATUS "input cpuType: mips64") MESSAGE(STATUS "input cpuType: mips64")

View File

@ -14,7 +14,7 @@ Note: ● means officially tested and verified, ○ means unofficially tested an
## List of supported platforms for TDengine clients and connectors ## List of supported platforms for TDengine clients and connectors
TDengine's connector can support a wide range of platforms, including X64/X86/ARM64/ARM32/MIPS/Alpha hardware platforms and Linux/Win64/Win32/macOS development environments. TDengine's connector can support a wide range of platforms, including X64/X86/ARM64/ARM32/MIPS/Alpha/LoongArch64 hardware platforms and Linux/Win64/Win32/macOS development environments.
The comparison matrix is as follows. The comparison matrix is as follows.

View File

@ -16,7 +16,7 @@ description: "TDengine 服务端、客户端和连接器支持的平台列表"
## TDengine 客户端和连接器支持的平台列表 ## TDengine 客户端和连接器支持的平台列表
目前 TDengine 的连接器可支持的平台广泛目前包括X64/X86/ARM64/ARM32/MIPS/Alpha 等硬件平台,以及 Linux/Win64/Win32/macOS 等开发环境。 目前 TDengine 的连接器可支持的平台广泛目前包括X64/X86/ARM64/ARM32/MIPS/LoongArch64 等硬件平台,以及 Linux/Win64/Win32/macOS 等开发环境。
对照矩阵如下: 对照矩阵如下:

View File

@ -135,6 +135,9 @@ extern int32_t tsTtlPushInterval;
extern int32_t tsGrantHBInterval; extern int32_t tsGrantHBInterval;
extern int32_t tsUptimeInterval; extern int32_t tsUptimeInterval;
extern int32_t tsRpcRetryLimit;
extern int32_t tsRpcRetryInterval;
//#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) //#define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)
int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd, int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDir, const char **envCmd,

View File

@ -174,61 +174,61 @@
#define TK_SUBSCRIPTIONS 156 #define TK_SUBSCRIPTIONS 156
#define TK_VNODES 157 #define TK_VNODES 157
#define TK_LIKE 158 #define TK_LIKE 158
#define TK_INDEX 159 #define TK_TBNAME 159
#define TK_FUNCTION 160 #define TK_QTAGS 160
#define TK_INTERVAL 161 #define TK_AS 161
#define TK_TOPIC 162 #define TK_INDEX 162
#define TK_AS 163 #define TK_FUNCTION 163
#define TK_WITH 164 #define TK_INTERVAL 164
#define TK_META 165 #define TK_TOPIC 165
#define TK_CONSUMER 166 #define TK_WITH 166
#define TK_GROUP 167 #define TK_META 167
#define TK_DESC 168 #define TK_CONSUMER 168
#define TK_DESCRIBE 169 #define TK_GROUP 169
#define TK_RESET 170 #define TK_DESC 170
#define TK_QUERY 171 #define TK_DESCRIBE 171
#define TK_CACHE 172 #define TK_RESET 172
#define TK_EXPLAIN 173 #define TK_QUERY 173
#define TK_ANALYZE 174 #define TK_CACHE 174
#define TK_VERBOSE 175 #define TK_EXPLAIN 175
#define TK_NK_BOOL 176 #define TK_ANALYZE 176
#define TK_RATIO 177 #define TK_VERBOSE 177
#define TK_NK_FLOAT 178 #define TK_NK_BOOL 178
#define TK_OUTPUTTYPE 179 #define TK_RATIO 179
#define TK_AGGREGATE 180 #define TK_NK_FLOAT 180
#define TK_BUFSIZE 181 #define TK_OUTPUTTYPE 181
#define TK_STREAM 182 #define TK_AGGREGATE 182
#define TK_INTO 183 #define TK_BUFSIZE 183
#define TK_TRIGGER 184 #define TK_STREAM 184
#define TK_AT_ONCE 185 #define TK_INTO 185
#define TK_WINDOW_CLOSE 186 #define TK_TRIGGER 186
#define TK_IGNORE 187 #define TK_AT_ONCE 187
#define TK_EXPIRED 188 #define TK_WINDOW_CLOSE 188
#define TK_FILL_HISTORY 189 #define TK_IGNORE 189
#define TK_SUBTABLE 190 #define TK_EXPIRED 190
#define TK_KILL 191 #define TK_FILL_HISTORY 191
#define TK_CONNECTION 192 #define TK_SUBTABLE 192
#define TK_TRANSACTION 193 #define TK_KILL 193
#define TK_BALANCE 194 #define TK_CONNECTION 194
#define TK_VGROUP 195 #define TK_TRANSACTION 195
#define TK_MERGE 196 #define TK_BALANCE 196
#define TK_REDISTRIBUTE 197 #define TK_VGROUP 197
#define TK_SPLIT 198 #define TK_MERGE 198
#define TK_DELETE 199 #define TK_REDISTRIBUTE 199
#define TK_INSERT 200 #define TK_SPLIT 200
#define TK_NULL 201 #define TK_DELETE 201
#define TK_NK_QUESTION 202 #define TK_INSERT 202
#define TK_NK_ARROW 203 #define TK_NULL 203
#define TK_ROWTS 204 #define TK_NK_QUESTION 204
#define TK_TBNAME 205 #define TK_NK_ARROW 205
#define TK_QSTART 206 #define TK_ROWTS 206
#define TK_QEND 207 #define TK_QSTART 207
#define TK_QDURATION 208 #define TK_QEND 208
#define TK_WSTART 209 #define TK_QDURATION 209
#define TK_WEND 210 #define TK_WSTART 210
#define TK_WDURATION 211 #define TK_WEND 211
#define TK_IROWTS 212 #define TK_WDURATION 212
#define TK_QTAGS 213 #define TK_IROWTS 213
#define TK_CAST 214 #define TK_CAST 214
#define TK_NOW 215 #define TK_NOW 215
#define TK_TODAY 216 #define TK_TODAY 216

View File

@ -284,6 +284,13 @@ typedef struct SShowVnodesStmt {
SNode* pDnodeEndpoint; SNode* pDnodeEndpoint;
} SShowVnodesStmt; } SShowVnodesStmt;
typedef struct SShowTableTagsStmt {
ENodeType type;
SNode* pDbName; // SValueNode
SNode* pTbName; // SValueNode
SNodeList* pTags;
} SShowTableTagsStmt;
typedef enum EIndexType { INDEX_TYPE_SMA = 1, INDEX_TYPE_FULLTEXT } EIndexType; typedef enum EIndexType { INDEX_TYPE_SMA = 1, INDEX_TYPE_FULLTEXT } EIndexType;
typedef struct SIndexOptions { typedef struct SIndexOptions {

View File

@ -297,6 +297,7 @@ typedef struct SSelectStmt {
bool hasStateKey; bool hasStateKey;
bool onlyHasKeepOrderFunc; bool onlyHasKeepOrderFunc;
bool groupSort; bool groupSort;
bool tagScan;
} SSelectStmt; } SSelectStmt;
typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType; typedef enum ESetOperatorType { SET_OP_TYPE_UNION_ALL = 1, SET_OP_TYPE_UNION } ESetOperatorType;

View File

@ -76,12 +76,14 @@ typedef void (*RpcDfp)(void *ahandle);
typedef struct SRpcInit { typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port uint16_t localPort; // local port
char *label; // for debug purpose char *label; // for debug purpose
int32_t numOfThreads; // number of threads to handle connections int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
int32_t idleTime; // milliseconds, 0 means idle timer is disabled int32_t idleTime; // milliseconds, 0 means idle timer is disabled
int32_t retryLimit; // retry limit
int32_t retryInterval; // retry interval ms
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not int8_t encryption; // encrypt or not

View File

@ -26,7 +26,7 @@ extern "C" {
#define TSKEY int64_t #define TSKEY int64_t
#define TSKEY_MIN INT64_MIN #define TSKEY_MIN INT64_MIN
#define TSKEY_MAX (INT64_MAX - 1) #define TSKEY_MAX INT64_MAX
#define TSKEY_INITIAL_VAL TSKEY_MIN #define TSKEY_INITIAL_VAL TSKEY_MIN
#define TD_VER_MAX UINT64_MAX // TODO: use the real max version from query handle #define TD_VER_MAX UINT64_MAX // TODO: use the real max version from query handle

View File

@ -5,7 +5,7 @@ set -e
#set -x #set -x
# dockerbuild.sh # dockerbuild.sh
# -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] # -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...]
# -n [version number] # -n [version number]
# -p [password for docker hub] # -p [password for docker hub]
# -V [stable | beta] # -V [stable | beta]
@ -57,7 +57,7 @@ do
dockerLatest=$(echo $OPTARG) dockerLatest=$(echo $OPTARG)
;; ;;
h) h)
echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] " echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] "
echo " -n [version number] " echo " -n [version number] "
echo " -p [password for docker hub] " echo " -p [password for docker hub] "
echo " -V [stable | beta] " echo " -V [stable | beta] "
@ -136,4 +136,4 @@ if [ "$cloudBuild" != "y" ] && [ ${dockerLatest} == 'y' ] ;then
docker push tdengine/tdengine-${dockername}:latest docker push tdengine/tdengine-${dockername}:latest
fi fi
rm -f ${pkgFile} rm -f ${pkgFile}

View File

@ -5,7 +5,7 @@ set -e
#set -x #set -x
# dockerbuild.sh # dockerbuild.sh
# -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] # -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...]
# -n [version number] # -n [version number]
# -p [password for docker hub] # -p [password for docker hub]
@ -30,7 +30,7 @@ do
passWord=$(echo $OPTARG) passWord=$(echo $OPTARG)
;; ;;
h) h)
echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 ...] " echo "Usage: `basename $0` -c [aarch32 | aarch64 | amd64 | x86 | mips64 | loongarch64...] "
echo " -n [version number] " echo " -n [version number] "
echo " -p [password for docker hub] " echo " -p [password for docker hub] "
exit 0 exit 0

View File

@ -6,7 +6,7 @@ set -e
#set -x #set -x
# release.sh -v [cluster | edge] # release.sh -v [cluster | edge]
# -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] # -c [aarch32 | aarch64 | x64 | x86 | mips64 | loongarch64...]
# -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] # -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
# -V [stable | beta] # -V [stable | beta]
# -l [full | lite] # -l [full | lite]
@ -19,7 +19,7 @@ set -e
# set parameters by default value # set parameters by default value
verMode=edge # [cluster, edge, cloud] verMode=edge # [cluster, edge, cloud]
verType=stable # [stable, beta] verType=stable # [stable, beta]
cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...] cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 loongarch64...]
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
pagMode=full # [full | lite] pagMode=full # [full | lite]
soMode=dynamic # [static | dynamic] soMode=dynamic # [static | dynamic]
@ -77,7 +77,7 @@ while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do
;; ;;
h) h)
echo "Usage: $(basename $0) -v [cluster | edge] " echo "Usage: $(basename $0) -v [cluster | edge] "
echo " -c [aarch32 | aarch64 | x64 | x86 | mips64 ...] " echo " -c [aarch32 | aarch64 | x64 | x86 | mips64 | loongarch64 ...] "
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] " echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
echo " -V [stable | beta] " echo " -V [stable | beta] "
echo " -l [full | lite] " echo " -l [full | lite] "
@ -216,7 +216,7 @@ else
fi fi
# check support cpu type # check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]]; then if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]] || [[ "$cpuType" == "loongarch64" ]] ; then
if [ "$verMode" == "edge" ]; then if [ "$verMode" == "edge" ]; then
# community-version compile # community-version compile
cmake ../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro} cmake ../ -DCPUTYPE=${cpuType} -DWEBSOCKET=true -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}

View File

@ -146,6 +146,8 @@ void *openTransporter(const char *user, const char *auth, int32_t numOfThread) {
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
rpcInit.dfp = destroyAhandle; rpcInit.dfp = destroyAhandle;
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
void *pDnodeConn = rpcOpen(&rpcInit); void *pDnodeConn = rpcOpen(&rpcInit);
if (pDnodeConn == NULL) { if (pDnodeConn == NULL) {

View File

@ -1971,6 +1971,8 @@ TSDB_SERVER_STATUS taos_check_server_status(const char* fqdn, int port, char* de
rpcInit.idleTime = tsShellActivityTimer * 1000; rpcInit.idleTime = tsShellActivityTimer * 1000;
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
rpcInit.user = "_dnd"; rpcInit.user = "_dnd";
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
clientRpc = rpcOpen(&rpcInit); clientRpc = rpcOpen(&rpcInit);
if (clientRpc == NULL) { if (clientRpc == NULL) {

View File

@ -2082,7 +2082,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
SSmlLineInfo elements = {0}; SSmlLineInfo elements = {0};
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s, hello", info->id, sql); uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql", info->id);
int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {

View File

@ -163,10 +163,12 @@ int32_t tsMqRebalanceInterval = 2;
int32_t tsTtlUnit = 86400; int32_t tsTtlUnit = 86400;
int32_t tsTtlPushInterval = 86400; int32_t tsTtlPushInterval = 86400;
int32_t tsGrantHBInterval = 60; int32_t tsGrantHBInterval = 60;
int32_t tsUptimeInterval = 300; // seconds int32_t tsUptimeInterval = 300; // seconds
char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits
char tsUdfdLdLibPath[512] = ""; char tsUdfdLdLibPath[512] = "";
int32_t tsRpcRetryLimit = 100;
int32_t tsRpcRetryInterval = 15;
#ifndef _STORAGE #ifndef _STORAGE
int32_t taosSetTfsCfg(SConfig *pCfg) { int32_t taosSetTfsCfg(SConfig *pCfg) {
SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); SConfigItem *pItem = cfgGetItem(pCfg, "dataDir");
@ -297,6 +299,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
@ -422,6 +426,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1; if (cfgAddBool(pCfg, "udf", tsStartUdfd, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, 0) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
GRANT_CFG_ADD; GRANT_CFG_ADD;
return 0; return 0;
} }
@ -634,6 +642,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32; tsQueryNodeChunkSize = cfgGetItem(pCfg, "queryNodeChunkSize")->i32;
tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval; tsQueryUseNodeAllocator = cfgGetItem(pCfg, "queryUseNodeAllocator")->bval;
tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval; tsKeepColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
return 0; return 0;
} }
@ -708,6 +719,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
if (tsQueryBufferSize >= 0) { if (tsQueryBufferSize >= 0) {
tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL;
} }
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
GRANT_CFG_GET; GRANT_CFG_GET;
return 0; return 0;
} }

View File

@ -258,6 +258,8 @@ int32_t dmInitClient(SDnode *pDnode) {
rpcInit.parent = pDnode; rpcInit.parent = pDnode;
rpcInit.rfp = rpcRfp; rpcInit.rfp = rpcRfp;
rpcInit.compressSize = tsCompressMsgSize; rpcInit.compressSize = tsCompressMsgSize;
rpcInit.retryLimit = tsRpcRetryLimit;
rpcInit.retryInterval = tsRpcRetryInterval;
pTrans->clientRpc = rpcOpen(&rpcInit); pTrans->clientRpc = rpcOpen(&rpcInit);
if (pTrans->clientRpc == NULL) { if (pTrans->clientRpc == NULL) {

View File

@ -70,6 +70,7 @@ int32_t metaCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo); int32_t metaStatsCacheUpsert(SMeta* pMeta, SMetaStbStats* pInfo);
int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid); int32_t metaStatsCacheDrop(SMeta* pMeta, int64_t uid);
int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo); int32_t metaStatsCacheGet(SMeta* pMeta, int64_t uid, SMetaStbStats* pInfo);
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta);
struct SMeta { struct SMeta {
TdThreadRwlock lock; TdThreadRwlock lock;

View File

@ -1445,3 +1445,13 @@ int32_t metaGetStbStats(SMeta *pMeta, int64_t uid, SMetaStbStats *pInfo) {
_exit: _exit:
return code; return code;
} }
void metaUpdateStbStats(SMeta *pMeta, int64_t uid, int64_t delta) {
SMetaStbStats stats = {0};
if (metaStatsCacheGet(pMeta, uid, &stats) == TSDB_CODE_SUCCESS) {
stats.ctbNum += delta;
metaStatsCacheUpsert(pMeta, &stats);
}
}

View File

@ -371,7 +371,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
// update uid index // update uid index
metaUpdateUidIdx(pMeta, &nStbEntry); metaUpdateUidIdx(pMeta, &nStbEntry);
metaStatsCacheDrop(pMeta, nStbEntry.uid); // metaStatsCacheDrop(pMeta, nStbEntry.uid);
metaULock(pMeta); metaULock(pMeta);
@ -450,6 +450,10 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq, STableMe
#endif #endif
++pMeta->pVnode->config.vndStats.numOfCTables; ++pMeta->pVnode->config.vndStats.numOfCTables;
metaWLock(pMeta);
metaUpdateStbStats(pMeta, me.ctbEntry.suid, 1);
metaULock(pMeta);
} else { } else {
me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ctime = pReq->ctime;
me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.ttlDays = pReq->ttl;
@ -670,6 +674,8 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn); tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), &pMeta->txn);
--pMeta->pVnode->config.vndStats.numOfCTables; --pMeta->pVnode->config.vndStats.numOfCTables;
metaUpdateStbStats(pMeta, e.ctbEntry.suid, -1);
} else if (e.type == TSDB_NORMAL_TABLE) { } else if (e.type == TSDB_NORMAL_TABLE) {
// drop schema.db (todo) // drop schema.db (todo)

View File

@ -244,7 +244,7 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
int32_t rows = pDataBlock->info.rows; int32_t rows = pDataBlock->info.rows;
tqDebug("tq sink, convert block %d, rows: %d", i, rows); tqDebug("tq sink, convert block1 %d, rows: %d", i, rows);
int32_t dataLen = 0; int32_t dataLen = 0;
int32_t schemaLen = 0; int32_t schemaLen = 0;
@ -486,7 +486,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
blkHead->uid = 0; blkHead->uid = 0;
blkHead->schemaLen = 0; blkHead->schemaLen = 0;
tqDebug("tq sink, convert block %d, rows: %d", i, rows); tqDebug("tq sink, convert block2 %d, rows: %d", i, rows);
int32_t dataLen = 0; int32_t dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk)); void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
@ -514,6 +514,9 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else { } else {
void* colData = colDataGetData(pColData, j); void* colData = colDataGetData(pColData, j);
if (k == 0) {
tqDebug("tq sink, row %d ts %" PRId64, j, *(int64_t*)colData);
}
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k); tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
} }
} }

View File

@ -495,6 +495,10 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision); pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey, tsdbFidKeyRange(pCommitter->commitFid, pCommitter->minutes, pCommitter->precision, &pCommitter->minKey,
&pCommitter->maxKey); &pCommitter->maxKey);
#if 0
ASSERT(pCommitter->minKey <= pCommitter->nextKey && pCommitter->maxKey >= pCommitter->nextKey);
#endif
pCommitter->nextKey = TSKEY_MAX; pCommitter->nextKey = TSKEY_MAX;
// Reader // Reader

View File

@ -568,7 +568,9 @@ static int32_t tsdbInsertTableDataImpl(SMemTable *pMemTable, STbData *pTbData, i
do { do {
key.ts = row.pTSRow->ts; key.ts = row.pTSRow->ts;
nRow++; nRow++;
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS); if (SL_NODE_FORWARD(pos[0], 0) != pTbData->sl.pTail) {
tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_FROM_POS);
}
code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1); code = tbDataDoPut(pMemTable, pTbData, pos, &row, 1);
if (code) { if (code) {
goto _err; goto _err;

View File

@ -505,8 +505,8 @@ int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision) {
} }
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey) { void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fid * minutes * tsTickPerMin[precision]; *minKey = tsTickPerMin[precision] * fid * minutes;
*maxKey = *minKey + minutes * tsTickPerMin[precision] - 1; *maxKey = *minKey + tsTickPerMin[precision] * minutes - 1;
} }
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) { int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t now) {

View File

@ -15,6 +15,15 @@
#include "tsdb.h" #include "tsdb.h"
/**
* @brief max key by precision
* approximately calculation:
* ms: 3600*1000*8765*1000 // 1970 + 1000 years
* us: 3600*1000000*8765*1000 // 1970 + 1000 years
* ns: 3600*1000000000*8765*292 // 1970 + 292 years
*/
static int64_t tsMaxKeyByPrecision[] = {31556995200000L, 31556995200000000L, 9214646400000000000L};
// static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); // static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) { int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp) {
@ -97,7 +106,7 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg) {
STsdbKeepCfg *pCfg = &pTsdb->keepCfg; STsdbKeepCfg *pCfg = &pTsdb->keepCfg;
TSKEY now = taosGetTimestamp(pCfg->precision); TSKEY now = taosGetTimestamp(pCfg->precision);
TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2;
TSKEY maxKey = now + tsTickPerMin[pCfg->precision] * pCfg->days; TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision];
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
// pMsg->length = htonl(pMsg->length); // pMsg->length = htonl(pMsg->length);

View File

@ -981,6 +981,7 @@ void setTaskKilled(SExecTaskInfo* pTaskInfo);
void queryCostStatis(SExecTaskInfo* pTaskInfo); void queryCostStatis(SExecTaskInfo* pTaskInfo);
void doDestroyTask(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo);
void destroyOperatorInfo(SOperatorInfo* pOperator);
int32_t getMaximumIdleDurationSec(); int32_t getMaximumIdleDurationSec();
/* /*

View File

@ -91,9 +91,6 @@ static void destroyAggOperatorInfo(void* param);
static void destroyIntervalOperatorInfo(void* param); static void destroyIntervalOperatorInfo(void* param);
static void destroyOperatorInfo(SOperatorInfo* pOperator);
void setOperatorCompleted(SOperatorInfo* pOperator) { void setOperatorCompleted(SOperatorInfo* pOperator) {
pOperator->status = OP_EXEC_DONE; pOperator->status = OP_EXEC_DONE;
ASSERT(pOperator->pTaskInfo != NULL); ASSERT(pOperator->pTaskInfo != NULL);
@ -2172,7 +2169,7 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs) {
} }
} }
static void destroyOperatorInfo(SOperatorInfo* pOperator) { void destroyOperatorInfo(SOperatorInfo* pOperator) {
if (pOperator == NULL) { if (pOperator == NULL) {
return; return;
} }

View File

@ -816,12 +816,12 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
goto _error; goto _error;
} }
setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "PartitionOperator", QUERY_NODE_PHYSICAL_PLAN_PARTITION, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, NULL);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);
return pOperator; return pOperator;
@ -900,7 +900,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
// TODO check tbname validity // TODO check tbname validity
if (pData != (void*)-1) { if (pData != (void*)-1) {
memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memset(pDest->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN); int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pDest->info.parTbName, varDataVal(pData), len); memcpy(pDest->info.parTbName, varDataVal(pData), len);
/*pDest->info.parTbName[len + 1] = 0;*/ /*pDest->info.parTbName[len + 1] = 0;*/
} else { } else {
@ -1099,11 +1099,12 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
int32_t numOfCols = 0; int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols); SExprInfo* pExprInfo = createExprInfo(pPartNode->part.pTargets, NULL, &numOfCols);
setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StreamPartitionOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->exprSupp.pExprInfo = pExprInfo;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, pOperator->fpSet =
destroyStreamPartitionOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, destroyStreamPartitionOperatorInfo, NULL);
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);

View File

@ -946,7 +946,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->currentGroupId = -1; pInfo->currentGroupId = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TableScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5); pInfo->metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);
@ -980,7 +981,8 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
pInfo->dataReader = pReadHandle; pInfo->dataReader = pReadHandle;
// pInfo->prevGroupId = -1; // pInfo->prevGroupId = -1;
setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL);
return pOperator; return pOperator;
} }
@ -1136,8 +1138,10 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi
goto _error; goto _error;
} }
setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false,
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL); OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, NULL);
return pOperator; return pOperator;
_error: _error:
@ -1581,7 +1585,7 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
// TODO check tbname validation // TODO check tbname validation
if (pData != (void*)-1 && pData != NULL) { if (pData != (void*)-1 && pData != NULL) {
memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN); memset(pBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN); int32_t len = TMIN(varDataLen(pData), TSDB_TABLE_NAME_LEN - 1);
memcpy(pBlock->info.parTbName, varDataVal(pData), len); memcpy(pBlock->info.parTbName, varDataVal(pData), len);
/*pBlock->info.parTbName[len + 1] = 0;*/ /*pBlock->info.parTbName[len + 1] = 0;*/
} else { } else {
@ -2351,7 +2355,8 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
pInfo->vnode = pHandle->vnode; pInfo->vnode = pHandle->vnode;
pInfo->sContext = pHandle->sContext; pInfo->sContext = pHandle->sContext;
setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, NULL);
return pOperator; return pOperator;
@ -2366,9 +2371,7 @@ _end:
static void destroyStreamScanOperatorInfo(void* param) { static void destroyStreamScanOperatorInfo(void* param) {
SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param; SStreamScanInfo* pStreamScan = (SStreamScanInfo*)param;
if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) { if (pStreamScan->pTableScanOp && pStreamScan->pTableScanOp->info) {
STableScanInfo* pTableScanInfo = pStreamScan->pTableScanOp->info; destroyOperatorInfo(pStreamScan->pTableScanOp);
destroyTableScanOperatorInfo(pTableScanInfo);
taosMemoryFreeClear(pStreamScan->pTableScanOp);
} }
if (pStreamScan->tqReader) { if (pStreamScan->tqReader) {
tqCloseReader(pStreamScan->tqReader); tqCloseReader(pStreamScan->tqReader);
@ -2537,7 +2540,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->partitionSup.needCalc = false; pInfo->partitionSup.needCalc = false;
setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StreamScanOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan; __optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
@ -4175,7 +4179,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
pInfo->readHandle = *(SReadHandle*)readHandle; pInfo->readHandle = *(SReadHandle*)readHandle;
} }
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, NULL);
return pOperator; return pOperator;
@ -4305,7 +4310,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
pInfo->readHandle = *pReadHandle; pInfo->readHandle = *pReadHandle;
pInfo->curPos = 0; pInfo->curPos = 0;
setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TagScanOperator", QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN, false, OP_NOT_OPENED, pInfo,
pTaskInfo);
initResultSizeInfo(&pOperator->resultInfo, 4096); initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
@ -4815,11 +4821,12 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t rowSize = pInfo->pResBlock->info.rowSize; int32_t rowSize = pInfo->pResBlock->info.rowSize;
pInfo->bufPageSize = getProperSortPageSize(rowSize); pInfo->bufPageSize = getProperSortPageSize(rowSize);
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo,
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo); getTableMergeScanExplainExecInfo);
pOperator->cost.openCost = 0; pOperator->cost.openCost = 0;
return pOperator; return pOperator;

View File

@ -680,9 +680,9 @@ SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) {
void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) { void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) {
for (int32_t i = start; i < end; i++) { for (int32_t i = start; i < end; i++) {
destroyExprInfo(pFillCol[i].pExpr, 1); destroyExprInfo(pFillCol[i].pExpr, 1);
taosMemoryFreeClear(pFillCol[i].pExpr);
taosVariantDestroy(&pFillCol[i].fillVal); taosVariantDestroy(&pFillCol[i].fillVal);
} }
taosMemoryFreeClear(pFillCol[start].pExpr);
taosMemoryFree(pFillCol); taosMemoryFree(pFillCol);
return NULL; return NULL;
} }

View File

@ -1776,7 +1776,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
} }
initResultRowInfo(&pInfo->binfo.resultRowInfo); initResultRowInfo(&pInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TimeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL); createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, NULL);
@ -2551,9 +2552,9 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pScanInfo->cond.twindows = pInfo->win; pScanInfo->cond.twindows = pInfo->win;
pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL; pScanInfo->cond.type = TIMEWINDOW_RANGE_EXTERNAL;
setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
pOperator->fpSet = pTaskInfo);
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL); pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
@ -2622,7 +2623,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
pInfo->tsSlotId = tsSlotId; pInfo->tsSlotId = tsSlotId;
setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo,
pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL); createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, NULL);
@ -2694,7 +2696,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
goto _error; goto _error;
} }
setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, NULL);
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
@ -3822,7 +3825,7 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) { void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlock* pBlock, void** Ite) {
blockDataCleanup(pBlock); blockDataCleanup(pBlock);
int32_t size = tSimpleHashGetSize(pStDeleted); int32_t size = tSimpleHashGetSize(pStDeleted);
if (size == 0) { if (size == 0) {
@ -3848,6 +3851,26 @@ void doBuildDeleteDataBlock(SSHashObj* pStDeleted, SSDataBlock* pBlock, void** I
colDataAppendNULL(pCalStCol, pBlock->info.rows); colDataAppendNULL(pCalStCol, pBlock->info.rows);
SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
colDataAppendNULL(pCalEdCol, pBlock->info.rows); colDataAppendNULL(pCalEdCol, pBlock->info.rows);
SHashObj* pGroupIdTbNameMap = NULL;
if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
SStreamSessionAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
} else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
SStreamStateAggOperatorInfo* pInfo = pOp->info;
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
}
char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t));
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
if (tbname == NULL) {
colDataAppendNULL(pTableCol, pBlock->info.rows);
} else {
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
}
pBlock->info.rows += 1; pBlock->info.rows += 1;
} }
if ((*Ite) == NULL) { if ((*Ite) == NULL) {
@ -3994,7 +4017,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
} else if (pOperator->status == OP_RES_TO_RETURN) { } else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes; return pInfo->pDelRes;
@ -4099,7 +4122,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
#endif #endif
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session"); printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
return pInfo->pDelRes; return pInfo->pDelRes;
@ -4223,7 +4246,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete"); printDataBlock(pInfo->pDelRes, "semi session delete");
return pInfo->pDelRes; return pInfo->pDelRes;
@ -4303,7 +4326,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
return pBInfo->pRes; return pBInfo->pRes;
} }
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "semi session delete"); printDataBlock(pInfo->pDelRes, "semi session delete");
return pInfo->pDelRes; return pInfo->pDelRes;
@ -4327,7 +4350,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION); pInfo->isFinal = (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION);
char* name = (pInfo->isFinal)? "StreamSessionFinalAggOperator":"StreamSessionSemiAggOperator"; char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator";
if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR);
@ -4336,7 +4359,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
destroyStreamSessionAggOperatorInfo, NULL); destroyStreamSessionAggOperatorInfo, NULL);
} }
setOperatorInfo(pOperator, name, pPhyNode->type , false, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
@ -4555,7 +4578,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
SOptrBasicInfo* pBInfo = &pInfo->binfo; SOptrBasicInfo* pBInfo = &pInfo->binfo;
int64_t maxTs = INT64_MIN; int64_t maxTs = INT64_MIN;
if (pOperator->status == OP_RES_TO_RETURN) { if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete"); printDataBlock(pInfo->pDelRes, "single state delete");
return pInfo->pDelRes; return pInfo->pDelRes;
@ -4622,7 +4645,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated); initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator); doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) { if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete"); printDataBlock(pInfo->pDelRes, "single state delete");
return pInfo->pDelRes; return pInfo->pDelRes;
@ -4698,7 +4721,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, NULL);
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType, initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
@ -4973,7 +4997,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
initResultRowInfo(&iaInfo->binfo.resultRowInfo); initResultRowInfo(&iaInfo->binfo.resultRowInfo);
blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(iaInfo->binfo.pRes, pOperator->resultInfo.capacity);
setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL, false, OP_NOT_OPENED, miaInfo, pTaskInfo); setOperatorInfo(pOperator, "TimeMergeAlignedIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL,
false, OP_NOT_OPENED, miaInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, NULL);
@ -5279,7 +5304,8 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
} }
initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo); initResultRowInfo(&pIntervalInfo->binfo.resultRowInfo);
setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false,
OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo);
pOperator->fpSet = pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL); createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, NULL);
@ -5509,9 +5535,10 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap =
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, pInfo, pTaskInfo);
destroyStreamFinalIntervalOperatorInfo, NULL); pOperator->fpSet =
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
code = appendDownstream(pOperator, &downstream, 1); code = appendDownstream(pOperator, &downstream, 1);

View File

@ -424,8 +424,9 @@ SNode* nodesMakeNode(ENodeType type) {
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT: case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
case QUERY_NODE_SHOW_TAGS_STMT: case QUERY_NODE_SHOW_TAGS_STMT:
case QUERY_NODE_SHOW_TABLE_TAGS_STMT:
return makeNode(type, sizeof(SShowStmt)); return makeNode(type, sizeof(SShowStmt));
case QUERY_NODE_SHOW_TABLE_TAGS_STMT:
return makeNode(type, sizeof(SShowTableTagsStmt));
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT: case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
return makeNode(type, sizeof(SShowDnodeVariablesStmt)); return makeNode(type, sizeof(SShowDnodeVariablesStmt));
case QUERY_NODE_SHOW_CREATE_DATABASE_STMT: case QUERY_NODE_SHOW_CREATE_DATABASE_STMT:
@ -942,13 +943,19 @@ void nodesDestroyNode(SNode* pNode) {
case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT: case QUERY_NODE_SHOW_LOCAL_VARIABLES_STMT:
case QUERY_NODE_SHOW_TRANSACTIONS_STMT: case QUERY_NODE_SHOW_TRANSACTIONS_STMT:
case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT: case QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT:
case QUERY_NODE_SHOW_TAGS_STMT: case QUERY_NODE_SHOW_TAGS_STMT: {
case QUERY_NODE_SHOW_TABLE_TAGS_STMT: {
SShowStmt* pStmt = (SShowStmt*)pNode; SShowStmt* pStmt = (SShowStmt*)pNode;
nodesDestroyNode(pStmt->pDbName); nodesDestroyNode(pStmt->pDbName);
nodesDestroyNode(pStmt->pTbName); nodesDestroyNode(pStmt->pTbName);
break; break;
} }
case QUERY_NODE_SHOW_TABLE_TAGS_STMT: {
SShowTableTagsStmt* pStmt = (SShowTableTagsStmt*)pNode;
nodesDestroyNode(pStmt->pDbName);
nodesDestroyNode(pStmt->pTbName);
nodesDestroyList(pStmt->pTags);
break;
}
case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT: case QUERY_NODE_SHOW_DNODE_VARIABLES_STMT:
nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pDnodeId); nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pDnodeId);
nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pLikePattern); nodesDestroyNode(((SShowDnodeVariablesStmt*)pNode)->pLikePattern);

View File

@ -178,6 +178,7 @@ SNode* createShowCreateTableStmt(SAstCreateContext* pCxt, ENodeType type, SNode*
SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable); SNode* createShowTableDistributedStmt(SAstCreateContext* pCxt, SNode* pRealTable);
SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pLikePattern); SNode* createShowDnodeVariablesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pLikePattern);
SNode* createShowVnodesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pDnodeEndpoint); SNode* createShowVnodesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pDnodeEndpoint);
SNode* createShowTableTagsStmt(SAstCreateContext* pCxt, SNode* pTbName, SNode* pDbName, SNodeList* pTags);
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo); SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo);
SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal); SNode* createAlterUserStmt(SAstCreateContext* pCxt, SToken* pUserName, int8_t alterType, const SToken* pVal);
SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName); SNode* createDropUserStmt(SAstCreateContext* pCxt, SToken* pUserName);

View File

@ -426,7 +426,7 @@ cmd ::= SHOW TABLE DISTRIBUTED full_table_name(A).
cmd ::= SHOW CONSUMERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONSUMERS_STMT); } cmd ::= SHOW CONSUMERS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_CONSUMERS_STMT); }
cmd ::= SHOW SUBSCRIPTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT); } cmd ::= SHOW SUBSCRIPTIONS. { pCxt->pRootNode = createShowStmt(pCxt, QUERY_NODE_SHOW_SUBSCRIPTIONS_STMT); }
cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, B, A, OP_TYPE_EQUAL); } cmd ::= SHOW TAGS FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TAGS_STMT, B, A, OP_TYPE_EQUAL); }
cmd ::= SHOW TABLE TAGS FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowStmtWithCond(pCxt, QUERY_NODE_SHOW_TABLE_TAGS_STMT, B, A, OP_TYPE_EQUAL); } cmd ::= SHOW TABLE TAGS tag_list_opt(C) FROM table_name_cond(A) from_db_opt(B). { pCxt->pRootNode = createShowTableTagsStmt(pCxt, A, B, C); }
cmd ::= SHOW VNODES NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); } cmd ::= SHOW VNODES NK_INTEGER(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BIGINT, &A), NULL); }
cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); } cmd ::= SHOW VNODES NK_STRING(A). { pCxt->pRootNode = createShowVnodesStmt(pCxt, NULL, createValueNode(pCxt, TSDB_DATA_TYPE_VARCHAR, &A)); }
@ -441,6 +441,18 @@ table_name_cond(A) ::= table_name(B).
from_db_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); } from_db_opt(A) ::= . { A = createDefaultDatabaseCondValue(pCxt); }
from_db_opt(A) ::= FROM db_name(B). { A = createIdentifierValueNode(pCxt, &B); } from_db_opt(A) ::= FROM db_name(B). { A = createIdentifierValueNode(pCxt, &B); }
%type tag_list_opt { SNodeList* }
%destructor tag_list_opt { nodesDestroyList($$); }
tag_list_opt(A) ::= . { A = NULL; }
tag_list_opt(A) ::= tag_item(B). { A = createNodeList(pCxt, B); }
tag_list_opt(A) ::= tag_list_opt(B) NK_COMMA tag_item(C). { A = addNodeToList(pCxt, B, C); }
tag_item(A) ::= TBNAME(B). { A = setProjectionAlias(pCxt, createFunctionNode(pCxt, &B, NULL), &B); }
tag_item(A) ::= QTAGS(B). { A = createFunctionNode(pCxt, &B, NULL); }
tag_item(A) ::= column_name(B). { A = createColumnNode(pCxt, NULL, &B); }
tag_item(A) ::= column_name(B) column_alias(C). { A = setProjectionAlias(pCxt, createColumnNode(pCxt, NULL, &B), &C); }
tag_item(A) ::= column_name(B) AS column_alias(C). { A = setProjectionAlias(pCxt, createColumnNode(pCxt, NULL, &B), &C); }
/************************************************ create index ********************************************************/ /************************************************ create index ********************************************************/
cmd ::= CREATE SMA INDEX not_exists_opt(D) cmd ::= CREATE SMA INDEX not_exists_opt(D)
full_table_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, A, B, NULL, C); } full_table_name(A) ON full_table_name(B) index_options(C). { pCxt->pRootNode = createCreateIndexStmt(pCxt, INDEX_TYPE_SMA, D, A, B, NULL, C); }

View File

@ -1390,6 +1390,21 @@ SNode* createShowVnodesStmt(SAstCreateContext* pCxt, SNode* pDnodeId, SNode* pDn
return (SNode*)pStmt; return (SNode*)pStmt;
} }
SNode* createShowTableTagsStmt(SAstCreateContext* pCxt, SNode* pTbName, SNode* pDbName, SNodeList* pTags) {
CHECK_PARSER_STATUS(pCxt);
if (NULL == pDbName) {
snprintf(pCxt->pQueryCxt->pMsg, pCxt->pQueryCxt->msgLen, "database not specified");
pCxt->errCode = TSDB_CODE_PAR_SYNTAX_ERROR;
return NULL;
}
SShowTableTagsStmt* pStmt = (SShowTableTagsStmt*)nodesMakeNode(QUERY_NODE_SHOW_TABLE_TAGS_STMT);
CHECK_OUT_OF_MEM(pStmt);
pStmt->pDbName = pDbName;
pStmt->pTbName = pTbName;
pStmt->pTags = pTags;
return (SNode*)pStmt;
}
SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo) { SNode* createCreateUserStmt(SAstCreateContext* pCxt, SToken* pUserName, const SToken* pPassword, int8_t sysinfo) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);
char password[TSDB_USET_PASSWORD_LEN] = {0}; char password[TSDB_USET_PASSWORD_LEN] = {0};

View File

@ -440,7 +440,7 @@ static int32_t collectMetaKeyFromShowTags(SCollectMetaKeyCxt* pCxt, SShowStmt* p
return code; return code;
} }
static int32_t collectMetaKeyFromShowStableTags(SCollectMetaKeyCxt* pCxt, SShowStmt* pStmt) { static int32_t collectMetaKeyFromShowStableTags(SCollectMetaKeyCxt* pCxt, SShowTableTagsStmt* pStmt) {
return collectMetaKeyFromRealTableImpl(pCxt, ((SValueNode*)pStmt->pDbName)->literal, return collectMetaKeyFromRealTableImpl(pCxt, ((SValueNode*)pStmt->pDbName)->literal,
((SValueNode*)pStmt->pTbName)->literal, AUTH_TYPE_READ); ((SValueNode*)pStmt->pTbName)->literal, AUTH_TYPE_READ);
} }
@ -627,7 +627,7 @@ static int32_t collectMetaKeyFromQuery(SCollectMetaKeyCxt* pCxt, SNode* pStmt) {
case QUERY_NODE_SHOW_TAGS_STMT: case QUERY_NODE_SHOW_TAGS_STMT:
return collectMetaKeyFromShowTags(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowTags(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_TABLE_TAGS_STMT: case QUERY_NODE_SHOW_TABLE_TAGS_STMT:
return collectMetaKeyFromShowStableTags(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowStableTags(pCxt, (SShowTableTagsStmt*)pStmt);
case QUERY_NODE_SHOW_USERS_STMT: case QUERY_NODE_SHOW_USERS_STMT:
return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt); return collectMetaKeyFromShowUsers(pCxt, (SShowStmt*)pStmt);
case QUERY_NODE_SHOW_LICENCES_STMT: case QUERY_NODE_SHOW_LICENCES_STMT:

View File

@ -6288,21 +6288,29 @@ static SNode* createTagsFunction() {
return (SNode*)pFunc; return (SNode*)pFunc;
} }
static int32_t createShowTableTagsProjections(SNodeList** pProjections, SNodeList** pTags) {
if (NULL != *pTags) {
TSWAP(*pProjections, *pTags);
return TSDB_CODE_SUCCESS;
}
int32_t code = nodesListMakeStrictAppend(pProjections, createTbnameFunction());
if (TSDB_CODE_SUCCESS == code) {
code = nodesListStrictAppend(*pProjections, createTagsFunction());
}
return code;
}
static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) { static int32_t rewriteShowStableTags(STranslateContext* pCxt, SQuery* pQuery) {
const char* cols[] = {"tbname", "_tags"}; SShowTableTagsStmt* pShow = (SShowTableTagsStmt*)pQuery->pRoot;
SShowStmt* pShow = (SShowStmt*)pQuery->pRoot; SSelectStmt* pSelect = NULL;
SSelectStmt* pSelect = NULL;
int32_t code = createSimpleSelectStmt(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal, int32_t code = createSimpleSelectStmt(((SValueNode*)pShow->pDbName)->literal, ((SValueNode*)pShow->pTbName)->literal,
-1, NULL, &pSelect); -1, NULL, &pSelect);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSelect->pProjectionList, createTbnameFunction()); code = createShowTableTagsProjections(&pSelect->pProjectionList, &pShow->pTags);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSelect->pProjectionList, createTagsFunction());
}
if (TSDB_CODE_SUCCESS == code) {
pSelect->isDistinct = true;
pQuery->showRewrite = true; pQuery->showRewrite = true;
pSelect->tagScan = true;
nodesDestroyNode(pQuery->pRoot); nodesDestroyNode(pQuery->pRoot);
pQuery->pRoot = (SNode*)pSelect; pQuery->pRoot = (SNode*)pSelect;
} else { } else {

File diff suppressed because it is too large Load Diff

View File

@ -192,6 +192,16 @@ TEST_F(ParserShowToUseTest, showTableDistributed) {
run("SHOW TABLE DISTRIBUTED st1"); run("SHOW TABLE DISTRIBUTED st1");
} }
TEST_F(ParserShowToUseTest, showTableTags) {
useDb("root", "test");
run("SHOW TABLE TAGS FROM st1");
run("SHOW TABLE TAGS tag1, tag2 FROM st1");
run("SHOW TABLE TAGS TBNAME, _TAGS, tag3 FROM st1");
}
TEST_F(ParserShowToUseTest, showTags) { TEST_F(ParserShowToUseTest, showTags) {
useDb("root", "test"); useDb("root", "test");

View File

@ -189,7 +189,7 @@ static int32_t createSelectRootLogicNode(SLogicPlanContext* pCxt, SSelectStmt* p
} }
static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols, static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols, SNodeList* pScanCols,
int8_t tableType) { int8_t tableType, bool tagScan) {
if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) { if (pCxt->pPlanCxt->topicQuery || pCxt->pPlanCxt->streamQuery) {
return SCAN_TYPE_STREAM; return SCAN_TYPE_STREAM;
} }
@ -198,6 +198,10 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
return SCAN_TYPE_SYSTEM_TABLE; return SCAN_TYPE_SYSTEM_TABLE;
} }
if (tagScan) {
return SCAN_TYPE_TAG;
}
if (NULL == pScanCols) { if (NULL == pScanCols) {
return NULL == pScanPseudoCols return NULL == pScanPseudoCols
? SCAN_TYPE_TABLE ? SCAN_TYPE_TABLE
@ -310,7 +314,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM); code = rewriteExprsForSelect(pScan->pScanPseudoCols, pSelect, SQL_CLAUSE_FROM);
} }
pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType); pScan->scanType = getScanType(pCxt, pScan->pScanPseudoCols, pScan->pScanCols, pScan->tableType, pSelect->tagScan);
if (NULL != pScan->pScanCols) { if (NULL != pScan->pScanCols) {
pScan->hasNormalCols = true; pScan->hasNormalCols = true;

View File

@ -187,11 +187,9 @@ int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
while (1) { while (1) {
int8_t schedStatus = int8_t schedStatus =
atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING); atomic_val_compare_exchange_8(&pTask->schedStatus, TASK_SCHED_STATUS__INACTIVE, TASK_SCHED_STATUS__DROPPING);
if (schedStatus == TASK_SCHED_STATUS__INACTIVE) { if (schedStatus != TASK_SCHED_STATUS__ACTIVE) {
tFreeSStreamTask(pTask); tFreeSStreamTask(pTask);
break; break;
} else if (schedStatus == TASK_SCHED_STATUS__DROPPING) {
break;
} }
taosMsleep(10); taosMsleep(10);
} }

View File

@ -94,11 +94,11 @@ typedef void* queue[2];
/* Return the structure holding the given element. */ /* Return the structure holding the given element. */
#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) #define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit //#define TRANS_RETRY_COUNT_LIMIT 100 // retry count limit
#define TRANS_RETRY_INTERVAL 15 // retry interval (ms) //#define TRANS_RETRY_INTERVAL 15 // retry interval (ms)
#define TRANS_CONN_TIMEOUT 3 // connect timeout (s) #define TRANS_CONN_TIMEOUT 3 // connect timeout (s)
#define TRANS_READ_TIMEOUT 3000 // read timeout (ms) #define TRANS_READ_TIMEOUT 3000 // read timeout (ms)
#define TRANS_PACKET_LIMIT 1024 * 1024 * 512 #define TRANS_PACKET_LIMIT 1024 * 1024 * 512
#define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_MAGIC_NUM 0x5f375a86

View File

@ -47,8 +47,10 @@ typedef struct {
char label[TSDB_LABEL_LEN]; char label[TSDB_LABEL_LEN];
char user[TSDB_UNI_LEN]; // meter ID char user[TSDB_UNI_LEN]; // meter ID
int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size int32_t compressSize; // -1: no compress, 0 : all data compressed, size: compress data if larger than size
int8_t encryption; // encrypt or not int8_t encryption; // encrypt or not
int32_t retryLimit; // retry limit
int32_t retryInterval; // retry interval ms
void (*cfp)(void* parent, SRpcMsg*, SEpSet*); void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
bool (*retry)(int32_t code, tmsg_t msgType); bool (*retry)(int32_t code, tmsg_t msgType);

View File

@ -48,6 +48,8 @@ void* rpcOpen(const SRpcInit* pInit) {
pRpc->compressSize = pInit->compressSize; pRpc->compressSize = pInit->compressSize;
pRpc->encryption = pInit->encryption; pRpc->encryption = pInit->encryption;
pRpc->retryLimit = pInit->retryLimit;
pRpc->retryInterval = pInit->retryInterval;
// register callback handle // register callback handle
pRpc->cfp = pInit->cfp; pRpc->cfp = pInit->cfp;

View File

@ -1288,6 +1288,7 @@ static void doCloseIdleConn(void* param) {
} }
static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) { static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STrans* pTransInst = pThrd->pTransInst;
STransConnCtx* pCtx = pMsg->ctx; STransConnCtx* pCtx = pMsg->ctx;
STraceId* trace = &pMsg->msg.info.traceId; STraceId* trace = &pMsg->msg.info.traceId;
@ -1299,7 +1300,7 @@ static void cliSchedMsgToNextNode(SCliMsg* pMsg, SCliThrd* pThrd) {
STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg)); STaskArg* arg = taosMemoryMalloc(sizeof(STaskArg));
arg->param1 = pMsg; arg->param1 = pMsg;
arg->param2 = pThrd; arg->param2 = pThrd;
transDQSched(pThrd->delayQueue, doDelayTask, arg, TRANS_RETRY_INTERVAL); transDQSched(pThrd->delayQueue, doDelayTask, arg, pTransInst->retryInterval);
} }
FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) { FORCE_INLINE void cliCompareAndSwap(int8_t* val, int8_t exp, int8_t newVal) {
@ -1351,7 +1352,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
pMsg->sent = 0; pMsg->sent = 0;
pCtx->retryCnt += 1; pCtx->retryCnt += 1;
if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) { if (code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_RPC_BROKEN_LINK) {
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, EPSET_GET_SIZE(&pCtx->epSet) * 3); cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, EPSET_GET_SIZE(&pCtx->epSet) * 3);
if (pCtx->retryCnt < pCtx->retryLimit) { if (pCtx->retryCnt < pCtx->retryLimit) {
transUnrefCliHandle(pConn); transUnrefCliHandle(pConn);
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(&pCtx->epSet);
@ -1360,7 +1361,7 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
return -1; return -1;
} }
} else { } else {
cliCompareAndSwap(&pCtx->retryLimit, TRANS_RETRY_COUNT_LIMIT, TRANS_RETRY_COUNT_LIMIT); cliCompareAndSwap(&pCtx->retryLimit, pTransInst->retryLimit, pTransInst->retryLimit);
if (pCtx->retryCnt < pCtx->retryLimit) { if (pCtx->retryCnt < pCtx->retryLimit) {
if (pResp->contLen == 0) { if (pResp->contLen == 0) {
EPSET_FORWARD_INUSE(&pCtx->epSet); EPSET_FORWARD_INUSE(&pCtx->epSet);

View File

@ -902,9 +902,11 @@ void taosSetCoreDump(bool enable) {
old_len = sizeof(old_usespid); old_len = sizeof(old_usespid);
#ifndef __loongarch64
if (syscall(SYS__sysctl, &args) == -1) { if (syscall(SYS__sysctl, &args) == -1) {
// printf("_sysctl(kern_core_uses_pid) set fail: %s", strerror(errno)); // printf("_sysctl(kern_core_uses_pid) set fail: %s", strerror(errno));
} }
#endif
// printf("The old core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid); // printf("The old core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
@ -918,9 +920,11 @@ void taosSetCoreDump(bool enable) {
old_len = sizeof(old_usespid); old_len = sizeof(old_usespid);
#ifndef __loongarch64
if (syscall(SYS__sysctl, &args) == -1) { if (syscall(SYS__sysctl, &args) == -1) {
// printf("_sysctl(kern_core_uses_pid) get fail: %s", strerror(errno)); // printf("_sysctl(kern_core_uses_pid) get fail: %s", strerror(errno));
} }
#endif
// printf("The new core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid); // printf("The new core_uses_pid[%" PRIu64 "]: %d", old_len, old_usespid);
#endif #endif
@ -989,4 +993,4 @@ bool taosCheckCurrentInDll() {
#else #else
return false; return false;
#endif #endif
} }

View File

@ -19,7 +19,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
#include <nmmintrin.h> #include <nmmintrin.h>
#endif #endif
@ -512,7 +512,7 @@ static uint32_t table[16][256] = {
0x9c221d09, 0x6e2e10f7, 0x7dd67004, 0x8fda7dfa} 0x9c221d09, 0x6e2e10f7, 0x7dd67004, 0x8fda7dfa}
}; };
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
static uint32_t long_shifts[4][256] = { static uint32_t long_shifts[4][256] = {
{0x00000000, 0xe040e0ac, 0xc56db7a9, 0x252d5705, 0x8f3719a3, 0x6f77f90f, 0x4a5aae0a, 0xaa1a4ea6, 0x1b8245b7, {0x00000000, 0xe040e0ac, 0xc56db7a9, 0x252d5705, 0x8f3719a3, 0x6f77f90f, 0x4a5aae0a, 0xaa1a4ea6, 0x1b8245b7,
0xfbc2a51b, 0xdeeff21e, 0x3eaf12b2, 0x94b55c14, 0x74f5bcb8, 0x51d8ebbd, 0xb1980b11, 0x37048b6e, 0xd7446bc2, 0xfbc2a51b, 0xdeeff21e, 0x3eaf12b2, 0x94b55c14, 0x74f5bcb8, 0x51d8ebbd, 0xb1980b11, 0x37048b6e, 0xd7446bc2,
@ -846,7 +846,7 @@ uint32_t crc32c_sf(uint32_t crci, crc_stream input, size_t length) {
} }
return (uint32_t)crc ^ 0xffffffff; return (uint32_t)crc ^ 0xffffffff;
} }
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
/* Apply the zeros operator table to crc. */ /* Apply the zeros operator table to crc. */
static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) { static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) {
return shift_table[0][crc & 0xff] ^ shift_table[1][(crc >> 8) & 0xff] ^ shift_table[2][(crc >> 16) & 0xff] ^ return shift_table[0][crc & 0xff] ^ shift_table[1][(crc >> 8) & 0xff] ^ shift_table[2][(crc >> 16) & 0xff] ^
@ -857,7 +857,7 @@ static uint32_t shift_crc(uint32_t shift_table[][256], uint32_t crc) {
version. Otherwise, use the software version. */ version. Otherwise, use the software version. */
uint32_t (*crc32c)(uint32_t crci, crc_stream bytes, size_t len) = crc32c_sf; uint32_t (*crc32c)(uint32_t crci, crc_stream bytes, size_t len) = crc32c_sf;
#if !defined(_TD_ARM_) && !defined(_TD_MIPS_) #if !defined(_TD_ARM_) && !defined(_TD_MIPS_) && !defined(_TD_LOONGARCH_)
/* Compute CRC-32C using the Intel hardware instruction. */ /* Compute CRC-32C using the Intel hardware instruction. */
uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) { uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) {
crc_stream next = buf; crc_stream next = buf;
@ -1012,7 +1012,7 @@ uint32_t crc32c_hw(uint32_t crc, crc_stream buf, size_t len) {
#endif // #ifndef _TD_ARM_ #endif // #ifndef _TD_ARM_
void taosResolveCRC() { void taosResolveCRC() {
#if defined _TD_ARM_ || defined _TD_MIPS_ || defined WINDOWS #if defined _TD_ARM_ || defined _TD_MIPS_ || defined WINDOWS || defined _TD_LOONGARCH_
crc32c = crc32c_sf; crc32c = crc32c_sf;
#else #else
int32_t sse42; int32_t sse42;

2
tests/parallel_test/container_build.sh Executable file → Normal file
View File

@ -54,7 +54,7 @@ fi
date date
docker run \ docker run \
-v $REP_MOUNT_PARAM \ -v $REP_MOUNT_PARAM \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true;make -j $THREAD_COUNT" --rm --ulimit core=-1 taos_test:v1.0 sh -c "cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true;make -j $THREAD_COUNT || exit 1"
if [[ -d ${WORKDIR}/debugNoSan ]] ;then if [[ -d ${WORKDIR}/debugNoSan ]] ;then
echo "delete ${WORKDIR}/debugNoSan" echo "delete ${WORKDIR}/debugNoSan"

View File

@ -89,8 +89,8 @@ if $data01 != 40 then
endi endi
print ======== step4 import new data print ======== step4 import new data
sql_error import into tb values (now + 30d , 30 ) #sql_error import into tb values (now + 30d , 30 )
sql_error import into tb values (now + 31d , 31 ) #sql_error import into tb values (now + 31d , 31 )
sql select * from tb order by ts desc sql select * from tb order by ts desc
print ===> rows $rows print ===> rows $rows

View File

@ -216,6 +216,60 @@ sql insert into scalar_tb values (1656668180503+1s, -50, 50.1, "beiJing", "TDeng
print ========== step6 repeat print ========== step6 repeat
sql drop database test; sql drop database test;
print ========== interval\session\state window
sql CREATE DATABASE test1 BUFFER 96 CACHESIZE 1 CACHEMODEL 'none' COMP 2 DURATION 14400m WAL_FSYNC_PERIOD 3000 MAXROWS 4096 MINROWS 100 KEEP 5256000m,5256000m,5256000m PAGES 256 PAGESIZE 4 PRECISION 'ms' REPLICA 1 STRICT 'off' WAL_LEVEL 1 VGROUPS 2 SINGLE_STABLE 0;
sql use test1;
sql CREATE STABLE st (time TIMESTAMP, ca DOUBLE, cb DOUBLE, cc int) TAGS (ta VARCHAR(10) );
print ========== create table before stream
sql CREATE TABLE t1 using st TAGS ('aaa');
sql CREATE TABLE t2 using st TAGS ('bbb');
sql CREATE TABLE t3 using st TAGS ('ccc');
sql CREATE TABLE t4 using st TAGS ('ddd');
print ========== stable
sql create stream streamd1 into streamt1 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(null);
sql create stream streamd2 into streamt2 as select ca, _wstart,_wend, count(*), max(ca), max(cb) from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear);
sql create stream streamd3 into streamt3 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m);
sql create stream streamd4 into streamt4 as select ta, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta session(time, 60m);
sql_error create stream streamd5 into streamt5 as select ca, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc);
sql_error create stream streamd6 into streamt6 as select ta, _wstart,_wend, count(*) as total from st where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta state_window(cc);
print ========== table
sql create stream streamd7 into streamt7 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(null);
sql create stream streamd8 into streamt8 as select ca, _wstart,_wend, count(*), max(ca), max(cb) from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca interval(60m) fill(linear);
sql create stream streamd9 into streamt9 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca session(time, 60m);
sql create stream streamd10 into streamt10 as select ta, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta session(time, 60m);
sql create stream streamd11 into streamt11 as select ca, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ca state_window(cc);
sql create stream streamd12 into streamt12 as select ta, _wstart,_wend, count(*) as total from t1 where time > "2022-01-01 00:00:00" and time < "2032-01-01 00:00:00" partition by ta state_window(cc);
print ========== create table after stream
sql CREATE TABLE t5 using st TAGS ('eee');
sql CREATE TABLE t6 using st TAGS ('fff');
sql CREATE TABLE t7 using st TAGS ('ggg');
sql CREATE TABLE t8 using st TAGS ('fff');
sleep 1000
print ========== drop stream
sql drop stream if exists streamd1;
sql drop stream if exists streamd2;
sql drop stream if exists streamd3;
sql drop stream if exists streamd4;
#sql drop stream if exists streamd5;
#sql drop stream if exists streamd6;
sql drop stream if exists streamd7;
sql drop stream if exists streamd8;
sql drop stream if exists streamd9;
sql drop stream if exists streamd10;
sql drop stream if exists streamd11;
sql drop stream if exists streamd12;
print ========== step7
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT system sh/exec.sh -n dnode3 -s stop -x SIGINT

View File

@ -9,7 +9,7 @@ $tbPrefix = lm_da_tb
$db = $dbPrefix . $i $db = $dbPrefix . $i
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
print =============== step1 print =============== step1 ms db
sql create database $db sql create database $db
sql use $db sql use $db
@ -23,7 +23,7 @@ if $data00 != @17-01-01 08:00:00.001@ then
return -1 return -1
endi endi
print =============== step2 print =============== step2 ms db
sql_error insert into $tb values ('2017-08-28 00:23:46.429+ 1a', 2) sql_error insert into $tb values ('2017-08-28 00:23:46.429+ 1a', 2)
sql_error insert into $tb values ('2017-08-28 00:23:46cd .429', 2) sql_error insert into $tb values ('2017-08-28 00:23:46cd .429', 2)
sql select ts from $tb sql select ts from $tb
@ -31,7 +31,7 @@ if $rows != 1 then
return -1 return -1
endi endi
print =============== step3 print =============== step3 ms db
sql_error insert into $tb values ('1970-01-01 08:00:00.000', 3) sql_error insert into $tb values ('1970-01-01 08:00:00.000', 3)
sql_error insert into $tb values ('1970-01-01 08:00:00.000', 3) sql_error insert into $tb values ('1970-01-01 08:00:00.000', 3)
sql select ts from $tb sql select ts from $tb
@ -39,39 +39,48 @@ if $rows != 1 then
return -1 return -1
endi endi
print =============== step4 print =============== step4 ms db
sql insert into $tb values(now, 4); sql insert into $tb values(now, 4);
sql insert into $tb values(now+1a, 5); sql insert into $tb values(now+1a, 5);
sql insert into $tb values(now+1s, 6); sql insert into $tb values(now+1s, 6);
sql insert into $tb values(now+1m, 7); sql insert into $tb values(now+1m, 7);
sql insert into $tb values(now+1h, 8); sql insert into $tb values(now+1h, 8);
sql insert into $tb values(now+1d, 9); sql insert into $tb values(now+1d, 9);
sql_error insert into $tb values(now+3w, 10); sql insert into $tb values(now+3w, 10);
sql_error insert into $tb values(now+1n, 11); sql insert into $tb values(31556995200000, 11);
sql_error insert into $tb values(now+1y, 12); sql insert into $tb values('2970-01-01 00:00:00.000', 12);
print =============== step5 sql_error insert into $tb values(now+1n, 20);
sql_error insert into $tb values ('9999-12-31 213:59:59.999', 13) sql_error insert into $tb values(now+1y, 21);
sql_error insert into $tb values(31556995200001, 22);
sql_error insert into $tb values('2970-01-02 00:00:00.000', 23);
sql_error insert into $tb values(9223372036854775807, 24);
sql_error insert into $tb values(9223372036854775808, 25);
sql_error insert into $tb values(92233720368547758088, 26);
print =============== step5 ms db
sql_error insert into $tb values ('9999-12-31 213:59:59.999', 27)
sql select ts from $tb sql select ts from $tb
print $rows print $rows
if $rows != 7 then if $rows != 10 then
return -1 return -1
endi endi
print =============== step6 print =============== step6 ms db
sql_error insert into $tb values ('9999-12-99 23:59:59.999', 13) sql_error insert into $tb values ('9999-12-99 23:59:59.999', 28)
sql select ts from $tb sql select ts from $tb
if $rows != 7 then if $rows != 10 then
return -1 return -1
endi endi
print =============== step7 print =============== step7 ms db
$i = 1 $i = 1
$tb = $tbPrefix . $i $tb = $tbPrefix . $i
sql create table $tb (ts timestamp, ts2 timestamp) sql create table $tb (ts timestamp, ts2 timestamp)
print =============== step8 print =============== step8 ms db
sql insert into $tb values (now, now) sql insert into $tb values (now, now)
sql select * from $tb sql select * from $tb
if $rows != 1 then if $rows != 1 then
@ -84,4 +93,131 @@ if $rows != 2 then
return -1 return -1
endi endi
print =============== step20 us db
sql create database $db precision 'us' keep 365000d;
sql create table $tb (ts timestamp, speed int)
sql insert into $tb values ('2017-01-01 08:00:00.001', 1)
sql select ts from $tb
if $rows != 1 then
return -1
endi
if $data00 != @17-01-01 08:00:00.001000@ then
print data00 = $data00
return -1
endi
print =============== step21 us db
sql_error insert into $tb values ('2017-08-28 00:23:46.429+ 1a', 2)
sql_error insert into $tb values ('2017-08-28 00:23:46cd .429', 2)
sql select ts from $tb
if $rows != 1 then
return -1
endi
print =============== step22 us db
sql_error insert into $tb values ('970-01-01 08:00:00.000', 3)
sql_error insert into $tb values ('970-01-01 08:00:00.000', 3)
sql select ts from $tb
if $rows != 1 then
return -1
endi
print =============== step23 us db
sql insert into $tb values(now, 4);
sql insert into $tb values(now+1a, 5);
sql insert into $tb values(now+1s, 6);
sql insert into $tb values(now+1m, 7);
sql insert into $tb values(now+1h, 8);
sql insert into $tb values(now+1d, 9);
sql insert into $tb values(now+3w, 10);
sql insert into $tb values(31556995200000000, 11);
sql insert into $tb values('2970-01-01 00:00:00.000000', 12);
sql_error insert into $tb values(now+1n, 20);
sql_error insert into $tb values(now+1y, 21);
sql_error insert into $tb values(31556995200000001, 22);
sql_error insert into $tb values('2970-01-02 00:00:00.000000', 23);
sql_error insert into $tb values(9223372036854775807, 24);
sql_error insert into $tb values(9223372036854775808, 25);
sql_error insert into $tb values(92233720368547758088, 26);
sql_error insert into $tb values ('9999-12-31 213:59:59.999', 27)
print =============== step24 us db
sql select ts from $tb
print $rows
if $rows != 10 then
return -1
endi
sql drop database $db
sql select * from information_schema.ins_databases
if $rows != 2 then
return -1
endi
print =============== step30 ns db
sql create database $db precision 'ns' keep 36500d;
sql create table $tb (ts timestamp, speed int)
sql insert into $tb values ('2017-01-01 08:00:00.001', 1)
sql select ts from $tb
if $rows != 1 then
return -1
endi
if $data00 != @17-01-01 08:00:00.001000000@ then
print data00 = $data00
return -1
endi
print =============== step31 ns db
sql_error insert into $tb values ('2017-08-28 00:23:46.429+ 1a', 2)
sql_error insert into $tb values ('2017-08-28 00:23:46cd .429', 2)
sql select ts from $tb
if $rows != 1 then
return -1
endi
print =============== step32 ns db
#sql_error insert into $tb values ('970-01-01 08:00:00.000000000', 3)
#sql_error insert into $tb values ('970-01-01 08:00:00.000000000', 3)
sql select ts from $tb
if $rows != 1 then
return -1
endi
print =============== step33 ns db
sql insert into $tb values(now, 4);
sql insert into $tb values(now+1a, 5);
sql insert into $tb values(now+1s, 6);
sql insert into $tb values(now+1m, 7);
sql insert into $tb values(now+1h, 8);
sql insert into $tb values(now+1d, 9);
sql insert into $tb values(now+3w, 10);
sql insert into $tb values(9214646400000000000, 11);
sql insert into $tb values('2262-01-01 00:00:00.000000000', 12);
sql_error insert into $tb values(now+1n, 20);
sql_error insert into $tb values(now+1y, 21);
sql_error insert into $tb values(9214646400000000001, 22);
sql_error insert into $tb values('2262-01-02 00:00:00.000000000', 23);
sql_error insert into $tb values(9223372036854775807, 24);
sql_error insert into $tb values(9223372036854775808, 25);
sql_error insert into $tb values(92233720368547758088, 26);
sql_error insert into $tb values ('9999-12-31 213:59:59.999', 27)
print =============== step34 ns db
sql select ts from $tb
print $rows
if $rows != 10 then
return -1
endi
sql drop database $db
sql select * from information_schema.ins_databases
if $rows != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -2,6 +2,7 @@
import taos import taos
import sys import sys
import time import time
from datetime import datetime
import socket import socket
import os import os
import platform import platform
@ -100,8 +101,11 @@ class TDTestCase:
processName="taosd" processName="taosd"
taosdCmd = taosdCmdRun + startAction taosdCmd = taosdCmdRun + startAction
tdLog.printNoPrefix("%s"%taosdCmd) tdLog.printNoPrefix("%s"%taosdCmd)
os.system(f"nohup {taosdCmd} & ") logTime=datetime.now().strftime('%Y%m%d_%H%M%S_%f')
os.system(f"nohup {taosdCmd} > {logTime}.log 2>&1 & ")
self.checkAndstopPro(processName,startAction) self.checkAndstopPro(processName,startAction)
os.system(f"rm -rf {logTime}.log")
def taosdCommandExe(self,startAction,taosdCmdRun): def taosdCommandExe(self,startAction,taosdCmdRun):
taosdCmd = taosdCmdRun + startAction taosdCmd = taosdCmdRun + startAction
@ -207,7 +211,7 @@ class TDTestCase:
os.system(" mkdir -p taosdCaseTmp ") os.system(" mkdir -p taosdCaseTmp ")
os.system("echo \'TAOS_QUERY_POLICY=3\' > taosdCaseTmp/.env ") os.system("echo \'TAOS_QUERY_POLICY=3\' > taosdCaseTmp/.env ")
self.taosdCommandStop(startAction,taosdCmdRun) self.taosdCommandStop(startAction,taosdCmdRun)
os.system(" rm -rf taosdCaseTmp/.env ") os.system(" rm -rf taosdCaseTmp ")
startAction = " -V" startAction = " -V"
tdLog.printNoPrefix("================================ parameter: %s"%startAction) tdLog.printNoPrefix("================================ parameter: %s"%startAction)

View File

@ -540,11 +540,20 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t
} }
} }
bool shellIsLimitQuery(const char *sql) { // show whole result for this query return true, like limit or describe
// todo refactor bool shellIsShowWhole(const char *sql) {
// limit
if (taosStrCaseStr(sql, " limit ") != NULL) { if (taosStrCaseStr(sql, " limit ") != NULL) {
return true; return true;
} }
// describe
if (taosStrCaseStr(sql, "describe ") != NULL) {
return true;
}
// show
if (taosStrCaseStr(sql, "show ") != NULL) {
return true;
}
return false; return false;
} }
@ -578,7 +587,7 @@ int32_t shellVerticalPrintResult(TAOS_RES *tres, const char *sql) {
uint64_t resShowMaxNum = UINT64_MAX; uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsShowWhole(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
} }
@ -723,7 +732,7 @@ int32_t shellHorizontalPrintResult(TAOS_RES *tres, const char *sql) {
uint64_t resShowMaxNum = UINT64_MAX; uint64_t resShowMaxNum = UINT64_MAX;
if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsLimitQuery(sql)) { if (shell.args.commands == NULL && shell.args.file[0] == 0 && !shellIsShowWhole(sql)) {
resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM; resShowMaxNum = SHELL_DEFAULT_RES_SHOW_NUM;
} }

View File

@ -1097,18 +1097,12 @@ int sml_time_Test() {
pRes = taos_query(taos, "use sml_db"); pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes); taos_free_result(pRes);
char* tmp = (char*)taosMemoryCalloc(1024, 1); pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
memcpy(tmp, sql[0], strlen(sql[0]));
*(char*)(tmp+44) = 0;
int32_t totalRows = 0;
pRes = taos_schemaless_insert_raw(taos, tmp, strlen(sql[0]), &totalRows, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT(totalRows == 3);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes); int code = taos_errno(pRes);
taos_free_result(pRes); taos_free_result(pRes);
taos_close(taos); taos_close(taos);
taosMemoryFree(tmp);
return code; return code;
} }