Merge branch '3.0' into feature/TD-11274-3.0
This commit is contained in:
commit
832527e5b6
|
@ -76,15 +76,19 @@ IF ("${CPUTYPE}" STREQUAL "")
|
|||
IF (CMAKE_SYSTEM_PROCESSOR MATCHES "(amd64)|(AMD64)")
|
||||
MESSAGE(STATUS "The current platform is amd64")
|
||||
SET(PLATFORM_ARCH_STR "amd64")
|
||||
SET(TD_INTEL_64 TRUE)
|
||||
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "(x86)|(X86)")
|
||||
MESSAGE(STATUS "The current platform is x86")
|
||||
SET(PLATFORM_ARCH_STR "i386")
|
||||
SET(TD_INTEL_32 TRUE)
|
||||
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "armv7l")
|
||||
MESSAGE(STATUS "The current platform is aarch32")
|
||||
SET(PLATFORM_ARCH_STR "arm")
|
||||
SET(TD_ARM_32 TRUE)
|
||||
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64")
|
||||
MESSAGE(STATUS "The current platform is aarch64")
|
||||
SET(PLATFORM_ARCH_STR "arm64")
|
||||
SET(TD_ARM_64 TRUE)
|
||||
ENDIF ()
|
||||
ELSE ()
|
||||
# if generate ARM version:
|
||||
|
@ -92,18 +96,23 @@ ELSE ()
|
|||
IF (${CPUTYPE} MATCHES "aarch32")
|
||||
SET(PLATFORM_ARCH_STR "arm")
|
||||
MESSAGE(STATUS "input cpuType: aarch32")
|
||||
SET(TD_ARM_32 TRUE)
|
||||
ELSEIF (${CPUTYPE} MATCHES "aarch64")
|
||||
SET(PLATFORM_ARCH_STR "arm64")
|
||||
MESSAGE(STATUS "input cpuType: aarch64")
|
||||
SET(TD_ARM_64 TRUE)
|
||||
ELSEIF (${CPUTYPE} MATCHES "mips64")
|
||||
SET(PLATFORM_ARCH_STR "mips")
|
||||
MESSAGE(STATUS "input cpuType: mips64")
|
||||
SET(TD_MIPS_64 TRUE)
|
||||
ELSEIF (${CPUTYPE} MATCHES "x64")
|
||||
SET(PLATFORM_ARCH_STR "amd64")
|
||||
MESSAGE(STATUS "input cpuType: x64")
|
||||
SET(TD_INTEL_64 TRUE)
|
||||
ELSEIF (${CPUTYPE} MATCHES "x86")
|
||||
SET(PLATFORM_ARCH_STR "i386")
|
||||
MESSAGE(STATUS "input cpuType: x86")
|
||||
SET(TD_INTEL_32 TRUE)
|
||||
ELSE ()
|
||||
MESSAGE(STATUS "input cpuType unknown " ${CPUTYPE})
|
||||
ENDIF ()
|
||||
|
|
|
@ -3,9 +3,9 @@ sidebar_label: 时序数据特色查询
|
|||
title: 时序数据特色查询
|
||||
---
|
||||
|
||||
TDengine是专为时序数据而研发的大数据平台,存储和计算都针对时序数据的特定进行了量身定制,在支持标准SQL的基础之上,还提供了一系列贴合时序业务场景的特色查询语法,极大的方便时序场景的应用开发。
|
||||
TDengine 是专为时序数据而研发的大数据平台,存储和计算都针对时序数据的特定进行了量身定制,在支持标准 SQL 的基础之上,还提供了一系列贴合时序业务场景的特色查询语法,极大的方便时序场景的应用开发。
|
||||
|
||||
TDengine提供的特色查询包括标签切分查询和窗口切分查询。
|
||||
TDengine 提供的特色查询包括标签切分查询和窗口切分查询。
|
||||
|
||||
## 标签切分查询
|
||||
|
||||
|
@ -14,13 +14,14 @@ TDengine提供的特色查询包括标签切分查询和窗口切分查询。
|
|||
```sql
|
||||
PARTITION BY tag_list
|
||||
```
|
||||
其中 `tag_list` 是标签列的列表,还可以包括tbname伪列。
|
||||
|
||||
TDengine按如下方式处理标签切分子句:
|
||||
其中 `tag_list` 是标签列的列表,还可以包括 tbname 伪列。
|
||||
|
||||
TDengine 按如下方式处理标签切分子句:
|
||||
|
||||
标签切分子句位于 `WHERE` 子句之后,且不能和 `JOIN` 子句一起使用。
|
||||
标签切分子句将超级表数据按指定的标签组合进行切分,然后对每个切分的分片进行指定的计算。计算由之后的子句定义(窗口子句、`GROUP BY` 子句或`SELECT` 子句)。
|
||||
标签切分子句可以和窗口切分子句(或 `GROUP BY` 子句)一起使用,此时后面的子句作用在每个切分的分片上。例如,下面的示例将数据按标签 `location` 进行分组,并对每个组按10分钟进行降采样,取其最大值。
|
||||
标签切分子句可以和窗口切分子句(或 `GROUP BY` 子句)一起使用,此时后面的子句作用在每个切分的分片上。例如,下面的示例将数据按标签 `location` 进行分组,并对每个组按 10 分钟进行降采样,取其最大值。
|
||||
|
||||
```sql
|
||||
select max(current) from meters partition by location interval(10m)
|
||||
|
@ -42,27 +43,31 @@ SELECT function_list FROM tb_name
|
|||
在上述语法中的具体限制如下
|
||||
|
||||
### 窗口切分查询中使用函数的限制
|
||||
|
||||
- 在聚合查询中,function_list 位置允许使用聚合和选择函数,并要求每个函数仅输出单个结果(例如:COUNT、AVG、SUM、STDDEV、LEASTSQUARES、PERCENTILE、MIN、MAX、FIRST、LAST),而不能使用具有多行输出结果的函数(例如:DIFF 以及四则运算)。
|
||||
- 此外 LAST_ROW 查询也不能与窗口聚合同时出现。
|
||||
- 标量函数(如:CEIL/FLOOR 等)也不能使用在窗口聚合查询中。
|
||||
|
||||
### 窗口子句的规则
|
||||
- 窗口子句位于标签切分子句之后,GROUP BY子句之前,且不可以和GROUP BY子句一起使用。
|
||||
- 窗口子句将数据按窗口进行切分,对每个窗口进行SELECT列表中的表达式的计算,SELECT列表中的表达式只能包含:
|
||||
|
||||
- 窗口子句位于标签切分子句之后,GROUP BY 子句之前,且不可以和 GROUP BY 子句一起使用。
|
||||
- 窗口子句将数据按窗口进行切分,对每个窗口进行 SELECT 列表中的表达式的计算,SELECT 列表中的表达式只能包含:
|
||||
- 常量。
|
||||
- 聚集函数。
|
||||
- 包含上面表达式的表达式。
|
||||
- 窗口子句不可以和GROUP BY子句一起使用。
|
||||
- 窗口子句不可以和 GROUP BY 子句一起使用。
|
||||
- WHERE 语句可以指定查询的起止时间和其他过滤条件。
|
||||
|
||||
### FILL 子句
|
||||
FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种:
|
||||
1. 不进行填充:NONE(默认填充模式)。
|
||||
2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如FILL(VALUE, 1.23),相应列为INT类型,则填充值为1。
|
||||
3. PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。
|
||||
4. NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。
|
||||
5. LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。
|
||||
6. NEXT 填充:使用下一个非 NULL 值填充数据。例如:FILL(NEXT)。
|
||||
|
||||
FILL 语句指定某一窗口区间数据缺失的情况下的填充模式。填充模式包括以下几种:
|
||||
|
||||
1. 不进行填充:NONE(默认填充模式)。
|
||||
2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如:FILL(VALUE, 1.23)。这里需要注意,最终填充的值受由相应列的类型决定,如 FILL(VALUE, 1.23),相应列为 INT 类型,则填充值为 1。
|
||||
3. PREV 填充:使用前一个非 NULL 值填充数据。例如:FILL(PREV)。
|
||||
4. NULL 填充:使用 NULL 填充数据。例如:FILL(NULL)。
|
||||
5. LINEAR 填充:根据前后距离最近的非 NULL 值做线性插值填充。例如:FILL(LINEAR)。
|
||||
6. NEXT 填充:使用下一个非 NULL 值填充数据。例如:FILL(NEXT)。
|
||||
|
||||
:::info
|
||||
|
||||
|
@ -93,6 +98,7 @@ SELECT COUNT(*) FROM temp_tb_1 INTERVAL(1m) SLIDING(2m);
|
|||
```
|
||||
|
||||
使用时间窗口需要注意:
|
||||
|
||||
- 聚合时间段的窗口宽度由关键词 INTERVAL 指定,最短时间间隔 10 毫秒(10a);并且支持偏移 offset(偏移必须小于间隔),也即时间窗口划分与“UTC 时刻 0”相比的偏移量。SLIDING 语句用于指定聚合时间段的前向增量,也即每次窗口向前滑动的时长。
|
||||
- 使用 INTERVAL 语句时,除非极特殊的情况,都要求把客户端和服务端的 taos.cfg 配置文件中的 timezone 参数配置为相同的取值,以避免时间处理函数频繁进行跨时区转换而导致的严重性能影响。
|
||||
- 返回的结果中时间序列严格单调递增。
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
---
|
||||
sidebar_label: Information内置数据库
|
||||
title: Information内置数据库
|
||||
---
|
||||
|
|
@ -80,8 +80,6 @@ typedef struct {
|
|||
SArray* pTableList;
|
||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||
bool needSortTableByGroupId;
|
||||
void* pTagCond;
|
||||
void* pTagIndexCond;
|
||||
uint64_t suid;
|
||||
} STableListInfo;
|
||||
|
||||
|
|
|
@ -2071,9 +2071,10 @@ int32_t tDecodeSVDropTbBatchRsp(SDecoder* pCoder, SVDropTbBatchRsp* pRsp);
|
|||
|
||||
// TDMT_VND_ALTER_TABLE =====================
|
||||
typedef struct {
|
||||
char* tbName;
|
||||
int8_t action;
|
||||
char* colName;
|
||||
char* tbName;
|
||||
int8_t action;
|
||||
char* colName;
|
||||
int32_t colId;
|
||||
// TSDB_ALTER_TABLE_ADD_COLUMN
|
||||
int8_t type;
|
||||
int8_t flags;
|
||||
|
|
|
@ -63,12 +63,13 @@ static FORCE_INLINE int64_t taosGetTimestampToday(int32_t precision) {
|
|||
: (precision == TSDB_TIME_PRECISION_MICRO) ? 1000000
|
||||
: 1000000000;
|
||||
time_t t = taosTime(NULL);
|
||||
struct tm* tm = taosLocalTime(&t, NULL);
|
||||
tm->tm_hour = 0;
|
||||
tm->tm_min = 0;
|
||||
tm->tm_sec = 0;
|
||||
struct tm tm;
|
||||
taosLocalTime(&t, &tm);
|
||||
tm.tm_hour = 0;
|
||||
tm.tm_min = 0;
|
||||
tm.tm_sec = 0;
|
||||
|
||||
return (int64_t)taosMktime(tm) * factor;
|
||||
return (int64_t)taosMktime(&tm) * factor;
|
||||
}
|
||||
|
||||
int64_t taosTimeAdd(int64_t t, int64_t duration, char unit, int32_t precision);
|
||||
|
|
|
@ -26,6 +26,12 @@ extern "C" {
|
|||
|
||||
#define SLOT_NAME_LEN TSDB_TABLE_NAME_LEN + TSDB_COL_NAME_LEN
|
||||
|
||||
typedef enum EDataOrderLevel {
|
||||
DATA_ORDER_LEVEL_NONE = 1,
|
||||
DATA_ORDER_LEVEL_IN_BLOCK,
|
||||
DATA_ORDER_LEVEL_IN_GROUP
|
||||
} EDataOrderLevel;
|
||||
|
||||
typedef struct SLogicNode {
|
||||
ENodeType type;
|
||||
SNodeList* pTargets; // SColumnNode
|
||||
|
@ -36,6 +42,8 @@ typedef struct SLogicNode {
|
|||
uint8_t precision;
|
||||
SNode* pLimit;
|
||||
SNode* pSlimit;
|
||||
EDataOrderLevel requireDataOrder; // requirements for input data
|
||||
EDataOrderLevel resultDataOrder; // properties of the output data
|
||||
} SLogicNode;
|
||||
|
||||
typedef enum EScanType {
|
||||
|
@ -78,7 +86,7 @@ typedef struct SScanLogicNode {
|
|||
SNodeList* pGroupTags;
|
||||
bool groupSort;
|
||||
int8_t cacheLastMode;
|
||||
bool hasNormalCols; // neither tag column nor primary key tag column
|
||||
bool hasNormalCols; // neither tag column nor primary key tag column
|
||||
} SScanLogicNode;
|
||||
|
||||
typedef struct SJoinLogicNode {
|
||||
|
@ -317,6 +325,7 @@ typedef STableScanPhysiNode SStreamScanPhysiNode;
|
|||
typedef struct SProjectPhysiNode {
|
||||
SPhysiNode node;
|
||||
SNodeList* pProjections;
|
||||
bool mergeDataBlock;
|
||||
} SProjectPhysiNode;
|
||||
|
||||
typedef struct SIndefRowsFuncPhysiNode {
|
||||
|
|
|
@ -116,6 +116,13 @@ int32_t csumScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam
|
|||
int32_t diffScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t stateCountScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t sampleScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t tailScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t uniqueScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1609,11 +1609,13 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
if (pResultInfo->convertJson == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
char* p1 = pResultInfo->convertJson;
|
||||
|
||||
int32_t totalLen = 0;
|
||||
int32_t len = sizeof(int32_t) + sizeof(uint64_t) + numOfCols * (sizeof(int16_t) + sizeof(int32_t));
|
||||
memcpy(p1, p, len);
|
||||
|
||||
p += len;
|
||||
p1 += len;
|
||||
totalLen += len;
|
||||
|
||||
len = sizeof(int32_t) * numOfCols;
|
||||
int32_t* colLength = (int32_t*)p;
|
||||
|
@ -1621,6 +1623,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
memcpy(p1, p, len);
|
||||
p += len;
|
||||
p1 += len;
|
||||
totalLen += len;
|
||||
|
||||
char* pStart = p;
|
||||
char* pStart1 = p1;
|
||||
|
@ -1636,6 +1639,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
memcpy(pStart1, pStart, len);
|
||||
pStart += len;
|
||||
pStart1 += len;
|
||||
totalLen += len;
|
||||
|
||||
len = 0;
|
||||
for (int32_t j = 0; j < numOfRows; ++j) {
|
||||
|
@ -1680,24 +1684,30 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
|
|||
len += varDataTLen(dst);
|
||||
}
|
||||
colLen1 = len;
|
||||
totalLen += colLen1;
|
||||
colLength1[i] = htonl(len);
|
||||
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
|
||||
len = numOfRows * sizeof(int32_t);
|
||||
memcpy(pStart1, pStart, len);
|
||||
pStart += len;
|
||||
pStart1 += len;
|
||||
totalLen += len;
|
||||
totalLen += colLen;
|
||||
memcpy(pStart1, pStart, colLen);
|
||||
} else {
|
||||
len = BitmapLen(pResultInfo->numOfRows);
|
||||
memcpy(pStart1, pStart, len);
|
||||
pStart += len;
|
||||
pStart1 += len;
|
||||
totalLen += len;
|
||||
totalLen += colLen;
|
||||
memcpy(pStart1, pStart, colLen);
|
||||
}
|
||||
pStart += colLen;
|
||||
pStart1 += colLen1;
|
||||
}
|
||||
|
||||
*(int32_t*)(pResultInfo->convertJson) = totalLen;
|
||||
pResultInfo->pData = pResultInfo->convertJson;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -609,7 +609,7 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
|
|||
}
|
||||
kvVal->type = TSDB_DATA_TYPE_BIGINT;
|
||||
kvVal->i = (int64_t)result;
|
||||
} else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
|
||||
} else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
|
||||
if (result >= (double)UINT64_MAX || result < 0) {
|
||||
errno = 0;
|
||||
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
|
||||
|
@ -1047,6 +1047,10 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
|
|||
continue;
|
||||
}
|
||||
|
||||
if(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE){
|
||||
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
|
||||
}
|
||||
|
||||
// add kv to SSmlKv
|
||||
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
|
||||
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -363,7 +363,11 @@ tmq_list_t* tmq_list_new() {
|
|||
|
||||
int32_t tmq_list_append(tmq_list_t* list, const char* src) {
|
||||
SArray* container = &list->container;
|
||||
char* topic = strdup(src);
|
||||
if (src == NULL || src[0] == 0) return -1;
|
||||
char* topic = strdup(src);
|
||||
if (topic[0] != '`') {
|
||||
strtolower(topic, src);
|
||||
}
|
||||
if (taosArrayPush(container, &topic) == NULL) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -499,6 +499,32 @@ TEST(testCase, smlGetTimestampLen_Test) {
|
|||
ASSERT_EQ(len, 3);
|
||||
}
|
||||
|
||||
TEST(testCase, smlParseNumber_Test) {
|
||||
SSmlKv kv = {0};
|
||||
char buf[64] = {0};
|
||||
SSmlMsgBuf msg = {0};
|
||||
msg.buf = buf;
|
||||
msg.len = 64;
|
||||
kv.value = "3.2e-900";
|
||||
kv.length = 8;
|
||||
bool res = smlParseNumber(&kv, &msg);
|
||||
printf("res:%d,v:%f, %f\n", res,kv.d, HUGE_VAL);
|
||||
}
|
||||
|
||||
//#include <stdlib.h>
|
||||
//TEST(testCase, number_Test) {
|
||||
// char *str[] = {
|
||||
//// "-000 0999",
|
||||
// "- abc",
|
||||
// };
|
||||
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
|
||||
// errno = 0;
|
||||
// char *end = NULL;
|
||||
// long result = strtol(str[i], &end, 10);
|
||||
// printf("errno:%d,len:%d,result:%ld\n", errno, end - str[i], result);
|
||||
// }
|
||||
//
|
||||
//}
|
||||
/*
|
||||
TEST(testCase, smlProcess_influx_Test) {
|
||||
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
|
|
|
@ -1661,9 +1661,6 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
}
|
||||
*/
|
||||
|
||||
#ifdef WINDOWS
|
||||
if (tt < 0) tt = 0;
|
||||
#endif
|
||||
if (tt <= 0 && ms < 0) {
|
||||
tt--;
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
|
@ -1674,9 +1671,9 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
|
|||
ms += 1000;
|
||||
}
|
||||
}
|
||||
|
||||
struct tm* ptm = taosLocalTime(&tt, NULL);
|
||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
struct tm ptm = {0};
|
||||
taosLocalTime(&tt, &ptm);
|
||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
sprintf(buf + pos, ".%09d", ms);
|
||||
|
|
|
@ -902,7 +902,7 @@ const char* fmtts(int64_t ts) {
|
|||
|
||||
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision) {
|
||||
char ts[40] = {0};
|
||||
struct tm* ptm;
|
||||
struct tm ptm;
|
||||
|
||||
int32_t fractionLen;
|
||||
char* format = NULL;
|
||||
|
@ -939,10 +939,10 @@ void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precision)
|
|||
assert(false);
|
||||
}
|
||||
|
||||
ptm = taosLocalTime(", NULL);
|
||||
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", ptm);
|
||||
taosLocalTime(", &ptm);
|
||||
int32_t length = (int32_t)strftime(ts, 40, "%Y-%m-%dT%H:%M:%S", &ptm);
|
||||
length += snprintf(ts + length, fractionLen, format, mod);
|
||||
length += (int32_t)strftime(ts + length, 40 - length, "%z", ptm);
|
||||
length += (int32_t)strftime(ts + length, 40 - length, "%z", &ptm);
|
||||
|
||||
tstrncpy(buf, ts, bufLen);
|
||||
}
|
||||
|
|
|
@ -298,7 +298,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId);
|
||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
imbCnt++;
|
||||
}
|
||||
|
@ -312,7 +313,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
.pVgEp = pVgEp,
|
||||
};
|
||||
taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg));
|
||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId, pConsumerEp->consumerId);
|
||||
mInfo("mq rebalance: remove vgId:%d from consumer:%" PRId64 ",(first scan)", pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -354,7 +356,8 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -371,8 +374,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
ASSERT(pConsumerEp->consumerId > 0);
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
if (pRebVg->newConsumerId == pRebVg->oldConsumerId) {
|
||||
mInfo("mq rebalance: skip vg %d for same consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
continue;
|
||||
}
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId, pConsumerEp->consumerId);
|
||||
mInfo("mq rebalance: add vgId:%d to consumer:%" PRId64 ",(second scan)", pRebVg->pVgEp->vgId,
|
||||
pConsumerEp->consumerId);
|
||||
}
|
||||
} else {
|
||||
// if all consumer is removed, put all vg into unassigned
|
||||
|
|
|
@ -205,7 +205,7 @@ _query:
|
|||
}
|
||||
|
||||
tDecoderInit(&dc, pData, nData);
|
||||
tDecodeSSchemaWrapper(&dc, &schema);
|
||||
tDecodeSSchemaWrapperEx(&dc, &schema);
|
||||
pSchema = tCloneSSchemaWrapper(&schema);
|
||||
tDecoderClear(&dc);
|
||||
|
||||
|
@ -470,9 +470,9 @@ int64_t metaGetTbNum(SMeta *pMeta) {
|
|||
}
|
||||
|
||||
// N.B. Called by statusReq per second
|
||||
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
|
||||
int64_t metaGetTimeSeriesNum(SMeta *pMeta) {
|
||||
// TODO
|
||||
return 400;
|
||||
return 400;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -64,9 +64,16 @@ int64_t tqScan(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVa
|
|||
qTaskInfo_t task = pExec->execCol.task[0];
|
||||
|
||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||
ASSERT(pOffset->type == TMQ_OFFSET__LOG);
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
if (pOffset->type == TMQ_OFFSET__LOG) {
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
} else {
|
||||
tqOffsetResetToLog(pOffset, pHandle->snapshotVer + 1);
|
||||
if (qStreamPrepareScan(task, pOffset) < 0) {
|
||||
pRsp->rspOffset = *pOffset;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rowCnt = 0;
|
||||
|
|
|
@ -243,7 +243,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
|||
void* data = taosMemoryMalloc(msgLen);
|
||||
if (data == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
qError("failed to copy data for stream since out of memory");
|
||||
tqError("failed to copy data for stream since out of memory");
|
||||
return -1;
|
||||
}
|
||||
memcpy(data, msg, msgLen);
|
||||
|
|
|
@ -630,6 +630,9 @@ _exit:
|
|||
tEncoderInit(&ec, pRsp->pCont, pRsp->contLen);
|
||||
tEncodeSVAlterTbRsp(&ec, &vAlterTbRsp);
|
||||
tEncoderClear(&ec);
|
||||
if (vMetaRsp.pSchemas) {
|
||||
taosMemoryFree(vMetaRsp.pSchemas);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#ifndef TDENGINE_QUERYUTIL_H
|
||||
#define TDENGINE_QUERYUTIL_H
|
||||
|
||||
#include "vnode.h"
|
||||
#include "function.h"
|
||||
#include "nodes.h"
|
||||
#include "plannodes.h"
|
||||
|
@ -96,9 +97,9 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
|
|||
}
|
||||
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
|
||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||
|
||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||
bool hasDataInGroupInfo(SGroupResInfo* pGroupResInfo);
|
||||
|
||||
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
||||
|
@ -106,7 +107,7 @@ int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
|||
SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode);
|
||||
|
||||
EDealRes doTranslateTagExpr(SNode** pNode, void* pContext);
|
||||
int32_t getTableList(void* metaHandle, void* vnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo);
|
||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo);
|
||||
SArray* createSortInfo(SNodeList* pNodeList);
|
||||
SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||
|
@ -128,4 +129,6 @@ int32_t convertFillType(int32_t mode);
|
|||
|
||||
int32_t resultrowComparAsc(const void* p1, const void* p2);
|
||||
|
||||
int32_t isTableOk(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified);
|
||||
|
||||
#endif // TDENGINE_QUERYUTIL_H
|
||||
|
|
|
@ -311,19 +311,10 @@ typedef struct STableScanInfo {
|
|||
int32_t dataBlockLoadFlag;
|
||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
|
||||
int32_t currentGroupId;
|
||||
int32_t currentTable;
|
||||
|
||||
#if 0
|
||||
struct {
|
||||
uint64_t uid;
|
||||
int64_t ts;
|
||||
} lastStatus;
|
||||
#endif
|
||||
|
||||
int8_t scanMode;
|
||||
int8_t noTable;
|
||||
int8_t scanMode;
|
||||
int8_t noTable;
|
||||
} STableScanInfo;
|
||||
|
||||
typedef struct STagScanInfo {
|
||||
|
@ -429,8 +420,9 @@ typedef struct SStreamScanInfo {
|
|||
|
||||
// status for tmq
|
||||
// SSchemaWrapper schema;
|
||||
STqOffset offset;
|
||||
|
||||
STqOffset offset;
|
||||
SNode* pTagCond;
|
||||
SNode* pTagIndexCond;
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct SSysTableScanInfo {
|
||||
|
@ -874,8 +866,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx
|
|||
SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* readHandle, uint64_t uid, SBlockDistScanPhysiNode* pBlockScanNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle,
|
||||
STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId);
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
|
||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup);
|
||||
|
||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
|
@ -966,12 +958,12 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
|
|||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
int32_t createScanTableListInfo(SScanPhysiNode *pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId);
|
||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idstr);
|
||||
|
||||
SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo *pTableListInfo,
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId, uint64_t taskId);
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo);
|
||||
|
||||
void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex);
|
||||
|
||||
|
|
|
@ -72,8 +72,12 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
||||
assert(pGroupResInfo != NULL);
|
||||
|
||||
taosArrayDestroy(pGroupResInfo->pRows);
|
||||
pGroupResInfo->pRows = NULL;
|
||||
for(int32_t i = 0; i < taosArrayGetSize(pGroupResInfo->pRows); ++i) {
|
||||
SResKeyPos* pRes = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
taosMemoryFree(pRes);
|
||||
}
|
||||
|
||||
pGroupResInfo->pRows = taosArrayDestroy(pGroupResInfo->pRows);
|
||||
pGroupResInfo->index = 0;
|
||||
}
|
||||
|
||||
|
@ -261,7 +265,7 @@ EDealRes doTranslateTagExpr(SNode** pNode, void* pContext) {
|
|||
return DEAL_RES_CONTINUE;
|
||||
}
|
||||
|
||||
static bool isTableOk(STableKeyInfo* info, SNode* pTagCond, SMeta* metaHandle) {
|
||||
int32_t isTableOk(STableKeyInfo* info, SNode* pTagCond, void* metaHandle, bool* pQualified) {
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, metaHandle, 0);
|
||||
metaGetTableEntryByUid(&mr, info->uid);
|
||||
|
@ -276,19 +280,22 @@ static bool isTableOk(STableKeyInfo* info, SNode* pTagCond, SMeta* metaHandle) {
|
|||
if (TSDB_CODE_SUCCESS != code) {
|
||||
terrno = code;
|
||||
nodesDestroyNode(pTagCondTmp);
|
||||
return false;
|
||||
*pQualified = false;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
ASSERT(nodeType(pNew) == QUERY_NODE_VALUE);
|
||||
SValueNode* pValue = (SValueNode*)pNew;
|
||||
|
||||
ASSERT(pValue->node.resType.type == TSDB_DATA_TYPE_BOOL);
|
||||
bool result = pValue->datum.b;
|
||||
*pQualified = pValue->datum.b;
|
||||
|
||||
nodesDestroyNode(pNew);
|
||||
return result;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, STableListInfo* pListInfo) {
|
||||
int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond, STableListInfo* pListInfo) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
||||
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||
|
@ -300,8 +307,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
|
||||
pListInfo->suid = pScanNode->suid;
|
||||
|
||||
SNode* pTagCond = (SNode*)pListInfo->pTagCond;
|
||||
SNode* pTagIndexCond = (SNode*)pListInfo->pTagIndexCond;
|
||||
if (pScanNode->tableType == TSDB_SUPER_TABLE) {
|
||||
if (pTagIndexCond) {
|
||||
SIndexMetaArg metaArg = {
|
||||
|
@ -341,9 +346,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
int32_t i = 0;
|
||||
while (i < taosArrayGetSize(pListInfo->pTableList)) {
|
||||
STableKeyInfo* info = taosArrayGet(pListInfo->pTableList, i);
|
||||
bool isOk = isTableOk(info, pTagCond, metaHandle);
|
||||
if (terrno) return terrno;
|
||||
if (!isOk) {
|
||||
|
||||
bool qualified = true;
|
||||
code = isTableOk(info, pTagCond, metaHandle, &qualified);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (!qualified) {
|
||||
taosArrayRemove(pListInfo->pTableList, i);
|
||||
continue;
|
||||
}
|
||||
|
@ -358,7 +368,6 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
|||
|
||||
// put into list as default group, remove it if grouping sorting is required later
|
||||
taosArrayPush(pListInfo->pGroupList, &pListInfo->pTableList);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -153,7 +153,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers) {
|
|||
return pTaskInfo;
|
||||
}
|
||||
|
||||
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList) {
|
||||
static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList, const char* idstr) {
|
||||
SArray* qa = taosArrayInit(4, sizeof(tb_uid_t));
|
||||
|
||||
// let's discard the tables those are not created according to the queried super table.
|
||||
|
@ -164,7 +164,7 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
|
|||
|
||||
int32_t code = metaGetTableEntryByUid(&mr, *id);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno));
|
||||
qError("failed to get table meta, uid:%" PRIu64 " code:%s, %s", *id, tstrerror(terrno), idstr);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -172,6 +172,21 @@ static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, cons
|
|||
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != pScanInfo->tableUid) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pScanInfo->pTagCond != NULL) {
|
||||
bool qualified = false;
|
||||
STableKeyInfo info = {.groupId = 0, .uid = mr.me.uid, .lastKey = 0};
|
||||
code = isTableOk(&info, pScanInfo->pTagCond, pScanInfo->readHandle.meta, &qualified);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("failed to filter new table, uid:0x%" PRIx64 ", %s", info.uid, idstr);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!qualified) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
/*pScanInfo->pStreamScanOp->pTaskInfo->tableqinfoList.*/
|
||||
// handle multiple partition
|
||||
|
||||
|
@ -194,7 +209,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
int32_t code = 0;
|
||||
SStreamScanInfo* pScanInfo = pInfo->info;
|
||||
if (isAdd) { // add new table id
|
||||
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList);
|
||||
SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo));
|
||||
|
||||
qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa));
|
||||
code = tqReaderAddTbUidList(pScanInfo->tqReader, qa);
|
||||
|
|
|
@ -332,6 +332,8 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
|
|||
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
|
||||
uid = pTableInfo->uid;
|
||||
ts = INT64_MIN;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
/*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
|
||||
|
|
|
@ -13,7 +13,6 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <executorimpl.h>
|
||||
#include "filter.h"
|
||||
#include "function.h"
|
||||
#include "functionMgt.h"
|
||||
|
@ -377,6 +376,10 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
|
|||
colDataAppendInt64(pColData, 4, &pQueryWindow->ekey);
|
||||
}
|
||||
|
||||
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) {
|
||||
colDataDestroy(pColData);
|
||||
}
|
||||
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
|
||||
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
|
@ -3737,7 +3740,7 @@ void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
cleanupBasicInfo(&pInfo->binfo);
|
||||
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
taosArrayDestroyEx(pInfo->groupResInfo.pRows, freeItem);
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
@ -4365,8 +4368,7 @@ static int32_t initTableblockDistQueryCond(uint64_t uid, SQueryTableDataCond* pC
|
|||
}
|
||||
|
||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||
uint64_t queryId, uint64_t taskId, STableListInfo* pTableListInfo,
|
||||
const char* pUser) {
|
||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser) {
|
||||
int32_t type = nodeType(pPhyNode);
|
||||
|
||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||
|
@ -4374,7 +4376,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
@ -4394,7 +4396,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
@ -4407,7 +4409,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
}
|
||||
|
||||
SOperatorInfo* pOperator =
|
||||
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo, queryId, taskId);
|
||||
createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
|
||||
|
||||
STableScanInfo* pScanInfo = pOperator->info;
|
||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||
|
@ -4424,15 +4426,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
};
|
||||
if (pHandle) {
|
||||
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, queryId, taskId);
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||
if (code) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
SOperatorInfo* pOperator =
|
||||
createStreamScanOperatorInfo(pHandle, pTableScanNode, pTaskInfo, &twSup, queryId, taskId);
|
||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo, &twSup);
|
||||
return pOperator;
|
||||
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||
|
@ -4441,7 +4442,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTableListInfo);
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
|
@ -4477,8 +4478,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
|
||||
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo,
|
||||
queryId, taskId);
|
||||
int32_t code = createScanTableListInfo(&pScanNode->scan, pScanNode->pGroupTags, true, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
return NULL;
|
||||
|
@ -4502,7 +4502,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableListInfo, pUser);
|
||||
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
|
||||
if (ops[i] == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -4685,6 +4685,7 @@ SArray* extractColumnInfo(SNodeList* pNodeList) {
|
|||
return pList;
|
||||
}
|
||||
|
||||
#if 0
|
||||
STsdbReader* doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, const char* idstr) {
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, &pTableScanNode->scan, pTableListInfo);
|
||||
|
@ -4718,6 +4719,7 @@ _error:
|
|||
terrno = code;
|
||||
return NULL;
|
||||
}
|
||||
#endif
|
||||
|
||||
static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanInfo** ppInfo) {
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
|
@ -4761,6 +4763,7 @@ int32_t extractTableScanNode(SPhysiNode* pNode, STableScanPhysiNode** ppNode) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHandle, int64_t uid, int64_t ts) {
|
||||
STableScanInfo* pTableScanInfo = NULL;
|
||||
if (extractTbscanInStreamOpTree(pOperator, &pTableScanInfo) < 0) {
|
||||
|
@ -4784,6 +4787,7 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa
|
|||
// TODO: set uid and ts to data reader
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) {
|
||||
int32_t code = TDB_CODE_SUCCESS;
|
||||
|
@ -4935,10 +4939,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
|||
}
|
||||
|
||||
(*pTaskInfo)->sql = sql;
|
||||
(*pTaskInfo)->tableqinfoList.pTagCond = pPlan->pTagCond;
|
||||
(*pTaskInfo)->tableqinfoList.pTagIndexCond = pPlan->pTagIndexCond;
|
||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId,
|
||||
&(*pTaskInfo)->tableqinfoList, pPlan->user);
|
||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, &(*pTaskInfo)->tableqinfoList, pPlan->pTagCond, pPlan->pTagIndexCond, pPlan->user);
|
||||
|
||||
if (NULL == (*pTaskInfo)->pRoot) {
|
||||
code = (*pTaskInfo)->code;
|
||||
|
|
|
@ -1497,9 +1497,8 @@ static void destroyStreamScanOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosMemoryFree(pStreamScan);
|
||||
}
|
||||
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode,
|
||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId,
|
||||
uint64_t taskId) {
|
||||
SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond,
|
||||
SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup) {
|
||||
SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
|
||||
|
@ -1512,6 +1511,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
|
||||
SDataBlockDescNode* pDescNode = pScanPhyNode->node.pOutputDataBlockDesc;
|
||||
|
||||
pInfo->pTagCond = pTagCond;
|
||||
|
||||
int32_t numOfCols = 0;
|
||||
pInfo->pColMatchInfo = extractColMatchInfo(pScanPhyNode->pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||
|
||||
|
@ -2550,26 +2551,19 @@ typedef struct STableMergeScanInfo {
|
|||
int32_t tableEndIndex;
|
||||
bool hasGroupId;
|
||||
uint64_t groupId;
|
||||
SArray* dataReaders; // array of tsdbReaderT*
|
||||
SReadHandle readHandle;
|
||||
int32_t bufPageSize;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
SArray* pSortInfo;
|
||||
SSortHandle* pSortHandle;
|
||||
|
||||
SArray* dataReaders; // array of tsdbReaderT*
|
||||
SReadHandle readHandle;
|
||||
|
||||
int32_t bufPageSize;
|
||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||
|
||||
SArray* pSortInfo;
|
||||
SSortHandle* pSortHandle;
|
||||
|
||||
SSDataBlock* pSortInputBlock;
|
||||
int64_t startTs; // sort start time
|
||||
|
||||
SArray* sortSourceParams;
|
||||
uint64_t queryId;
|
||||
uint64_t taskId;
|
||||
SSDataBlock* pSortInputBlock;
|
||||
int64_t startTs; // sort start time
|
||||
SArray* sortSourceParams;
|
||||
|
||||
SFileBlockLoadRecorder readRecorder;
|
||||
int64_t numOfRows;
|
||||
// int32_t prevGroupId; // previous table group id
|
||||
int64_t numOfRows;
|
||||
SScanInfo scanInfo;
|
||||
int32_t scanTimes;
|
||||
SNode* pFilterNode; // filter info, which is push down by optimizer
|
||||
|
@ -2584,26 +2578,25 @@ typedef struct STableMergeScanInfo {
|
|||
SExprInfo* pPseudoExpr;
|
||||
int32_t numOfPseudoExpr;
|
||||
SqlFunctionCtx* pPseudoCtx;
|
||||
// int32_t* rowEntryInfoOffset;
|
||||
|
||||
SQueryTableDataCond cond;
|
||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||
int32_t dataBlockLoadFlag;
|
||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time
|
||||
// window to check if current data block needs to be loaded.
|
||||
|
||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||
int32_t dataBlockLoadFlag;
|
||||
// if the upstream is an interval operator, the interval info is also kept here to get the time
|
||||
// window to check if current data block needs to be loaded.
|
||||
SInterval interval;
|
||||
SSampleExecInfo sample; // sample execution info
|
||||
} STableMergeScanInfo;
|
||||
|
||||
int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags, bool groupSort, SReadHandle* pHandle,
|
||||
STableListInfo* pTableListInfo, uint64_t queryId, uint64_t taskId) {
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTableListInfo);
|
||||
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, const char* idStr) {
|
||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanNode, pTagCond, pTagIndexCond, pTableListInfo);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pTableListInfo->pTableList) == 0) {
|
||||
qDebug("no table qualified for query, TID:0x%" PRIx64 ", QID:0x%" PRIx64, taskId, queryId);
|
||||
qDebug("no table qualified for query, %s" PRIx64, idStr);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3027,8 +3020,7 @@ int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
|
|||
}
|
||||
|
||||
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
|
||||
uint64_t taskId) {
|
||||
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo) {
|
||||
STableMergeScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STableMergeScanInfo));
|
||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||
if (pInfo == NULL || pOperator == NULL) {
|
||||
|
@ -3067,9 +3059,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|||
|
||||
pInfo->pResBlock = createResDataBlock(pDescNode);
|
||||
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
||||
pInfo->queryId = queryId;
|
||||
pInfo->taskId = taskId;
|
||||
|
||||
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
||||
|
||||
pInfo->pSortInfo = generateSortByTsInfo(pInfo->pColMatchInfo, pInfo->cond.order);
|
||||
|
|
|
@ -63,7 +63,6 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo
|
|||
SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup,
|
||||
SExecTaskInfo* pTaskInfo) {
|
||||
assert(win->skey <= win->ekey);
|
||||
SResultRow* pResultRow = doSetResultOutBufByKey(pAggSup->pResultBuf, pResultRowInfo, (char*)&win->skey, TSDB_KEYSIZE,
|
||||
masterscan, tableGroupId, pTaskInfo, true, pAggSup);
|
||||
|
||||
|
@ -1513,12 +1512,25 @@ static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
static void freeItem(void* param) {
|
||||
SGroupKeys *pKey = (SGroupKeys*) param;
|
||||
taosMemoryFree(pKey->pData);
|
||||
}
|
||||
|
||||
void destroyIntervalOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SIntervalAggOperatorInfo* pInfo = (SIntervalAggOperatorInfo*)param;
|
||||
cleanupBasicInfo(&pInfo->binfo);
|
||||
cleanupAggSup(&pInfo->aggSup);
|
||||
taosArrayDestroy(pInfo->pRecycledPages);
|
||||
pInfo->pRecycledPages = taosArrayDestroy(pInfo->pRecycledPages);
|
||||
pInfo->pInterpCols = taosArrayDestroy(pInfo->pInterpCols);
|
||||
taosArrayDestroyEx(pInfo->pPrevValues, freeItem);
|
||||
|
||||
pInfo->pPrevValues = NULL;
|
||||
pInfo->pDelWins = taosArrayDestroy(pInfo->pDelWins);
|
||||
pInfo->pDelRes = blockDataDestroy(pInfo->pDelRes);
|
||||
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
|
|
@ -194,8 +194,9 @@ static bool validateTimezoneFormat(const SValueNode* pVal) {
|
|||
void static addTimezoneParam(SNodeList* pList) {
|
||||
char buf[6] = {0};
|
||||
time_t t = taosTime(NULL);
|
||||
struct tm* tmInfo = taosLocalTime(&t, NULL);
|
||||
strftime(buf, sizeof(buf), "%z", tmInfo);
|
||||
struct tm tmInfo;
|
||||
taosLocalTime(&t, &tmInfo);
|
||||
strftime(buf, sizeof(buf), "%z", &tmInfo);
|
||||
int32_t len = (int32_t)strlen(buf);
|
||||
|
||||
SValueNode* pVal = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||
|
@ -2086,6 +2087,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = topFunction,
|
||||
.sprocessFunc = topBotScalarFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = topCombine,
|
||||
.pPartialFunc = "top",
|
||||
|
@ -2100,6 +2102,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getTopBotFuncEnv,
|
||||
.initFunc = topBotFunctionSetup,
|
||||
.processFunc = bottomFunction,
|
||||
.sprocessFunc = topBotScalarFunction,
|
||||
.finalizeFunc = topBotFinalize,
|
||||
.combineFunc = bottomCombine,
|
||||
.pPartialFunc = "bottom",
|
||||
|
@ -2229,6 +2232,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = lastRowFunction,
|
||||
.sprocessFunc = firstLastScalarFunction,
|
||||
.finalizeFunc = firstLastFinalize
|
||||
},
|
||||
{
|
||||
|
@ -2249,6 +2253,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = firstFunction,
|
||||
.sprocessFunc = firstLastScalarFunction,
|
||||
.finalizeFunc = firstLastFinalize,
|
||||
.pPartialFunc = "_first_partial",
|
||||
.pMergeFunc = "_first_merge",
|
||||
|
@ -2284,6 +2289,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getFirstLastFuncEnv,
|
||||
.initFunc = functionSetup,
|
||||
.processFunc = lastFunction,
|
||||
.sprocessFunc = firstLastScalarFunction,
|
||||
.finalizeFunc = firstLastFinalize,
|
||||
.pPartialFunc = "_last_partial",
|
||||
.pMergeFunc = "_last_merge",
|
||||
|
@ -2331,6 +2337,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getHistogramFuncEnv,
|
||||
.initFunc = histogramFunctionSetup,
|
||||
.processFunc = histogramFunction,
|
||||
.sprocessFunc = histogramScalarFunction,
|
||||
.finalizeFunc = histogramFinalize,
|
||||
.invertFunc = NULL,
|
||||
.combineFunc = histogramCombine,
|
||||
|
@ -2463,6 +2470,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getSampleFuncEnv,
|
||||
.initFunc = sampleFunctionSetup,
|
||||
.processFunc = sampleFunction,
|
||||
.sprocessFunc = sampleScalarFunction,
|
||||
.finalizeFunc = sampleFinalize
|
||||
},
|
||||
{
|
||||
|
@ -2474,6 +2482,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getTailFuncEnv,
|
||||
.initFunc = tailFunctionSetup,
|
||||
.processFunc = tailFunction,
|
||||
.sprocessFunc = tailScalarFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
|
@ -2485,6 +2494,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getUniqueFuncEnv,
|
||||
.initFunc = uniqueFunctionSetup,
|
||||
.processFunc = uniqueFunction,
|
||||
.sprocessFunc = uniqueScalarFunction,
|
||||
.finalizeFunc = NULL
|
||||
},
|
||||
{
|
||||
|
@ -2495,6 +2505,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
|||
.getEnvFunc = getModeFuncEnv,
|
||||
.initFunc = modeFunctionSetup,
|
||||
.processFunc = modeFunction,
|
||||
.sprocessFunc = modeScalarFunction,
|
||||
.finalizeFunc = modeFinalize,
|
||||
},
|
||||
{
|
||||
|
|
|
@ -548,37 +548,12 @@ static const char* jkScanLogicPlanScanPseudoCols = "ScanPseudoCols";
|
|||
static const char* jkScanLogicPlanTableType = "TableType";
|
||||
static const char* jkScanLogicPlanTableId = "TableId";
|
||||
static const char* jkScanLogicPlanStableId = "StableId";
|
||||
static const char* jkScanLogicPlanScanType = "ScanType";
|
||||
static const char* jkScanLogicPlanScanCount = "ScanCount";
|
||||
static const char* jkScanLogicPlanReverseScanCount = "ReverseScanCount";
|
||||
static const char* jkScanLogicPlanTagCond = "TagCond";
|
||||
static const char* jkScanLogicPlanGroupTags = "GroupTags";
|
||||
|
||||
// typedef struct SScanLogicNode {
|
||||
// uint64_t stableId;
|
||||
// SVgroupsInfo* pVgroupList;
|
||||
// EScanType scanType;
|
||||
// uint8_t scanSeq[2]; // first is scan count, and second is reverse scan count
|
||||
// STimeWindow scanRange;
|
||||
// SName tableName;
|
||||
// bool showRewrite;
|
||||
// double ratio;
|
||||
// SNodeList* pDynamicScanFuncs;
|
||||
// int32_t dataRequired;
|
||||
// int64_t interval;
|
||||
// int64_t offset;
|
||||
// int64_t sliding;
|
||||
// int8_t intervalUnit;
|
||||
// int8_t slidingUnit;
|
||||
// SNode* pTagCond;
|
||||
// SNode* pTagIndexCond;
|
||||
// int8_t triggerType;
|
||||
// int64_t watermark;
|
||||
// int8_t igExpired;
|
||||
// SArray* pSmaIndexes;
|
||||
// SNodeList* pGroupTags;
|
||||
// bool groupSort;
|
||||
// } SScanLogicNode;
|
||||
|
||||
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
|
||||
|
||||
|
@ -598,6 +573,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanStableId, pNode->stableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanScanType, pNode->scanType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkScanLogicPlanScanCount, pNode->scanSeq[0]);
|
||||
}
|
||||
|
@ -634,6 +612,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUBigIntValue(pJson, jkScanLogicPlanStableId, &pNode->stableId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkScanLogicPlanScanType, pNode->scanType, code);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetUTinyIntValue(pJson, jkScanLogicPlanScanCount, &pNode->scanSeq[0]);
|
||||
}
|
||||
|
@ -1677,6 +1658,7 @@ static int32_t jsonToPhysiSysTableScanNode(const SJson* pJson, void* pObj) {
|
|||
}
|
||||
|
||||
static const char* jkProjectPhysiPlanProjections = "Projections";
|
||||
static const char* jkProjectPhysiPlanMergeDataBlock = "MergeDataBlock";
|
||||
|
||||
static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SProjectPhysiNode* pNode = (const SProjectPhysiNode*)pObj;
|
||||
|
@ -1685,6 +1667,9 @@ static int32_t physiProjectNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkProjectPhysiPlanProjections, pNode->pProjections);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddBoolToObject(pJson, jkProjectPhysiPlanMergeDataBlock, pNode->mergeDataBlock);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -1696,6 +1681,9 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkProjectPhysiPlanProjections, &pNode->pProjections);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetBoolValue(pJson, jkProjectPhysiPlanMergeDataBlock, &pNode->mergeDataBlock);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -201,7 +201,8 @@ static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) {
|
|||
}
|
||||
|
||||
static bool isUselessCol(SExprNode* pProj) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pProj) && !fmIsScalarFunc(((SFunctionNode*)pProj)->funcId)) {
|
||||
if (QUERY_NODE_FUNCTION == nodeType(pProj) && !fmIsScalarFunc(((SFunctionNode*)pProj)->funcId) &&
|
||||
!fmIsPseudoColumnFunc(((SFunctionNode*)pProj)->funcId)) {
|
||||
return false;
|
||||
}
|
||||
return NULL == ((SExprNode*)pProj)->pAssociation;
|
||||
|
|
|
@ -1686,7 +1686,7 @@ static int32_t dnodeToVgroupsInfo(SArray* pDnodes, SVgroupsInfo** pVgsInfo) {
|
|||
static bool sysTableFromVnode(const char* pTable) {
|
||||
return (0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLES)) ||
|
||||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TABLE_DISTRIBUTED) ||
|
||||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TAGS)));
|
||||
(0 == strcmp(pTable, TSDB_INS_TABLE_USER_TAGS)));
|
||||
}
|
||||
|
||||
static bool sysTableFromDnode(const char* pTable) { return 0 == strcmp(pTable, TSDB_INS_TABLE_DNODE_VARIABLES); }
|
||||
|
@ -5968,6 +5968,7 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
|
|||
if (NULL == pReq->tagName) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->colId = pSchema->colId;
|
||||
|
||||
SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema);
|
||||
if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt, true)) {
|
||||
|
@ -6051,6 +6052,7 @@ static int32_t buildDropColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt,
|
|||
if (NULL == pReq->colName) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->colId = pSchema->colId;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -6071,6 +6073,7 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt
|
|||
if (NULL == pReq->colName) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pReq->colId = pSchema->colId;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -20,6 +20,7 @@
|
|||
typedef struct SLogicPlanContext {
|
||||
SPlanContext* pPlanCxt;
|
||||
SLogicNode* pCurrRoot;
|
||||
bool hasScan;
|
||||
} SLogicPlanContext;
|
||||
|
||||
typedef int32_t (*FCreateLogicNode)(SLogicPlanContext*, void*, SLogicNode**);
|
||||
|
@ -161,6 +162,10 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
|
|||
return SCAN_TYPE_STREAM;
|
||||
}
|
||||
|
||||
if (TSDB_SYSTEM_TABLE == tableType) {
|
||||
return SCAN_TYPE_SYSTEM_TABLE;
|
||||
}
|
||||
|
||||
if (NULL == pScanCols) {
|
||||
return NULL == pScanPseudoCols
|
||||
? SCAN_TYPE_TABLE
|
||||
|
@ -169,17 +174,6 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
|
|||
: SCAN_TYPE_TABLE);
|
||||
}
|
||||
|
||||
if (TSDB_SYSTEM_TABLE == tableType) {
|
||||
return SCAN_TYPE_SYSTEM_TABLE;
|
||||
}
|
||||
|
||||
SNode* pCol = NULL;
|
||||
FOREACH(pCol, pScanCols) {
|
||||
if (COLUMN_TYPE_COLUMN == ((SColumnNode*)pCol)->colType) {
|
||||
return SCAN_TYPE_TABLE;
|
||||
}
|
||||
}
|
||||
|
||||
return SCAN_TYPE_TABLE;
|
||||
}
|
||||
|
||||
|
@ -300,6 +294,8 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
|
|||
nodesDestroyNode((SNode*)pScan);
|
||||
}
|
||||
|
||||
pCxt->hasScan = true;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1339,9 +1335,9 @@ static void doSetLogicNodeParent(SLogicNode* pNode, SLogicNode* pParent) {
|
|||
|
||||
static void setLogicNodeParent(SLogicNode* pNode) { doSetLogicNodeParent(pNode, NULL); }
|
||||
|
||||
static void setLogicSubplanType(SLogicSubplan* pSubplan) {
|
||||
static void setLogicSubplanType(bool hasScan, SLogicSubplan* pSubplan) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_VNODE_MODIFY != nodeType(pSubplan->pNode)) {
|
||||
pSubplan->subplanType = SUBPLAN_TYPE_SCAN;
|
||||
pSubplan->subplanType = hasScan ? SUBPLAN_TYPE_SCAN : SUBPLAN_TYPE_MERGE;
|
||||
} else {
|
||||
SVnodeModifyLogicNode* pModify = (SVnodeModifyLogicNode*)pSubplan->pNode;
|
||||
pSubplan->subplanType = (MODIFY_TABLE_TYPE_INSERT == pModify->modifyType && NULL != pModify->node.pChildren)
|
||||
|
@ -1351,7 +1347,7 @@ static void setLogicSubplanType(SLogicSubplan* pSubplan) {
|
|||
}
|
||||
|
||||
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
|
||||
SLogicPlanContext cxt = {.pPlanCxt = pCxt};
|
||||
SLogicPlanContext cxt = {.pPlanCxt = pCxt, .pCurrRoot = NULL, .hasScan = false};
|
||||
|
||||
SLogicSubplan* pSubplan = (SLogicSubplan*)nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN);
|
||||
if (NULL == pSubplan) {
|
||||
|
@ -1364,7 +1360,7 @@ int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan) {
|
|||
int32_t code = createQueryLogicNode(&cxt, pCxt->pAstRoot, &pSubplan->pNode);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
setLogicNodeParent(pSubplan->pNode);
|
||||
setLogicSubplanType(pSubplan);
|
||||
setLogicSubplanType(cxt.hasScan, pSubplan);
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
|
@ -35,7 +35,8 @@ typedef struct SPhysiPlanContext {
|
|||
int32_t errCode;
|
||||
int16_t nextDataBlockId;
|
||||
SArray* pLocationHelper;
|
||||
SArray* pExecNodeList; // SArray<SQueryNodeLoad>
|
||||
bool hasScan;
|
||||
bool hasSysScan;
|
||||
} SPhysiPlanContext;
|
||||
|
||||
static int32_t getSlotKey(SNode* pNode, const char* pStmtName, char* pKey) {
|
||||
|
@ -255,7 +256,7 @@ static int32_t addDataBlockSlot(SPhysiPlanContext* pCxt, SNode** pNode, SDataBlo
|
|||
|
||||
static int32_t addDataBlockSlotsForProject(SPhysiPlanContext* pCxt, const char* pStmtName, SNodeList* pList,
|
||||
SDataBlockDescNode* pDataBlockDesc) {
|
||||
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, true, false);
|
||||
return addDataBlockSlotsImpl(pCxt, pList, pDataBlockDesc, pStmtName, false, false);
|
||||
}
|
||||
|
||||
static int32_t pushdownDataBlockSlots(SPhysiPlanContext* pCxt, SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
|
||||
|
@ -495,8 +496,6 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
|
||||
SQueryNodeLoad node = {.addr = pSubplan->execNode, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
|
||||
}
|
||||
|
||||
|
@ -577,8 +576,6 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
pSubplan->execNode.nodeId = MNODE_HANDLE;
|
||||
pSubplan->execNode.epSet = pCxt->pPlanCxt->mgmtEpSet;
|
||||
}
|
||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pCxt->pExecNodeList, &node);
|
||||
if (0 == strcmp(pScanLogicNode->tableName.tname, TSDB_INS_TABLE_DNODE_VARIABLES)) {
|
||||
pScan->mgmtEpSet = pScanLogicNode->pVgroupList->vgroups->epSet;
|
||||
} else {
|
||||
|
@ -586,6 +583,7 @@ static int32_t createSystemTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
}
|
||||
tNameGetFullDbName(&pScanLogicNode->tableName, pSubplan->dbFName);
|
||||
|
||||
pCxt->hasSysScan = true;
|
||||
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
|
||||
}
|
||||
|
||||
|
@ -601,6 +599,7 @@ static int32_t createTableMergeScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan*
|
|||
|
||||
static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
|
||||
SPhysiNode** pPhyNode) {
|
||||
pCxt->hasScan = true;
|
||||
switch (pScanLogicNode->scanType) {
|
||||
case SCAN_TYPE_TAG:
|
||||
case SCAN_TYPE_BLOCK_INFO:
|
||||
|
@ -1806,23 +1805,31 @@ static void setExplainInfo(SPlanContext* pCxt, SQueryPlan* pPlan) {
|
|||
}
|
||||
}
|
||||
|
||||
static void setExecNodeList(SPhysiPlanContext* pCxt, SArray* pExecNodeList) {
|
||||
if (NULL == pExecNodeList) {
|
||||
return;
|
||||
}
|
||||
if (pCxt->hasSysScan || !pCxt->hasScan) {
|
||||
SQueryNodeLoad node = {.addr = {.nodeId = MNODE_HANDLE, .epSet = pCxt->pPlanCxt->mgmtEpSet}, .load = 0};
|
||||
taosArrayPush(pExecNodeList, &node);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList) {
|
||||
SPhysiPlanContext cxt = {.pPlanCxt = pCxt,
|
||||
.errCode = TSDB_CODE_SUCCESS,
|
||||
.nextDataBlockId = 0,
|
||||
.pLocationHelper = taosArrayInit(32, POINTER_BYTES),
|
||||
.pExecNodeList = pExecNodeList};
|
||||
.hasScan = false,
|
||||
.hasSysScan = false};
|
||||
if (NULL == cxt.pLocationHelper) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (QUERY_POLICY_VNODE == tsQueryPolicy) {
|
||||
taosArrayClear(pExecNodeList);
|
||||
}
|
||||
|
||||
int32_t code = doCreatePhysiPlan(&cxt, pLogicPlan, pPlan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
setExplainInfo(pCxt, *pPlan);
|
||||
setExecNodeList(&cxt, pExecNodeList);
|
||||
}
|
||||
|
||||
destoryPhysiPlanContext(&cxt);
|
||||
|
|
|
@ -1062,8 +1062,9 @@ int32_t toISO8601Function(SScalarParam *pInput, int32_t inputNum, SScalarParam *
|
|||
memmove(fraction, fraction + TSDB_TIME_PRECISION_SEC_DIGITS, TSDB_TIME_PRECISION_SEC_DIGITS);
|
||||
}
|
||||
|
||||
struct tm *tmInfo = taosLocalTime((const time_t *)&timeVal, NULL);
|
||||
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", tmInfo);
|
||||
struct tm tmInfo;
|
||||
taosLocalTime((const time_t *)&timeVal, &tmInfo);
|
||||
strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%S", &tmInfo);
|
||||
int32_t len = (int32_t)strlen(buf);
|
||||
|
||||
//add timezone string
|
||||
|
@ -2601,3 +2602,263 @@ int32_t stateDurationScalarFunction(SScalarParam *pInput, int32_t inputNum, SSca
|
|||
pOutput->numOfRows = pInput->numOfRows;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
typedef enum { UNKNOWN_BIN = 0, USER_INPUT_BIN, LINEAR_BIN, LOG_BIN } EHistoBinType;
|
||||
|
||||
static int8_t getHistogramBinType(char* binTypeStr) {
|
||||
int8_t binType;
|
||||
if (strcasecmp(binTypeStr, "user_input") == 0) {
|
||||
binType = USER_INPUT_BIN;
|
||||
} else if (strcasecmp(binTypeStr, "linear_bin") == 0) {
|
||||
binType = LINEAR_BIN;
|
||||
} else if (strcasecmp(binTypeStr, "log_bin") == 0) {
|
||||
binType = LOG_BIN;
|
||||
} else {
|
||||
binType = UNKNOWN_BIN;
|
||||
}
|
||||
|
||||
return binType;
|
||||
}
|
||||
|
||||
typedef struct SHistoFuncBin {
|
||||
double lower;
|
||||
double upper;
|
||||
int64_t count;
|
||||
double percentage;
|
||||
} SHistoFuncBin;
|
||||
|
||||
static bool getHistogramBinDesc(SHistoFuncBin** bins, int32_t* binNum, char* binDescStr, int8_t binType, bool normalized) {
|
||||
cJSON* binDesc = cJSON_Parse(binDescStr);
|
||||
int32_t numOfBins;
|
||||
double* intervals;
|
||||
if (cJSON_IsObject(binDesc)) { /* linaer/log bins */
|
||||
int32_t numOfParams = cJSON_GetArraySize(binDesc);
|
||||
int32_t startIndex;
|
||||
if (numOfParams != 4) {
|
||||
return false;
|
||||
}
|
||||
|
||||
cJSON* start = cJSON_GetObjectItem(binDesc, "start");
|
||||
cJSON* factor = cJSON_GetObjectItem(binDesc, "factor");
|
||||
cJSON* width = cJSON_GetObjectItem(binDesc, "width");
|
||||
cJSON* count = cJSON_GetObjectItem(binDesc, "count");
|
||||
cJSON* infinity = cJSON_GetObjectItem(binDesc, "infinity");
|
||||
|
||||
if (!cJSON_IsNumber(start) || !cJSON_IsNumber(count) || !cJSON_IsBool(infinity)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (count->valueint <= 0 || count->valueint > 1000) { // limit count to 1000
|
||||
return false;
|
||||
}
|
||||
|
||||
if (isinf(start->valuedouble) || (width != NULL && isinf(width->valuedouble)) ||
|
||||
(factor != NULL && isinf(factor->valuedouble)) || (count != NULL && isinf(count->valuedouble))) {
|
||||
return false;
|
||||
}
|
||||
|
||||
int32_t counter = (int32_t)count->valueint;
|
||||
if (infinity->valueint == false) {
|
||||
startIndex = 0;
|
||||
numOfBins = counter + 1;
|
||||
} else {
|
||||
startIndex = 1;
|
||||
numOfBins = counter + 3;
|
||||
}
|
||||
|
||||
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
|
||||
if (cJSON_IsNumber(width) && factor == NULL && binType == LINEAR_BIN) {
|
||||
// linear bin process
|
||||
if (width->valuedouble == 0) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < counter + 1; ++i) {
|
||||
intervals[startIndex] = start->valuedouble + i * width->valuedouble;
|
||||
if (isinf(intervals[startIndex])) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
startIndex++;
|
||||
}
|
||||
} else if (cJSON_IsNumber(factor) && width == NULL && binType == LOG_BIN) {
|
||||
// log bin process
|
||||
if (start->valuedouble == 0) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
if (factor->valuedouble < 0 || factor->valuedouble == 0 || factor->valuedouble == 1) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
for (int i = 0; i < counter + 1; ++i) {
|
||||
intervals[startIndex] = start->valuedouble * pow(factor->valuedouble, i * 1.0);
|
||||
if (isinf(intervals[startIndex])) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
startIndex++;
|
||||
}
|
||||
} else {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (infinity->valueint == true) {
|
||||
intervals[0] = -INFINITY;
|
||||
intervals[numOfBins - 1] = INFINITY;
|
||||
// in case of desc bin orders, -inf/inf should be swapped
|
||||
ASSERT(numOfBins >= 4);
|
||||
if (intervals[1] > intervals[numOfBins - 2]) {
|
||||
TSWAP(intervals[0], intervals[numOfBins - 1]);
|
||||
}
|
||||
}
|
||||
} else if (cJSON_IsArray(binDesc)) { /* user input bins */
|
||||
if (binType != USER_INPUT_BIN) {
|
||||
return false;
|
||||
}
|
||||
numOfBins = cJSON_GetArraySize(binDesc);
|
||||
intervals = taosMemoryCalloc(numOfBins, sizeof(double));
|
||||
cJSON* bin = binDesc->child;
|
||||
if (bin == NULL) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
int i = 0;
|
||||
while (bin) {
|
||||
intervals[i] = bin->valuedouble;
|
||||
if (!cJSON_IsNumber(bin)) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
if (i != 0 && intervals[i] <= intervals[i - 1]) {
|
||||
taosMemoryFree(intervals);
|
||||
return false;
|
||||
}
|
||||
bin = bin->next;
|
||||
i++;
|
||||
}
|
||||
} else {
|
||||
return false;
|
||||
}
|
||||
|
||||
*binNum = numOfBins - 1;
|
||||
*bins = taosMemoryCalloc(numOfBins, sizeof(SHistoFuncBin));
|
||||
for (int32_t i = 0; i < *binNum; ++i) {
|
||||
(*bins)[i].lower = intervals[i] < intervals[i + 1] ? intervals[i] : intervals[i + 1];
|
||||
(*bins)[i].upper = intervals[i + 1] > intervals[i] ? intervals[i + 1] : intervals[i];
|
||||
(*bins)[i].count = 0;
|
||||
}
|
||||
|
||||
taosMemoryFree(intervals);
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t histogramScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
|
||||
SHistoFuncBin *bins;
|
||||
int32_t numOfBins = 0;
|
||||
int32_t totalCount = 0;
|
||||
|
||||
int8_t binType = getHistogramBinType(varDataVal(pInput[1].columnData->pData));
|
||||
char* binDesc = varDataVal(pInput[2].columnData->pData);
|
||||
int64_t normalized = *(int64_t *)(pInput[3].columnData->pData);
|
||||
|
||||
int32_t type = GET_PARAM_TYPE(pInput);
|
||||
if (!getHistogramBinDesc(&bins, &numOfBins, binDesc, binType, (bool)normalized)) {
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||
if (colDataIsNull_s(pInputData, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputData, i);
|
||||
double v;
|
||||
GET_TYPED_DATA(v, double, type, data);
|
||||
|
||||
for (int32_t k = 0; k < numOfBins; ++k) {
|
||||
if (v > bins[k].lower && v <= bins[k].upper) {
|
||||
bins[k].count++;
|
||||
totalCount++;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (normalized) {
|
||||
for (int32_t k = 0; k < numOfBins; ++k) {
|
||||
if (totalCount != 0) {
|
||||
bins[k].percentage = bins[k].count / (double)totalCount;
|
||||
} else {
|
||||
bins[k].percentage = 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < numOfBins; ++k) {
|
||||
int32_t len;
|
||||
char buf[512] = {0};
|
||||
if (!normalized) {
|
||||
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%" PRId64 "}",
|
||||
bins[k].lower, bins[k].upper, bins[k].count);
|
||||
} else {
|
||||
len = sprintf(varDataVal(buf), "{\"lower_bin\":%g, \"upper_bin\":%g, \"count\":%lf}",
|
||||
bins[k].lower, bins[k].upper, bins[k].percentage);
|
||||
}
|
||||
varDataSetLen(buf, len);
|
||||
colDataAppend(pOutputData, k, buf, false);
|
||||
}
|
||||
|
||||
taosMemoryFree(bins);
|
||||
pOutput->numOfRows = numOfBins;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t selectScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
SColumnInfoData *pInputData = pInput->columnData;
|
||||
SColumnInfoData *pOutputData = pOutput->columnData;
|
||||
|
||||
int32_t type = GET_PARAM_TYPE(pInput);
|
||||
|
||||
for (int32_t i = 0; i < pInput->numOfRows; ++i) {
|
||||
if (colDataIsNull_s(pInputData, i)) {
|
||||
colDataAppendNULL(pOutputData, 0);
|
||||
continue;
|
||||
}
|
||||
|
||||
char* data = colDataGetData(pInputData, i);
|
||||
colDataAppend(pOutputData, i, data, false);
|
||||
}
|
||||
|
||||
|
||||
pOutput->numOfRows = 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t topBotScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||
}
|
||||
|
||||
int32_t firstLastScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||
}
|
||||
|
||||
int32_t sampleScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||
}
|
||||
|
||||
int32_t tailScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||
}
|
||||
|
||||
int32_t uniqueScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||
}
|
||||
|
||||
int32_t modeScalarFunction(SScalarParam *pInput, int32_t inputNum, SScalarParam *pOutput) {
|
||||
return selectScalarFunction(pInput, inputNum, pOutput);
|
||||
}
|
||||
|
|
|
@ -143,76 +143,80 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) {
|
|||
}
|
||||
|
||||
static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||
int32_t cnt = 0;
|
||||
void* data = NULL;
|
||||
while (1) {
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||
if (qItem == NULL) {
|
||||
qDebug("stream exec over, queue empty");
|
||||
break;
|
||||
}
|
||||
if (data == NULL) {
|
||||
data = qItem;
|
||||
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
||||
}
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
} else {
|
||||
if (streamAppendQueueItem(data, qItem) < 0) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
int32_t cnt = 0;
|
||||
void* data = NULL;
|
||||
while (1) {
|
||||
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||
if (qItem == NULL) {
|
||||
qDebug("stream exec over, queue empty");
|
||||
break;
|
||||
} else {
|
||||
cnt++;
|
||||
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
||||
}
|
||||
if (data == NULL) {
|
||||
data = qItem;
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
|
||||
taosFreeQitem(qItem);
|
||||
if (qItem->type == STREAM_INPUT__DATA_BLOCK) {
|
||||
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
if (streamAppendQueueItem(data, qItem) < 0) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
break;
|
||||
} else {
|
||||
cnt++;
|
||||
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
|
||||
streamQueueProcessSuccess(pTask->inputQueue);
|
||||
taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks);
|
||||
taosFreeQitem(qItem);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||
if (data) streamFreeQitem(data);
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (data == NULL) return pRes;
|
||||
|
||||
if (pTask->execType == TASK_EXEC__NONE) {
|
||||
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
|
||||
streamTaskOutput(pTask, data);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
qDebug("stream task %d exec end", pTask->taskId);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
if (qRes == NULL) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
taosArrayDestroy(pRes);
|
||||
return NULL;
|
||||
}
|
||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||
qRes->blocks = pRes;
|
||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||
if (data) streamFreeQitem(data);
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
taosFreeQitem(qRes);
|
||||
return NULL;
|
||||
}
|
||||
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
||||
qRes->childId = pTask->selfChildId;
|
||||
qRes->sourceVer = pSubmit->ver;
|
||||
}
|
||||
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
}
|
||||
|
||||
streamFreeQitem(data);
|
||||
if (data == NULL) break;
|
||||
|
||||
if (pTask->execType == TASK_EXEC__NONE) {
|
||||
ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK);
|
||||
streamTaskOutput(pTask, data);
|
||||
return pRes;
|
||||
}
|
||||
|
||||
qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt);
|
||||
streamTaskExecImpl(pTask, data, pRes);
|
||||
qDebug("stream task %d exec end", pTask->taskId);
|
||||
|
||||
if (taosArrayGetSize(pRes) != 0) {
|
||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||
if (qRes == NULL) {
|
||||
streamQueueProcessFail(pTask->inputQueue);
|
||||
taosArrayDestroy(pRes);
|
||||
return NULL;
|
||||
}
|
||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||
qRes->blocks = pRes;
|
||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||
taosFreeQitem(qRes);
|
||||
return NULL;
|
||||
}
|
||||
if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data;
|
||||
qRes->childId = pTask->selfChildId;
|
||||
qRes->sourceVer = pSubmit->ver;
|
||||
}
|
||||
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
||||
pRes = taosArrayInit(0, sizeof(SSDataBlock));
|
||||
}
|
||||
|
||||
streamFreeQitem(data);
|
||||
}
|
||||
return pRes;
|
||||
}
|
||||
|
||||
|
|
|
@ -192,6 +192,7 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms);
|
|||
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode);
|
||||
|
||||
// utils --------------
|
||||
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
|
||||
|
|
|
@ -1298,6 +1298,12 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeRestartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
syncNodeStopHeartbeatTimer(pSyncNode);
|
||||
syncNodeStartHeartbeatTimer(pSyncNode);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// utils --------------
|
||||
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
|
||||
SEpSet epSet;
|
||||
|
|
|
@ -103,10 +103,9 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
|
|||
// if pgno == 0 fetch new btree root leaf page
|
||||
if (pgno == 0) {
|
||||
// fetch page & insert into main db
|
||||
// allocate a new child page
|
||||
SPage *pPage;
|
||||
TXN txn;
|
||||
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, 0);
|
||||
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||
|
||||
pPager->inTran = 1;
|
||||
|
||||
|
@ -118,8 +117,6 @@ int tdbBtreeOpen(int keyLen, int valLen, SPager *pPager, char const *tbname, SPg
|
|||
return -1;
|
||||
}
|
||||
|
||||
// TODO: Need to zero the page
|
||||
|
||||
ret = tdbPagerWrite(pPager, pPage);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
|
|
|
@ -473,12 +473,6 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
TXN txn;
|
||||
tdbTxnOpen(&txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||
SBtreeInitPageArg iArg;
|
||||
iArg.pBt = pBt;
|
||||
iArg.flags = 0;
|
||||
|
||||
for (int pgIndex = 0; pgIndex < journalSize; ++pgIndex) {
|
||||
// read pgno & the page from journal
|
||||
SPgno pgno;
|
||||
|
@ -494,20 +488,6 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
ret = tdbPagerFetchPage(pPager, &pgno, &pPage, tdbBtreeInitPage, &iArg, &txn);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// write the page to db
|
||||
ret = tdbPagerWritePageToDB(pPager, pPage);
|
||||
if (ret < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdbPCacheRelease(pPager->pCache, pPage, &txn);
|
||||
*/
|
||||
i64 offset = pPager->pageSize * (pgno - 1);
|
||||
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
|
||||
ASSERT(0);
|
||||
|
@ -523,8 +503,6 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
|
||||
tdbOsFSync(pPager->fd);
|
||||
|
||||
tdbTxnClose(&txn);
|
||||
|
||||
tdbOsFree(pageBuf);
|
||||
|
||||
tdbOsClose(jfd);
|
||||
|
|
|
@ -483,6 +483,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
|||
pRead->pHead->head.version, ver);
|
||||
pRead->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -491,6 +492,7 @@ int32_t walReadVer(SWalReader *pRead, int64_t ver) {
|
|||
wError("vgId:%d, unexpected wal log index:%" PRId64 ", since body checksum not passed", pRead->pWal->cfg.vgId, ver);
|
||||
pRead->curInvalid = 1;
|
||||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
pRead->curVersion++;
|
||||
|
|
|
@ -357,14 +357,88 @@ FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
|
|||
|
||||
time_t taosTime(time_t *t) { return time(t); }
|
||||
|
||||
time_t taosMktime(struct tm *timep) { return mktime(timep); }
|
||||
time_t taosMktime(struct tm *timep) {
|
||||
#ifdef WINDOWS
|
||||
struct tm tm1 = {0};
|
||||
LARGE_INTEGER t;
|
||||
FILETIME f;
|
||||
SYSTEMTIME s;
|
||||
FILETIME ff;
|
||||
SYSTEMTIME ss;
|
||||
LARGE_INTEGER offset;
|
||||
|
||||
time_t tt = 0;
|
||||
localtime_s(&tm1, &tt);
|
||||
ss.wYear = tm1.tm_year + 1900;
|
||||
ss.wMonth = tm1.tm_mon + 1;
|
||||
ss.wDay = tm1.tm_wday;
|
||||
ss.wHour = tm1.tm_hour;
|
||||
ss.wMinute = tm1.tm_min;
|
||||
ss.wSecond = tm1.tm_sec;
|
||||
ss.wMilliseconds = 0;
|
||||
SystemTimeToFileTime(&ss, &ff);
|
||||
offset.QuadPart = ff.dwHighDateTime;
|
||||
offset.QuadPart <<= 32;
|
||||
offset.QuadPart |= ff.dwLowDateTime;
|
||||
|
||||
s.wYear = timep->tm_year + 1900;
|
||||
s.wMonth = timep->tm_mon + 1;
|
||||
s.wDay = timep->tm_wday;
|
||||
s.wHour = timep->tm_hour;
|
||||
s.wMinute = timep->tm_min;
|
||||
s.wSecond = timep->tm_sec;
|
||||
s.wMilliseconds = 0;
|
||||
SystemTimeToFileTime(&s, &f);
|
||||
t.QuadPart = f.dwHighDateTime;
|
||||
t.QuadPart <<= 32;
|
||||
t.QuadPart |= f.dwLowDateTime;
|
||||
|
||||
t.QuadPart -= offset.QuadPart;
|
||||
return (time_t)(t.QuadPart / 10000000);
|
||||
#else
|
||||
return mktime(timep);
|
||||
#endif
|
||||
}
|
||||
|
||||
struct tm *taosLocalTime(const time_t *timep, struct tm *result) {
|
||||
if (result == NULL) {
|
||||
return localtime(timep);
|
||||
}
|
||||
#ifdef WINDOWS
|
||||
localtime_s(result, timep);
|
||||
if (*timep < 0) {
|
||||
SYSTEMTIME ss,s;
|
||||
FILETIME ff,f;
|
||||
LARGE_INTEGER offset;
|
||||
struct tm tm1;
|
||||
time_t tt = 0;
|
||||
localtime_s(&tm1, &tt);
|
||||
ss.wYear = tm1.tm_year + 1900;
|
||||
ss.wMonth = tm1.tm_mon + 1;
|
||||
ss.wDay = tm1.tm_mday;
|
||||
ss.wHour = tm1.tm_hour;
|
||||
ss.wMinute = tm1.tm_min;
|
||||
ss.wSecond = tm1.tm_sec;
|
||||
ss.wMilliseconds = 0;
|
||||
SystemTimeToFileTime(&ss, &ff);
|
||||
offset.QuadPart = ff.dwHighDateTime;
|
||||
offset.QuadPart <<= 32;
|
||||
offset.QuadPart |= ff.dwLowDateTime;
|
||||
offset.QuadPart += *timep * 10000000;
|
||||
f.dwLowDateTime = offset.QuadPart & 0xffffffff;
|
||||
f.dwHighDateTime = (offset.QuadPart >> 32) & 0xffffffff;
|
||||
FileTimeToSystemTime(&f, &s);
|
||||
result->tm_sec = s.wSecond;
|
||||
result->tm_min = s.wMinute;
|
||||
result->tm_hour = s.wHour;
|
||||
result->tm_mday = s.wDay;
|
||||
result->tm_mon = s.wMonth-1;
|
||||
result->tm_year = s.wYear-1900;
|
||||
result->tm_wday = s.wDayOfWeek;
|
||||
result->tm_yday = 0;
|
||||
result->tm_isdst = 0;
|
||||
} else {
|
||||
localtime_s(result, timep);
|
||||
}
|
||||
#else
|
||||
localtime_r(timep, result);
|
||||
#endif
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -5,103 +5,250 @@ import time
|
|||
import socket
|
||||
import os
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.common import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.snapshot = 0
|
||||
self.vgroups = 4
|
||||
self.ctbNum = 1
|
||||
self.rowsPerTbl = 10000
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'db2',
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'vgroups': 4,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 10,
|
||||
'rowsPerTbl': 1000,
|
||||
'batchNum': 10,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 100000,
|
||||
'batchNum': 1200,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 10,
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
'showRow': 1}
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
topicNameList = ['topic1']
|
||||
expectRowsList = []
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1)
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema'])
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.asyncInsertData(paraDict)
|
||||
|
||||
tdLog.info("create topics from stb with filter")
|
||||
# queryString = "select ts, sin(c1), pow(c2,3) from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName'])
|
||||
queryString = "select * from %s.%s where t2 == 'beijing' or t2 == 'changsha'" %(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
# tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 100000,
|
||||
'batchNum': 3000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
# update to half tables
|
||||
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
|
||||
# tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix="ctbx",
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
# queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
|
||||
queryString = "select ts, c1, c2, t4 from %s.%s where t4 == 'shanghai' or t4 == 'changsha'"%(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
# start tmq consume processor
|
||||
tdLog.info("insert consume info to consume processor")
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||
topicList = topicNameList[0]
|
||||
ifcheckdata = 0
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
# paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
consumerId = 0
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:2000, auto.offset.reset:earliest'
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
# tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim()
|
||||
tdLog.info("create some new ctb")
|
||||
paraDict['ctbStartIdx'] = paraDict['ctbStartIdx'] + paraDict['ctbNum']
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], ctbNum=paraDict['ctbNum'], ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data into new ctb")
|
||||
pThread = tmqCom.asyncInsertData(paraDict)
|
||||
|
||||
pThread.join()
|
||||
tdLog.info("wait insert end")
|
||||
tdSql.query(queryString)
|
||||
expectRowsList.append(tdSql.getRows())
|
||||
|
||||
tdLog.info("wait the consume result")
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
|
||||
if expectRowsList[0] != resultList[0]:
|
||||
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
|
||||
tdLog.exit("0 tmq consume rows error!")
|
||||
|
||||
time.sleep(10)
|
||||
for i in range(len(topicNameList)):
|
||||
tdSql.query("drop topic %s"%topicNameList[i])
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdLog.info("run select sql from db")
|
||||
tdSql.query(queryString)
|
||||
expectrowcnt = tdSql.getRows()
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 4,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 5000,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['snapshot'] = self.snapshot
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
|
||||
# update to half tables
|
||||
paraDict['startTs'] = paraDict['startTs'] + int(self.rowsPerTbl / 2)
|
||||
paraDict['rowsPerTbl'] = int(self.rowsPerTbl / 2)
|
||||
tmqCom.insert_data_with_autoCreateTbl(tsql=tdSql,dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
# tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
# ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
# startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
queryString = "select ts, c1, c2 from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
|
||||
sqlString = "create topic %s as %s" %(topicFromStb1, queryString)
|
||||
tdLog.info("create topic sql: %s"%sqlString)
|
||||
tdSql.execute(sqlString)
|
||||
|
||||
# paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
consumerId = 1
|
||||
expectrowcnt = int(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2)
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 1
|
||||
ifManualCommit = 1
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdSql.query(queryString)
|
||||
totalRowsInserted = tdSql.getRows()
|
||||
|
||||
tdLog.info("act consume rows: %d, act insert rows: %d, expect consume rows: %d, "%(totalConsumeRows, totalRowsInserted, expectrowcnt))
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
# tmqCom.checkFileContent(consumerId, queryString)
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
self.prepareTestEnv()
|
||||
tdLog.printNoPrefix("=============================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 0: only consume from wal")
|
||||
self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
|
||||
self.prepareTestEnv()
|
||||
tdLog.printNoPrefix("====================================================================")
|
||||
tdLog.printNoPrefix("======== snapshot is 1: firstly consume from tsbs, and then from wal")
|
||||
self.snapshot = 1
|
||||
self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -126,7 +126,7 @@ python3 ./test.py -f 2-query/count_partition.py
|
|||
python3 ./test.py -f 2-query/function_null.py
|
||||
python3 ./test.py -f 2-query/queryQnode.py
|
||||
python3 ./test.py -f 2-query/max_partition.py
|
||||
|
||||
python3 ./test.py -f 2-query/last_row.py
|
||||
|
||||
python3 ./test.py -f 6-cluster/5dnode1mnode.py
|
||||
#BUG python3 ./test.py -f 6-cluster/5dnode2mnode.py -N 5 -M 3
|
||||
|
@ -274,6 +274,7 @@ python3 ./test.py -f 2-query/irate.py -Q 2
|
|||
python3 ./test.py -f 2-query/function_null.py -Q 2
|
||||
python3 ./test.py -f 2-query/count_partition.py -Q 2
|
||||
python3 ./test.py -f 2-query/max_partition.py -Q 2
|
||||
python3 ./test.py -f 2-query/last_row.py -Q 2
|
||||
|
||||
#------------querPolicy 3-----------
|
||||
|
||||
|
@ -361,3 +362,4 @@ python3 ./test.py -f 2-query/irate.py -Q 3
|
|||
python3 ./test.py -f 2-query/function_null.py -Q 3
|
||||
python3 ./test.py -f 2-query/count_partition.py -Q 3
|
||||
python3 ./test.py -f 2-query/max_partition.py -Q 3
|
||||
python3 ./test.py -f 2-query/last_row.py -Q 3
|
||||
|
|
|
@ -596,7 +596,8 @@ void printParaIntoFile() {
|
|||
g_fp = pFile;
|
||||
|
||||
time_t tTime = taosGetTimestampSec();
|
||||
struct tm tm = *taosLocalTime(&tTime, NULL);
|
||||
struct tm tm;
|
||||
taosLocalTime(&tTime, &tm);
|
||||
|
||||
taosFprintfFile(pFile, "###################################################################\n");
|
||||
taosFprintfFile(pFile, "# configDir: %s\n", configDir);
|
||||
|
|
|
@ -171,7 +171,8 @@ static void printHelp() {
|
|||
|
||||
char* getCurrentTimeString(char* timeString) {
|
||||
time_t tTime = taosGetTimestampSec();
|
||||
struct tm tm = *taosLocalTime(&tTime, NULL);
|
||||
struct tm tm;
|
||||
taosLocalTime(&tTime, &tm);
|
||||
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour,
|
||||
tm.tm_min, tm.tm_sec);
|
||||
|
||||
|
@ -420,18 +421,6 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
|
|||
ms = val % 1000;
|
||||
}
|
||||
|
||||
/*
|
||||
comment out as it make testcases like select_with_tags.sim fail.
|
||||
but in windows, this may cause the call to localtime crash if tt < 0,
|
||||
need to find a better solution.
|
||||
if (tt < 0) {
|
||||
tt = 0;
|
||||
}
|
||||
*/
|
||||
|
||||
#ifdef WINDOWS
|
||||
if (tt < 0) tt = 0;
|
||||
#endif
|
||||
if (tt <= 0 && ms < 0) {
|
||||
tt--;
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
|
@ -443,8 +432,9 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
|
|||
}
|
||||
}
|
||||
|
||||
struct tm* ptm = taosLocalTime(&tt, NULL);
|
||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
struct tm ptm;
|
||||
taosLocalTime(&tt, &ptm);
|
||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
sprintf(buf + pos, ".%09d", ms);
|
||||
|
|
|
@ -635,7 +635,7 @@ bool simCreateTaosdConnect(SScript *script, char *rest) {
|
|||
bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
|
||||
char timeStr[30] = {0};
|
||||
time_t tt;
|
||||
struct tm *tp;
|
||||
struct tm tp;
|
||||
SCmdLine *line = &script->lines[script->linePos];
|
||||
int32_t ret = -1;
|
||||
|
||||
|
@ -768,20 +768,9 @@ bool simExecuteNativeSqlCommand(SScript *script, char *rest, bool isSlow) {
|
|||
} else {
|
||||
tt = (*(int64_t *)row[i]) / 1000000000;
|
||||
}
|
||||
/* comment out as it make testcases like select_with_tags.sim fail.
|
||||
but in windows, this may cause the call to localtime crash if tt < 0,
|
||||
need to find a better solution.
|
||||
if (tt < 0) {
|
||||
tt = 0;
|
||||
}
|
||||
*/
|
||||
|
||||
#ifdef WINDOWS
|
||||
if (tt < 0) tt = 0;
|
||||
#endif
|
||||
|
||||
tp = taosLocalTime(&tt, NULL);
|
||||
strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", tp);
|
||||
taosLocalTime(&tt, &tp);
|
||||
strftime(timeStr, 64, "%y-%m-%d %H:%M:%S", &tp);
|
||||
if (precision == TSDB_TIME_PRECISION_MILLI) {
|
||||
sprintf(value, "%s.%03d", timeStr, (int32_t)(*((int64_t *)row[i]) % 1000));
|
||||
} else if (precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
|
|
|
@ -175,7 +175,7 @@ SScript *simBuildScriptObj(char *fileName) {
|
|||
SScript *simParseScript(char *fileName) {
|
||||
TdFilePtr pFile;
|
||||
int32_t tokenLen, lineNum = 0;
|
||||
char *buffer = NULL, name[128], *token, *rest;
|
||||
char buffer[10*1024], name[128], *token, *rest;
|
||||
SCommand *pCmd;
|
||||
SScript *script;
|
||||
|
||||
|
@ -195,7 +195,7 @@ SScript *simParseScript(char *fileName) {
|
|||
simResetParser();
|
||||
|
||||
while (!taosEOFFile(pFile)) {
|
||||
if (taosGetLineFile(pFile, &buffer) == -1) continue;
|
||||
if (taosGetsFile(pFile, sizeof(buffer) - 1, buffer) == -1) continue;
|
||||
|
||||
lineNum++;
|
||||
int32_t cmdlen = (int32_t)strlen(buffer);
|
||||
|
@ -240,7 +240,6 @@ SScript *simParseScript(char *fileName) {
|
|||
return NULL;
|
||||
}
|
||||
}
|
||||
if(buffer != NULL) taosMemoryFree(buffer);
|
||||
taosCloseFile(&pFile);
|
||||
|
||||
script = simBuildScriptObj(fileName);
|
||||
|
|
|
@ -231,18 +231,6 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
|||
ms = val % 1000;
|
||||
}
|
||||
|
||||
/*
|
||||
comment out as it make testcases like select_with_tags.sim fail.
|
||||
but in windows, this may cause the call to localtime crash if tt < 0,
|
||||
need to find a better solution.
|
||||
if (tt < 0) {
|
||||
tt = 0;
|
||||
}
|
||||
*/
|
||||
|
||||
#ifdef WINDOWS
|
||||
if (tt < 0) tt = 0;
|
||||
#endif
|
||||
if (tt <= 0 && ms < 0) {
|
||||
tt--;
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
|
@ -254,8 +242,9 @@ char *shellFormatTimestamp(char *buf, int64_t val, int32_t precision) {
|
|||
}
|
||||
}
|
||||
|
||||
struct tm *ptm = taosLocalTime(&tt, NULL);
|
||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", ptm);
|
||||
struct tm ptm = {0};
|
||||
taosLocalTime(&tt, &ptm);
|
||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
|
||||
|
||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||
sprintf(buf + pos, ".%09d", ms);
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit b7b922268c4a06d9db77ffdfde0726f3d9900b72
|
||||
Subproject commit 0b8a3373bb7548f8106d13e7d3b0a988d3c4d48a
|
Loading…
Reference in New Issue