Merge branch 'develop' of github.com:taosdata/TDengine into dev/chr

This commit is contained in:
tomchon 2021-06-09 16:26:09 +08:00
commit 7de7e54c95
52 changed files with 2331 additions and 1470 deletions

View File

@ -123,7 +123,11 @@ IF (TD_LINUX)
MESSAGE(STATUS "set ningsi macro to true")
ENDIF ()
IF (TD_MEMORY_SANITIZER)
SET(DEBUG_FLAGS "-fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -static-libasan -O0 -g3 -DDEBUG")
ELSE ()
SET(DEBUG_FLAGS "-O0 -g3 -DDEBUG")
ENDIF ()
SET(RELEASE_FLAGS "-O3 -Wno-error")
IF (${COVER} MATCHES "true")
@ -144,7 +148,11 @@ IF (TD_DARWIN_64)
ADD_DEFINITIONS(-DUSE_LIBICONV)
MESSAGE(STATUS "darwin64 is defined")
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
IF (TD_MEMORY_SANITIZER)
SET(DEBUG_FLAGS "-fsanitize=address -fsanitize=undefined -fno-sanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=null -fno-sanitize=alignment -O0 -g3 -DDEBUG")
ELSE ()
SET(DEBUG_FLAGS "-O0 -g3 -DDEBUG")
ENDIF ()
SET(RELEASE_FLAGS "-Og")
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc)
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/lz4/inc)
@ -162,7 +170,14 @@ IF (TD_WINDOWS)
IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
ENDIF ()
SET(DEBUG_FLAGS "/fsanitize=thread /fsanitize=leak /fsanitize=memory /fsanitize=undefined /fsanitize=hwaddress /Zi /W3 /GL")
IF (TD_MEMORY_SANITIZER)
MESSAGE("memory sanitizer detected as true")
SET(DEBUG_FLAGS "/fsanitize=address /Zi /W3 /GL")
ELSE ()
MESSAGE("memory sanitizer detected as false")
SET(DEBUG_FLAGS "/Zi /W3 /GL")
ENDIF ()
SET(RELEASE_FLAGS "/W0 /O3 /GL")
ENDIF ()

View File

@ -83,3 +83,8 @@ SET(TD_BUILD_JDBC TRUE)
IF (${BUILD_JDBC} MATCHES "false")
SET(TD_BUILD_JDBC FALSE)
ENDIF ()
SET(TD_MEMORY_SANITIZER FALSE)
IF (${MEMORY_SANITIZER} MATCHES "true")
SET(TD_MEMORY_SANITIZER TRUE)
ENDIF ()

View File

@ -176,7 +176,7 @@ TDengine 分布式架构的逻辑结构图如下:
**通讯方式:**TDengine系统的各个数据节点之间以及应用驱动与各数据节点之间的通讯是通过TCP/UDP进行的。因为考虑到物联网场景数据写入的包一般不大因此TDengine 除采用TCP做传输之外还采用UDP方式因为UDP 更加高效而且不受连接数的限制。TDengine实现了自己的超时、重传、确认等机制以确保UDP的可靠传输。对于数据量不到15K的数据包采取UDP的方式进行传输超过15K的或者是查询类的操作自动采取TCP的方式进行传输。同时TDengine根据配置和数据包会自动对数据进行压缩/解压缩,数字签名/认证等处理。对于数据节点之间的数据复制只采用TCP方式进行数据传输。
**FQDN配置**一个数据节点有一个或多个FQDN可以在系统配置文件taos.cfg通过参数“fqdn"进行指定如果没有指定系统将自动获取计算机的hostname作为其FQDN。如果节点没有配置FQDN可以直接将该节点的配置参数fqdn设置为它的IP地址。但不建议使用IP因为IP地址可变一旦变化将让集群无法正常工作。一个数据节点的EP(End Point)由FQDN + Port组成。采用FQDN需要保证DNS服务正常工作或者在节点以及应用所在的节点配置好hosts文件。
**FQDN配置**一个数据节点有一个或多个FQDN可以在系统配置文件taos.cfg通过参数“fqdn"进行指定如果没有指定系统将自动获取计算机的hostname作为其FQDN。如果节点没有配置FQDN可以直接将该节点的配置参数fqdn设置为它的IP地址。但不建议使用IP因为IP地址可变一旦变化将让集群无法正常工作。一个数据节点的EP(End Point)由FQDN + Port组成。采用FQDN需要保证DNS服务正常工作或者在节点以及应用所在的节点配置好hosts文件。另外,这个参数值的长度需要控制在 96 个字符以内。
**端口配置:**一个数据节点对外的端口由TDengine的系统配置参数serverPort决定对集群内部通讯的端口是serverPort+5。集群内数据节点之间的数据复制操作还占有一个TCP端口是serverPort+10. 为支持多线程高效的处理UDP数据每个对内和对外的UDP连接都需要占用5个连续的端口。因此一个数据节点总的端口范围为serverPort到serverPort + 10总共11个TCP/UDP端口。另外还可能有 RESTful、Arbitrator 所使用的端口,那样的话就一共是 13 个。使用时需要确保防火墙将这些端口打开以备使用。每个数据节点可以配置不同的serverPort。详细的端口情况请参见 [TDengine 2.0 端口说明](https://www.taosdata.com/cn/documentation/faq#port)

View File

@ -325,10 +325,12 @@ for (int i = 0; i < numOfRows; i++){
}
s.setString(2, s2, 10);
// AddBatch 之后,可以再设定新的表名、TAGS、VALUES 取值,这样就能实现一次执行向多个数据表写入
// AddBatch 之后,缓存并未清空。为避免混乱,并不推荐在 ExecuteBatch 之前再次绑定新一批的数据
s.columnDataAddBatch();
// 执行语句:
// 执行绑定数据后的语句:
s.columnDataExecuteBatch();
// 执行语句后清空缓存。在清空之后,可以复用当前的对象,绑定新的一批数据(可以是新表名、新 TAGS 值、新 VALUES 值):
s.columnDataClearBatch();
// 执行完毕,释放资源:
s.columnDataCloseBatch();
```

View File

@ -307,6 +307,8 @@ TDengine的异步API均采用非阻塞调用模式。应用程序可以用多线
8. 调用 `taos_stmt_execute` 执行已经准备好的批处理指令;
9. 执行完毕,调用 `taos_stmt_close` 释放所有资源。
说明:如果 `taos_stmt_execute` 执行成功,假如不需要改变 SQL 语句的话,那么是可以复用 `taos_stmt_prepare` 的解析结果,直接进行第 36 步绑定新数据的。但如果执行出错,那么并不建议继续在当前的环境上下文下继续工作,而是建议释放资源,然后从 `taos_stmt_init` 步骤重新开始。
除 C/C++ 语言外TDengine 的 Java 语言 JNI Connector 也提供参数绑定接口支持,具体请另外参见:[参数绑定接口的 Java 用法](https://www.taosdata.com/cn/documentation/connector/java#stmt-java)。
接口相关的具体函数如下(也可以参考 [apitest.c](https://github.com/taosdata/TDengine/blob/develop/tests/examples/c/apitest.c) 文件中使用对应函数的方式):
@ -378,6 +380,11 @@ typedef struct TAOS_MULTI_BIND {
执行完毕,释放所有资源。
- `char * taos_stmt_errstr(TAOS_STMT *stmt)`
2.1.3.0 版本新增)
用于在其他 stmt API 返回错误(返回错误码或空指针)时获取错误信息。
### 连续查询接口
TDengine提供时间驱动的实时流式计算API。可以每隔一指定的时间段对一张或多张数据库的表(数据流)进行各种实时聚合计算操作。操作简单仅有打开、关闭流的API。具体如下

View File

@ -99,7 +99,7 @@ taosd -C
下面仅仅列出一些重要的配置参数,更多的参数请看配置文件里的说明。各个参数的详细介绍及作用请看前述章节,而且这些参数的缺省配置都是工作的,一般无需设置。**注意:配置修改后,需要重启*taosd*服务才能生效。**
- firstEp: taosd启动时主动连接的集群中首个dnode的end point, 默认值为localhost:6030。
- fqdn数据节点的FQDN缺省为操作系统配置的第一个hostname。如果习惯IP地址访问可设置为该节点的IP地址。
- fqdn数据节点的FQDN缺省为操作系统配置的第一个hostname。如果习惯IP地址访问可设置为该节点的IP地址。这个参数值的长度需要控制在 96 个字符以内。
- serverPorttaosd启动后对外服务的端口号默认值为6030。RESTful服务使用的端口号是在此基础上+11即默认值为6041。
- dataDir: 数据文件目录,所有的数据文件都将写入该目录。默认值:/var/lib/taos。
- logDir日志文件目录客户端和服务器的运行日志文件将写入该目录。默认值/var/log/taos。

View File

@ -138,6 +138,7 @@ bool isSimpleAggregateRv(SQueryInfo* pQueryInfo);
bool tscNonOrderedProjectionQueryOnSTable(SQueryInfo *pQueryInfo, int32_t tableIndex);
bool tscOrderedProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsDiffDerivQuery(SQueryInfo* pQueryInfo);
bool tscIsProjectionQueryOnSTable(SQueryInfo* pQueryInfo, int32_t tableIndex);
bool tscIsProjectionQuery(SQueryInfo* pQueryInfo);

View File

@ -266,6 +266,7 @@ typedef struct SSqlObj {
typedef struct SSqlStream {
SSqlObj *pSql;
void * cqhandle; // stream belong to SCQContext handle
const char* dstTable;
uint32_t streamId;
char listed;

View File

@ -468,6 +468,10 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int32_t cnt = 0;
int32_t j = 0;
if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) {
return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z);
}
for (uint32_t k = 1; k < sToken.n - 1; ++k) {
if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) {
tmpTokenBuf[j] = sToken.z[k + 1];
@ -711,7 +715,7 @@ static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char
}
code = TSDB_CODE_TSC_INVALID_OPERATION;
char tmpTokenBuf[16*1024] = {0}; // used for deleting Escape character: \\, \', \"
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
int32_t numOfRows = 0;
code = tsParseValues(str, dataBuf, maxNumOfRows, pInsertParam, &numOfRows, tmpTokenBuf);

View File

@ -19,6 +19,7 @@
#include "ttimer.h"
#include "tutil.h"
#include "taosmsg.h"
#include "tcq.h"
#include "taos.h"
@ -294,24 +295,34 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
return msgLen;
}
void tscKillConnection(STscObj *pObj) {
pthread_mutex_lock(&pObj->mutex);
SSqlObj *pSql = pObj->sqlList;
while (pSql) {
pSql = pSql->next;
// cqContext->dbconn is killed then call this callback
void cqConnKilledNotify(void* handle, void* conn) {
if (handle == NULL || conn == NULL){
return ;
}
SCqContext* pContext = (SCqContext*) handle;
if (pContext->dbConn == conn){
atomic_store_ptr(&(pContext->dbConn), NULL);
}
}
void tscKillConnection(STscObj *pObj) {
// get stream header by locked
pthread_mutex_lock(&pObj->mutex);
SSqlStream *pStream = pObj->streamList;
pthread_mutex_unlock(&pObj->mutex);
while (pStream) {
SSqlStream *tmp = pStream->next;
// set associate variant to NULL
cqConnKilledNotify(pStream->cqhandle, pObj);
// taos_close_stream function call pObj->mutet lock , careful death-lock
taos_close_stream(pStream);
pStream = tmp;
}
pthread_mutex_unlock(&pObj->mutex);
tscDebug("connection:%p is killed", pObj);
taos_close(pObj);
}

View File

@ -122,7 +122,7 @@ static int32_t getColumnIndexByName(SSqlCmd* pCmd, const SStrToken* pToken, SQue
static int32_t getTableIndexByName(SStrToken* pToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t getTableIndexImpl(SStrToken* pTableToken, SQueryInfo* pQueryInfo, SColumnIndex* pIndex);
static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo);
static int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* msg);
static int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
static int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate);
@ -438,7 +438,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
} else if (pInfo->type == TSDB_SQL_DROP_DNODE) {
if (pzName->type == TK_STRING) {
pzName->n = strdequote(pzName->z);
}
strncpy(pCmd->payload, pzName->z, pzName->n);
} else { // drop user/account
if (pzName->n >= TSDB_USER_LEN) {
@ -516,7 +518,9 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
SStrToken* id = taosArrayGet(pInfo->pMiscInfo->a, 0);
if (id->type == TK_STRING) {
id->n = strdequote(id->z);
}
break;
}
@ -2157,7 +2161,10 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
const char* msg6 = "function applied to tags not allowed";
const char* msg7 = "normal table can not apply this function";
const char* msg8 = "multi-columns selection does not support alias column name";
const char* msg9 = "diff can no be applied to unsigned numeric type";
const char* msg9 = "diff/derivative can no be applied to unsigned numeric type";
const char* msg10 = "derivative duration should be greater than 1 Second";
const char* msg11 = "third parameter in derivative should be 0 or 1";
const char* msg12 = "parameter is out of range [1, 100]";
switch (functionId) {
case TSDB_FUNC_COUNT: {
@ -2309,7 +2316,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), resultSize, false);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, functionId, &index, resultType, resultSize, getNewResColId(pCmd), intermediateResSize, false);
if (functionId == TSDB_FUNC_LEASTSQR) { // set the leastsquares parameters
char val[8] = {0};
@ -2340,12 +2347,22 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
tickPerSec /= 1000;
}
if (tickPerSec <= 0 || tickPerSec < TSDB_TICK_PER_SECOND(info.precision)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg10);
}
tscExprAddParams(&pExpr->base, (char*) &tickPerSec, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
memset(val, 0, tListLen(val));
if (tVariantDump(&pParamElem[2].pNode->value, val, TSDB_DATA_TYPE_BIGINT, true) < 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
int64_t v = *(int64_t*) val;
if (v != 0 && v != 1) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg11);
}
tscExprAddParams(&pExpr->base, val, TSDB_DATA_TYPE_BIGINT, LONG_BYTES);
}
@ -2551,7 +2568,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
int64_t nTop = GET_INT32_VAL(val);
if (nTop <= 0 || nTop > 100) { // todo use macro
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg12);
}
// todo REFACTOR
@ -5645,7 +5662,7 @@ int32_t validateSqlFunctionInStreamSql(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
bool isProjectionFunction = false;
const char* msg1 = "column projection is not compatible with interval";
const char* msg1 = "functions not compatible with interval";
// multi-output set/ todo refactor
size_t size = taosArrayGetSize(pQueryInfo->exprList);
@ -5669,8 +5686,8 @@ int32_t validateFunctionsInIntervalOrGroupbyQuery(SSqlCmd* pCmd, SQueryInfo* pQu
}
}
if ((pExpr->base.functionId == TSDB_FUNC_PRJ && pExpr->base.numOfParams == 0) || pExpr->base.functionId == TSDB_FUNC_DIFF ||
pExpr->base.functionId == TSDB_FUNC_ARITHM) {
int32_t f = pExpr->base.functionId;
if ((f == TSDB_FUNC_PRJ && pExpr->base.numOfParams == 0) || f == TSDB_FUNC_DIFF || f == TSDB_FUNC_ARITHM || f == TSDB_FUNC_DERIVATIVE) {
isProjectionFunction = true;
}
}
@ -6266,7 +6283,7 @@ static void updateTagPrjFunction(SQueryInfo* pQueryInfo) {
* 2. if selectivity function and tagprj function both exist, there should be only
* one selectivity function exists.
*/
static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd) {
static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, char* msg) {
const char* msg1 = "only one selectivity function allowed in presence of tags function";
const char* msg3 = "aggregation function should not be mixed up with projection";
@ -6289,10 +6306,11 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
int16_t functionId = pExpr->base.functionId;
if (functionId == TSDB_FUNC_TAGPRJ || functionId == TSDB_FUNC_PRJ || functionId == TSDB_FUNC_TS ||
functionId == TSDB_FUNC_ARITHM) {
functionId == TSDB_FUNC_ARITHM || functionId == TSDB_FUNC_TS_DUMMY) {
continue;
}
if ((aAggs[functionId].status & TSDB_FUNCSTATE_SELECTIVITY) != 0) {
numOfSelectivity++;
} else {
@ -6304,7 +6322,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
// When the tag projection function on tag column that is not in the group by clause, aggregation function and
// selectivity function exist in select clause is not allowed.
if (numOfAggregation > 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(msg, msg1);
}
/*
@ -6333,7 +6351,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
(functionId == TSDB_FUNC_LAST_DST && (pExpr->base.colInfo.flag & TSDB_COL_NULL) != 0)) {
// do nothing
} else {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(msg, msg1);
}
}
@ -6346,7 +6364,7 @@ static int32_t checkUpdateTagPrjFunctions(SQueryInfo* pQueryInfo, SSqlCmd* pCmd)
} else {
if ((pQueryInfo->type & TSDB_QUERY_TYPE_PROJECTION_QUERY) != 0) {
if (numOfAggregation > 0 && pQueryInfo->groupbyExpr.numOfGroupCols == 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidOperationMsg(msg, msg3);
}
if (numOfAggregation > 0 || numOfSelectivity > 0) {
@ -6394,9 +6412,14 @@ static int32_t doAddGroupbyColumnsOnDemand(SSqlCmd* pCmd, SQueryInfo* pQueryInfo
size_t size = tscNumOfExprs(pQueryInfo);
if (TSDB_COL_IS_TAG(pColIndex->flag)) {
int32_t f = TSDB_FUNC_TAG;
if (tscIsDiffDerivQuery(pQueryInfo)) {
f = TSDB_FUNC_TAGPRJ;
}
SColumnIndex index = {.tableIndex = pQueryInfo->groupbyExpr.tableIndex, .columnIndex = colIndex};
SExprInfo* pExpr = tscExprAppend(pQueryInfo, TSDB_FUNC_TAG, &index, s->type, s->bytes,
getNewResColId(pCmd), s->bytes, true);
SExprInfo* pExpr = tscExprAppend(pQueryInfo, f, &index, s->type, s->bytes, getNewResColId(pCmd), s->bytes, true);
memset(pExpr->base.aliasName, 0, sizeof(pExpr->base.aliasName));
tstrncpy(pExpr->base.aliasName, s->name, sizeof(pExpr->base.aliasName));
@ -6456,7 +6479,7 @@ static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) {
return (tableCounting && tagProjection)? -1:0;
}
int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, char* msg) {
const char* msg1 = "functions/columns not allowed in group by query";
const char* msg2 = "projection query on columns not allowed";
const char* msg3 = "group by/session/state_window not allowed on projection query";
@ -6466,17 +6489,17 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
// only retrieve tags, group by is not supportted
if (tscQueryTags(pQueryInfo)) {
if (doTagFunctionCheck(pQueryInfo) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
return invalidOperationMsg(msg, msg5);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0 || isTimeWindowQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
return invalidOperationMsg(msg, msg4);
} else {
return TSDB_CODE_SUCCESS;
}
}
if (tscIsProjectionQuery(pQueryInfo) && tscIsSessionWindowQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
return invalidOperationMsg(msg, msg3);
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
@ -6484,6 +6507,7 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
if (onlyTagPrjFunction(pQueryInfo) && allTagPrjInGroupby(pQueryInfo)) {
// It is a groupby aggregate query, the tag project function is not suitable for this case.
updateTagPrjFunction(pQueryInfo);
return doAddGroupbyColumnsOnDemand(pCmd, pQueryInfo);
}
@ -6508,21 +6532,21 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
if (!qualified) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
return invalidOperationMsg(msg, msg2);
}
}
if (IS_MULTIOUTPUT(aAggs[functId].status) && functId != TSDB_FUNC_TOP && functId != TSDB_FUNC_BOTTOM &&
functId != TSDB_FUNC_DIFF && functId != TSDB_FUNC_TAGPRJ && functId != TSDB_FUNC_PRJ) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(msg, msg1);
}
if (functId == TSDB_FUNC_COUNT && pExpr->base.colInfo.colIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
return invalidOperationMsg(msg, msg1);
}
}
if (checkUpdateTagPrjFunctions(pQueryInfo, pCmd) != TSDB_CODE_SUCCESS) {
if (checkUpdateTagPrjFunctions(pQueryInfo, msg) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
@ -6531,13 +6555,13 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
}
// projection query on super table does not compatible with "group by" syntax
if (tscIsProjectionQuery(pQueryInfo)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
if (tscIsProjectionQuery(pQueryInfo) && !(tscIsDiffDerivQuery(pQueryInfo))) {
return invalidOperationMsg(msg, msg3);
}
return TSDB_CODE_SUCCESS;
} else {
return checkUpdateTagPrjFunctions(pQueryInfo, pCmd);
return checkUpdateTagPrjFunctions(pQueryInfo, msg);
}
}
int32_t doLocalQueryProcess(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode) {
@ -6623,6 +6647,13 @@ int32_t tscCheckCreateDbParams(SSqlCmd* pCmd, SCreateDbMsg* pCreate) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
int32_t blocks = ntohl(pCreate->totalBlocks);
if (blocks != -1 && (blocks < TSDB_MIN_TOTAL_BLOCKS || blocks > TSDB_MAX_TOTAL_BLOCKS)) {
snprintf(msg, tListLen(msg), "invalid db option totalBlocks: %d valid range: [%d, %d]", blocks,
TSDB_MIN_TOTAL_BLOCKS, TSDB_MAX_TOTAL_BLOCKS);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
if (pCreate->quorum != -1 &&
(pCreate->quorum < TSDB_MIN_DB_QUORUM_OPTION || pCreate->quorum > TSDB_MAX_DB_QUORUM_OPTION)) {
snprintf(msg, tListLen(msg), "invalid db option quorum: %d valid range: [%d, %d]", pCreate->quorum,
@ -7052,6 +7083,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
// project query primary column must be timestamp type
if (tscIsProjectionQuery(pQueryInfo)) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, 0);
if (pExpr->base.colInfo.colId != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
@ -7731,7 +7763,8 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg2 = "too many tables in from clause";
const char* msg3 = "start(end) time of query range required or time range too large";
const char* msg4 = "interval query not supported, since the result of sub query not include valid timestamp column";
const char* msg9 = "only tag query not compatible with normal column filter";
const char* msg5 = "only tag query not compatible with normal column filter";
const char* msg6 = "not support stddev/percentile in outer query yet";
int32_t code = TSDB_CODE_SUCCESS;
@ -7772,6 +7805,15 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
return TSDB_CODE_TSC_INVALID_OPERATION;
}
// todo NOT support yet
for(int32_t i = 0; i < tscNumOfExprs(pQueryInfo); ++i) {
SExprInfo* pExpr = tscExprGet(pQueryInfo, i);
int32_t f = pExpr->base.functionId;
if (f == TSDB_FUNC_STDDEV || f == TSDB_FUNC_PERCT) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
}
// validate the query filter condition info
if (pSqlNode->pWhere != NULL) {
if (validateWhereNode(pQueryInfo, &pSqlNode->pWhere, pSql) != TSDB_CODE_SUCCESS) {
@ -7789,7 +7831,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
if (isTimeWindowQuery(pQueryInfo)) {
if (isTimeWindowQuery(pQueryInfo) || pQueryInfo->sessionWindow.gap > 0) {
// check if the first column of the nest query result is timestamp column
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, 0);
if (pCol->info.type != TSDB_DATA_TYPE_TIMESTAMP) {
@ -7804,10 +7846,13 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// set order by info
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMeta)) !=
TSDB_CODE_SUCCESS) {
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMeta)) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if ((code = doFunctionsCompatibleCheck(pCmd, pQueryInfo, tscGetErrorMsgPayload(pCmd))) != TSDB_CODE_SUCCESS) {
return code;
}
} else {
pQueryInfo->command = TSDB_SQL_SELECT;
@ -7872,11 +7917,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
// set interval value
if (validateIntervalNode(pSql, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
} else {
if (isTimeWindowQuery(pQueryInfo) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
if (tscQueryTags(pQueryInfo)) {
@ -7887,7 +7927,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
for (int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCols = taosArrayGetP(pQueryInfo->colList, i);
if (pCols->info.flist.numOfFilters > 0) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg9);
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
}
}
}
@ -7907,6 +7947,11 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if ((isTimeWindowQuery(pQueryInfo) || pQueryInfo->sessionWindow.gap > 0) &&
(validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (isSTable) {
tscTansformFuncForSTableQuery(pQueryInfo);
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
@ -7936,7 +7981,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
return code;
}
if ((code = doFunctionsCompatibleCheck(pCmd, pQueryInfo)) != TSDB_CODE_SUCCESS) {
if ((code = doFunctionsCompatibleCheck(pCmd, pQueryInfo,tscGetErrorMsgPayload(pCmd))) != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -912,7 +912,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SGroupbyExpr *pGroupbyExpr = query.pGroupbyExpr;
if (pGroupbyExpr->numOfGroupCols > 0) {
if (pGroupbyExpr != NULL && pGroupbyExpr->numOfGroupCols > 0) {
pQueryMsg->orderByIdx = htons(pGroupbyExpr->orderIndex);
pQueryMsg->orderType = htons(pGroupbyExpr->orderType);

View File

@ -139,8 +139,13 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
pStream->numOfRes = 0; // reset the numOfRes.
SSqlObj *pSql = pStream->pSql;
// pSql == NULL maybe killStream already called
if(pSql == NULL) {
return ;
}
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" timer launch query", pSql->self);
tscDebug("0x%"PRIx64" add into timer", pSql->self);
if (pStream->isProject) {
/*
@ -339,8 +344,12 @@ static void tscSetRetryTimer(SSqlStream *pStream, SSqlObj *pSql, int64_t timer)
if (pStream->isProject) {
int64_t now = taosGetTimestamp(pStream->precision);
int64_t etime = now > pStream->etime ? pStream->etime : now;
int64_t maxRetent = tsMaxRetentWindow * 1000;
if(pStream->precision == TSDB_TIME_PRECISION_MICRO) {
maxRetent *= 1000;
}
if (pStream->etime < now && now - pStream->etime > tsMaxRetentWindow) {
if (pStream->etime < now && now - pStream->etime > maxRetent) {
/*
* current time window will be closed, since it too early to exceed the maxRetentWindow value
*/
@ -664,7 +673,7 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
}
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
@ -697,6 +706,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pStream->callback = callback;
pStream->param = param;
pStream->pSql = pSql;
pStream->cqhandle = cqhandle;
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
@ -745,7 +755,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback);
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL);
}
void taos_close_stream(TAOS_STREAM *handle) {

View File

@ -255,10 +255,14 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t functionId = tscExprGet(pQueryInfo, i)->base.functionId;
int32_t f = tscExprGet(pQueryInfo, i)->base.functionId;
if (f == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (functionId != TSDB_FUNC_PRJ && functionId != TSDB_FUNC_TAGPRJ && functionId != TSDB_FUNC_TAG &&
functionId != TSDB_FUNC_TS && functionId != TSDB_FUNC_ARITHM) {
if (f != TSDB_FUNC_PRJ && f != TSDB_FUNC_TAGPRJ && f != TSDB_FUNC_TAG &&
f != TSDB_FUNC_TS && f != TSDB_FUNC_ARITHM && f != TSDB_FUNC_DIFF &&
f != TSDB_FUNC_DERIVATIVE) {
return false;
}
}
@ -266,6 +270,24 @@ bool tscIsProjectionQuery(SQueryInfo* pQueryInfo) {
return true;
}
bool tscIsDiffDerivQuery(SQueryInfo* pQueryInfo) {
size_t size = tscNumOfExprs(pQueryInfo);
for (int32_t i = 0; i < size; ++i) {
int32_t f = tscExprGet(pQueryInfo, i)->base.functionId;
if (f == TSDB_FUNC_TS_DUMMY) {
continue;
}
if (f == TSDB_FUNC_DIFF || f == TSDB_FUNC_DERIVATIVE) {
return true;
}
}
return false;
}
bool tscHasColumnFilter(SQueryInfo* pQueryInfo) {
// filter on primary timestamp column
if (pQueryInfo->window.skey != INT64_MIN || pQueryInfo->window.ekey != INT64_MAX) {
@ -962,6 +984,9 @@ static void destroyDummyInputOperator(void* param, int32_t numOfOutput) {
pInfo->block = destroyOutputBuf(pInfo->block);
pInfo->pSql = NULL;
cleanupResultRowInfo(&pInfo->pTableQueryInfo->resInfo);
tfree(pInfo->pTableQueryInfo);
}
// todo this operator servers as the adapter for Operator tree and SqlRes result, remove it later
@ -4263,10 +4288,9 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
pQueryAttr->pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr));
*(pQueryAttr->pGroupbyExpr) = pQueryInfo->groupbyExpr;
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
pQueryAttr->pGroupbyExpr->columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
} else {
assert(pQueryInfo->groupbyExpr.columnInfo == NULL);
@ -4345,7 +4369,7 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (pQueryAttr->pGroupbyExpr->numOfGroupCols < 0) {
if (pQueryAttr->pGroupbyExpr != NULL && pQueryAttr->pGroupbyExpr->numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", addr, pQueryInfo->groupbyExpr.numOfGroupCols);
return TSDB_CODE_TSC_INVALID_OPERATION;
}

View File

@ -831,6 +831,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "precision";
cfg.ptr = &tsTimePrecision;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_PRECISION;
cfg.maxValue = TSDB_MAX_PRECISION;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "comp";
cfg.ptr = &tsCompression;
cfg.valType = TAOS_CFG_VTYPE_INT8;
@ -901,6 +911,16 @@ static void doInitGlobalConfig(void) {
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "cachelast";
cfg.ptr = &tsCacheLastRow;
cfg.valType = TAOS_CFG_VTYPE_INT8;
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
cfg.minValue = TSDB_MIN_DB_CACHE_LAST_ROW;
cfg.maxValue = TSDB_MAX_DB_CACHE_LAST_ROW;
cfg.ptrLength = 0;
cfg.unitType = TAOS_CFG_UTYPE_NONE;
taosInitConfigOption(cfg);
cfg.option = "mqttHostName";
cfg.ptr = tsMqttHostName;
cfg.valType = TAOS_CFG_VTYPE_STRING;

View File

@ -74,7 +74,7 @@ void tVariantCreate(tVariant *pVar, SStrToken *token) {
case TSDB_DATA_TYPE_BINARY: {
pVar->pz = strndup(token->z, token->n);
pVar->nLen = strdequote(pVar->pz);
pVar->nLen = strRmquote(pVar->pz, token->n);
break;
}

View File

@ -177,6 +177,7 @@ public class TSDBResultSetTest {
rs.getAsciiStream("f1");
}
@SuppressWarnings("deprecation")
@Test(expected = SQLFeatureNotSupportedException.class)
public void getUnicodeStream() throws SQLException {
rs.getUnicodeStream("f1");

View File

@ -242,7 +242,7 @@ def _load_taos_linux():
def _load_taos_darwin():
return ctypes.cDLL('libtaos.dylib')
return ctypes.CDLL('libtaos.dylib')
def _load_taos_windows():

View File

@ -38,21 +38,6 @@
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
typedef struct {
int32_t vgId;
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite;
struct SCqObj *pHead;
void *dbConn;
void *tmrCtrl;
pthread_mutex_t mutex;
int32_t delete;
int32_t cqObjNum;
} SCqContext;
typedef struct SCqObj {
tmr_h tmrId;
@ -439,7 +424,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
// inner implement in tscStream.c
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->pContext = pContext;
@ -453,7 +438,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj->tmrId = 0;
if (pObj->pStream == NULL) {
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \
INT64_MIN, (void *)pObj->rid, NULL, pContext);
// TODO the pObj->pStream may be released if error happens
if (pObj->pStream) {

View File

@ -31,6 +31,23 @@ typedef struct {
FCqWrite cqWrite;
} SCqCfg;
// SCqContext
typedef struct {
int32_t vgId;
int32_t master;
int32_t num; // number of continuous streams
char user[TSDB_USER_LEN];
char pass[TSDB_KEY_LEN];
char db[TSDB_DB_NAME_LEN];
FCqWrite cqWrite;
struct SCqObj *pHead;
void *dbConn;
void *tmrCtrl;
pthread_mutex_t mutex;
int32_t delete;
int32_t cqObjNum;
} SCqContext;
// the following API shall be called by vnode
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
void cqClose(void *handle);

View File

@ -94,7 +94,7 @@ STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
int tsdbGetState(STsdbRepo *repo);
bool tsdbInCompact(STsdbRepo *repo);
// --------- TSDB TABLE DEFINITION
typedef struct {
uint64_t uid; // the unique table ID

View File

@ -625,6 +625,10 @@ static int64_t g_totalChildTables = 0;
static SQueryMetaInfo g_queryInfo;
static FILE * g_fpOfInsertResult = NULL;
#if _MSC_VER <= 1900
#define __func__ __FUNCTION__
#endif
#define debugPrint(fmt, ...) \
do { if (g_args.debug_print || g_args.verbose_print) \
fprintf(stderr, "DEBG: "fmt, __VA_ARGS__); } while(0)
@ -1209,7 +1213,6 @@ static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) {
}
int totalLen = 0;
char temp[16000];
// fetch the records row by row
while((row = taos_fetch_row(res))) {
@ -1220,6 +1223,7 @@ static void fetchResult(TAOS_RES *res, threadInfo* pThreadInfo) {
memset(databuf, 0, 100*1024*1024);
}
num_rows++;
char temp[16000] = {0};
int len = taos_print_row(temp, row, fields, num_fields);
len += sprintf(temp + len, "\n");
//printf("query result:%s\n", temp);

View File

@ -59,6 +59,9 @@ bool taosMbsToUcs4(char *mbs, size_t mbsLength, char *ucs4, int32_t ucs4_max_len
iconv_close(cd);
if (len != NULL) {
*len = (int32_t)(ucs4_max_len - outLeft);
if (*len < 0) {
return false;
}
}
return true;

View File

@ -464,6 +464,7 @@ typedef struct SSWindowOperatorInfo {
TSKEY prevTs; // previous timestamp
int32_t numOfRows; // number of rows
int32_t start; // start row index
bool reptScan; // next round scan
} SSWindowOperatorInfo;
typedef struct SStateWindowOperatorInfo {
@ -473,7 +474,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t colIndex; // start row index
int32_t start;
char* prevData; // previous data
bool reptScan;
} SStateWindowOperatorInfo ;
typedef struct SDistinctOperatorInfo {

View File

@ -559,10 +559,8 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
X.gap = Y;
}
%type windowstate_option {SWindowStateVal}
windowstate_option(X) ::= . {X.col.n = 0;}
windowstate_option(X) ::= STATE_WINDOW LP ids(V) RP. {
X.col = V;
}
windowstate_option(X) ::= . { X.col.n = 0; X.col.z = NULL;}
windowstate_option(X) ::= STATE_WINDOW LP ids(V) RP. { X.col = V; }
%type fill_opt {SArray*}
%destructor fill_opt {taosArrayDestroy($$);}

View File

@ -3428,7 +3428,7 @@ static bool deriv_function_setup(SQLFunctionCtx *pCtx) {
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
pDerivInfo->ignoreNegative = pCtx->param[2].i64;
pDerivInfo->ignoreNegative = pCtx->param[1].i64;
pDerivInfo->prevTs = -1;
pDerivInfo->tsWindow = pCtx->param[0].i64;
pDerivInfo->valueSet = false;
@ -3440,10 +3440,8 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
SDerivInfo* pDerivInfo = GET_ROWCELL_INTERBUF(pResInfo);
void *data = GET_INPUT_DATA_LIST(pCtx);
bool isFirstBlock = (pDerivInfo->valueSet == false);
int32_t notNullElems = 0;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pCtx->order);
int32_t i = (pCtx->order == TSDB_ORDER_ASC) ? 0 : pCtx->size - 1;
@ -3469,12 +3467,12 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
notNullElems++;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
@ -3496,12 +3494,12 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
notNullElems++;
}
}
pDerivInfo->prevValue = (double) pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
@ -3522,12 +3520,12 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
notNullElems++;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
@ -3549,12 +3547,12 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
notNullElems++;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
@ -3575,12 +3573,12 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
*pTimestamp = tsList[i];
pOutput += 1;
pTimestamp += 1;
notNullElems++;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
@ -3602,12 +3600,12 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
pOutput += 1;
pTimestamp += 1;
notNullElems++;
}
}
pDerivInfo->prevValue = pData[i];
pDerivInfo->prevTs = tsList[i];
notNullElems++;
}
break;
}
@ -3623,8 +3621,7 @@ static void deriv_function(SQLFunctionCtx *pCtx) {
*/
assert(pCtx->hasNull);
} else {
int32_t forwardStep = (isFirstBlock) ? notNullElems - 1 : notNullElems;
GET_RES_INFO(pCtx)->numOfRes += forwardStep;
GET_RES_INFO(pCtx)->numOfRes += notNullElems;
}
}
@ -4687,8 +4684,8 @@ static bool rate_function_setup(SQLFunctionCtx *pCtx) {
pInfo->correctionValue = 0;
pInfo->firstKey = INT64_MIN;
pInfo->lastKey = INT64_MIN;
pInfo->firstValue = INT64_MIN;
pInfo->lastValue = INT64_MIN;
pInfo->firstValue = (double) INT64_MIN;
pInfo->lastValue = (double) INT64_MIN;
pInfo->hasResult = 0;
pInfo->isIRate = (pCtx->functionId == TSDB_FUNC_IRATE);
@ -5003,6 +5000,19 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
min = totalBlocks > 0 ? pTableBlockDist->minRows : 0;
max = totalBlocks > 0 ? pTableBlockDist->maxRows : 0;
double stdDev = 0;
if (totalBlocks > 0) {
double variance = 0;
for (int32_t i = 0; i < numSteps; i++) {
SFileBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t blocks = blockInfo->numBlocksOfStep;
int32_t rows = (i * TSDB_BLOCK_DIST_STEP_ROWS + TSDB_BLOCK_DIST_STEP_ROWS / 2);
variance += blocks * (rows - avg) * (rows - avg);
}
variance = variance / totalBlocks;
stdDev = sqrt(variance);
}
double percents[] = {0.05, 0.10, 0.20, 0.30, 0.40, 0.50, 0.60, 0.70, 0.80, 0.90, 0.95, 0.99};
int32_t percentiles[] = {-1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1};
assert(sizeof(percents)/sizeof(double) == sizeof(percentiles)/sizeof(int32_t));
@ -5017,12 +5027,12 @@ void generateBlockDistResult(STableBlockDist *pTableBlockDist, char* result) {
"60th=[%d], 70th=[%d], 80th=[%d], 90th=[%d], 95th=[%d], 99th=[%d]\n\t "
"Min=[%"PRId64"(Rows)] Max=[%"PRId64"(Rows)] Avg=[%"PRId64"(Rows)] Stddev=[%.2f] \n\t "
"Rows=[%"PRIu64"], Blocks=[%"PRId64"], Size=[%.3f(Kb)] Comp=[%.2f]\n\t "
"RowsInMem=[%d] \n\t SeekHeaderTime=[%d(us)]",
"RowsInMem=[%d] \n\t",
percentiles[0], percentiles[1], percentiles[2], percentiles[3], percentiles[4], percentiles[5],
percentiles[6], percentiles[7], percentiles[8], percentiles[9], percentiles[10], percentiles[11],
min, max, avg, 0.0,
min, max, avg, stdDev,
totalRows, totalBlocks, totalLen/1024.0, compRatio,
pTableBlockDist->numOfRowsInMemTable, pTableBlockDist->firstSeekTimeUs);
pTableBlockDist->numOfRowsInMemTable);
varDataSetLen(result, sz);
UNUSED(sz);
}
@ -5290,7 +5300,7 @@ SAggFunctionInfo aAggs[] = {{
},
{
// 17
"ts_dummy",
"ts",
TSDB_FUNC_TS_DUMMY,
TSDB_FUNC_TS_DUMMY,
TSDB_BASE_FUNC_SO | TSDB_FUNCSTATE_NEED_TS,
@ -5384,7 +5394,7 @@ SAggFunctionInfo aAggs[] = {{
"diff",
TSDB_FUNC_DIFF,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
diff_function_setup,
diff_function,
diff_function_f,
@ -5488,7 +5498,7 @@ SAggFunctionInfo aAggs[] = {{
"derivative", // return table id and the corresponding tags for join match and subscribe
TSDB_FUNC_DERIVATIVE,
TSDB_FUNC_INVALID_ID,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS,
TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STABLE | TSDB_FUNCSTATE_NEED_TS | TSDB_FUNCSTATE_SELECTIVITY,
deriv_function_setup,
deriv_function,
noop2,

View File

@ -30,6 +30,7 @@
#define IS_MASTER_SCAN(runtime) ((runtime)->scanFlag == MASTER_SCAN)
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
#define IS_REPEAT_SCAN(runtime) ((runtime)->scanFlag == REPEAT_SCAN)
#define SET_MASTER_SCAN_FLAG(runtime) ((runtime)->scanFlag = MASTER_SCAN)
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
@ -735,6 +736,7 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
pCtx[k].preAggVals.isSet = false;
}
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
aAggs[functionId].xFunction(&pCtx[k]);
}
@ -918,7 +920,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SSDataBlo
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
}
} else {
if (pCtx[0].pInput == NULL && pBlock->pDataBlock != NULL) {
if (/*pCtx[0].pInput == NULL && */pBlock->pDataBlock != NULL) {
doSetInputDataBlock(pOperator, pCtx, pBlock, order);
} else {
doSetInputDataBlockInfo(pOperator, pCtx, pBlock, order);
@ -1336,6 +1338,10 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
int64_t gap = pOperator->pRuntimeEnv->pQueryAttr->sw.gap;
pInfo->numOfRows = 0;
if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) {
pInfo->reptScan = true;
pInfo->prevTs = INT64_MIN;
}
TSKEY* tsList = (TSKEY*)pColInfoData->pData;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
@ -1345,7 +1351,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
pInfo->prevTs = tsList[j];
pInfo->numOfRows = 1;
pInfo->start = j;
} else if (tsList[j] - pInfo->prevTs <= gap) {
} else if (tsList[j] - pInfo->prevTs <= gap && (tsList[j] - pInfo->prevTs) >= 0) {
pInfo->curWindow.ekey = tsList[j];
pInfo->prevTs = tsList[j];
pInfo->numOfRows += 1;
@ -1681,8 +1687,6 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
pRuntimeEnv->prevGroupId = INT32_MIN;
pRuntimeEnv->enableGroupData = false;
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pResultRowHashTable = taosHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
@ -3094,7 +3098,7 @@ int32_t initResultRow(SResultRow *pResultRow) {
* +------------+-----------------result column 1-----------+-----------------result column 2-----------+
* + SResultRow | SResultRowCellInfo | intermediate buffer1 | SResultRowCellInfo | intermediate buffer 2|
* +------------+-------------------------------------------+-------------------------------------------+
* offset[0] offset[1]
* offset[0] offset[1] offset[2]
*/
void setDefaultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *pInfo, int64_t uid, int32_t stage) {
SQLFunctionCtx* pCtx = pInfo->pCtx;
@ -3323,7 +3327,7 @@ void setResultRowOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pRe
offset += pCtx[i].outputBytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
}
@ -3381,7 +3385,7 @@ void setResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SQLF
offset += pCtx[i].outputBytes;
int32_t functionId = pCtx[i].functionId;
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF) {
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM || functionId == TSDB_FUNC_DIFF || functionId == TSDB_FUNC_DERIVATIVE) {
pCtx[i].ptsOutputBuf = pCtx[0].pOutput;
}
@ -3589,6 +3593,8 @@ static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo*
int32_t step = -1;
qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_QID(pRuntimeEnv));
assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC);
if (orderType == TSDB_ORDER_ASC) {
start = pGroupResInfo->index;
step = 1;
@ -4115,6 +4121,7 @@ int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
pRuntimeEnv->groupResInfo.totalGroup = (int32_t) (pQueryAttr->stableQuery? GET_NUM_OF_TABLEGROUP(pRuntimeEnv):0);
pRuntimeEnv->enableGroupData = false;
pRuntimeEnv->pQueryAttr = pQueryAttr;
pRuntimeEnv->pTsBuf = pTsBuf;
@ -4570,7 +4577,7 @@ SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntime
}
SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
int32_t numOfCols = pQuery->pGroupbyExpr->numOfGroupCols;
int32_t numOfCols = pQuery->pGroupbyExpr == NULL? 0: pQuery->pGroupbyExpr->numOfGroupCols;
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
@ -4609,7 +4616,7 @@ SArray* getOrderCheckColumns(SQueryAttr* pQuery) {
}
SArray* getResultGroupCheckColumns(SQueryAttr* pQuery) {
int32_t numOfCols = pQuery->pGroupbyExpr->numOfGroupCols;
int32_t numOfCols = pQuery->pGroupbyExpr == NULL? 0 : pQuery->pGroupbyExpr->numOfGroupCols;
SArray* pOrderColumns = NULL;
if (numOfCols > 0) {
@ -5175,6 +5182,10 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
if (IS_REPEAT_SCAN(pRuntimeEnv) && !pInfo->reptScan) {
pInfo->reptScan = true;
tfree(pInfo->prevData);
}
pInfo->numOfRows = 0;
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
@ -5761,6 +5772,7 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
pInfo->colIndex = -1;
pInfo->reptScan = false;
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
@ -5789,6 +5801,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
pInfo->prevTs = INT64_MIN;
pInfo->reptScan = false;
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
pOperator->name = "SessionWindowAggOperator";
@ -7213,7 +7226,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
// todo refactor
pQInfo->query.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.functionId == TSDB_FUNC_BLKINFO);
qDebug("qmsg:%p QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->qId, pQInfo);
qDebug("qmsg:%p vgId:%d, QInfo:0x%" PRIx64 "-%p created", pQueryMsg, pQInfo->query.vgId, pQInfo->qId, pQInfo);
return pQInfo;
_cleanup_qinfo:

View File

@ -132,7 +132,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
numOfGroupByCols = 0;
}
qDebug("qmsg:%p query stable, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
qDebug("qmsg:%p query stable, uid:%"PRIu64", tid:%d", pQueryMsg, id->uid, id->tid);
code = tsdbQuerySTableByTagCond(tsdb, id->uid, pQueryMsg->window.skey, param.tagCond, pQueryMsg->tagCondLen,
pQueryMsg->tagNameRelType, param.tbnameCond, &tableGroupInfo, param.pGroupColIndex, numOfGroupByCols);

File diff suppressed because it is too large Load Diff

View File

@ -92,6 +92,7 @@ struct STsdbRepo {
pthread_mutex_t mutex;
bool repoLocked;
int32_t code; // Commit code
bool inCompact; // is in compact process?
};
#define REPO_ID(r) (r)->config.tsdbId

View File

@ -12,11 +12,516 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tsdb.h"
#include "tsdbint.h"
#ifndef _TSDB_PLUGINS
int tsdbCompact(STsdbRepo *pRepo) { return 0; }
void *tsdbCompactImpl(STsdbRepo *pRepo) { return NULL; }
typedef struct {
STable * pTable;
SBlockIdx * pBlkIdx;
SBlockIdx bindex;
SBlockInfo *pInfo;
} STableCompactH;
typedef struct {
SRtn rtn;
SFSIter fsIter;
SArray * tbArray; // table array to cache table obj and block indexes
SReadH readh;
SDFileSet wSet;
SArray * aBlkIdx;
SArray * aSupBlk;
SDataCols *pDataCols;
} SCompactH;
#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet))
#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh))
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
static int tsdbAsyncCompact(STsdbRepo *pRepo);
static void tsdbStartCompact(STsdbRepo *pRepo);
static void tsdbEndCompact(STsdbRepo *pRepo, int eno);
static int tsdbCompactMeta(STsdbRepo *pRepo);
static int tsdbCompactTSData(STsdbRepo *pRepo);
static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet);
static bool tsdbShouldCompact(SCompactH *pComph);
static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo);
static void tsdbDestroyCompactH(SCompactH *pComph);
static int tsdbInitCompTbArray(SCompactH *pComph);
static void tsdbDestroyCompTbArray(SCompactH *pComph);
static int tsdbCacheFSetIndex(SCompactH *pComph);
static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet);
static void tsdbCompactFSetEnd(SCompactH *pComph);
static int tsdbCompactFSetImpl(SCompactH *pComph);
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf);
int tsdbCompact(STsdbRepo *pRepo) { return tsdbAsyncCompact(pRepo); }
void *tsdbCompactImpl(STsdbRepo *pRepo) {
// Check if there are files in TSDB FS to compact
if (REPO_FS(pRepo)->cstatus->pmf == NULL) {
tsdbInfo("vgId:%d no file to compact in FS", REPO_ID(pRepo));
return NULL;
}
tsdbStartCompact(pRepo);
if (tsdbCompactMeta(pRepo) < 0) {
tsdbError("vgId:%d failed to compact META data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
if (tsdbCompactTSData(pRepo) < 0) {
tsdbError("vgId:%d failed to compact TS data since %s", REPO_ID(pRepo), tstrerror(terrno));
goto _err;
}
tsdbEndCompact(pRepo, TSDB_CODE_SUCCESS);
return NULL;
_err:
pRepo->code = terrno;
tsdbEndCompact(pRepo, terrno);
return NULL;
}
static int tsdbAsyncCompact(STsdbRepo *pRepo) {
tsem_wait(&(pRepo->readyToCommit));
return tsdbScheduleCommit(pRepo, COMPACT_REQ);
}
static void tsdbStartCompact(STsdbRepo *pRepo) {
ASSERT(!pRepo->inCompact);
tsdbInfo("vgId:%d start to compact!", REPO_ID(pRepo));
tsdbStartFSTxn(pRepo, 0, 0);
pRepo->code = TSDB_CODE_SUCCESS;
pRepo->inCompact = true;
}
static void tsdbEndCompact(STsdbRepo *pRepo, int eno) {
if (eno != TSDB_CODE_SUCCESS) {
tsdbEndFSTxnWithError(REPO_FS(pRepo));
} else {
tsdbEndFSTxn(pRepo);
}
pRepo->inCompact = false;
tsdbInfo("vgId:%d compact over, %s", REPO_ID(pRepo), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
tsem_post(&(pRepo->readyToCommit));
}
static int tsdbCompactMeta(STsdbRepo *pRepo) {
STsdbFS *pfs = REPO_FS(pRepo);
tsdbUpdateMFile(pfs, pfs->cstatus->pmf);
return 0;
}
static int tsdbCompactTSData(STsdbRepo *pRepo) {
SCompactH compactH;
SDFileSet *pSet = NULL;
tsdbDebug("vgId:%d start to compact TS data", REPO_ID(pRepo));
// If no file, just return 0;
if (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0) {
tsdbDebug("vgId:%d no TS data file to compact, compact over", REPO_ID(pRepo));
return 0;
}
if (tsdbInitCompactH(&compactH, pRepo) < 0) {
return -1;
}
while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) {
// Remove those expired files
if (pSet->fid < compactH.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
continue;
}
if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) {
tsdbDebug("vgId:%d FSET %d on level %d, should not compact", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL);
tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
continue;
}
if (tsdbCompactFSet(&compactH, pSet) < 0) {
tsdbDestroyCompactH(&compactH);
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
}
tsdbDestroyCompactH(&compactH);
tsdbDebug("vgId:%d compact TS data over", REPO_ID(pRepo));
return 0;
}
static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
SDiskID did;
tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
TSDB_FSET_ID(pSet));
if (tsdbCompactFSetInit(pComph, pSet) < 0) {
return -1;
}
if (!tsdbShouldCompact(pComph)) {
tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid);
if (tsdbApplyRtnOnFSet(TSDB_COMPACT_REPO(pComph), pSet, &(pComph->rtn)) < 0) {
tsdbCompactFSetEnd(pComph);
return -1;
}
} else {
// Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);
return -1;
}
tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) {
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);
return -1;
}
if (tsdbCompactFSetImpl(pComph) < 0) {
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbRemoveDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbCompactFSetEnd(pComph);
return -1;
}
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_COMPACT_WSET(pComph));
tsdbDebug("vgId:%d FSET %d compact over", REPO_ID(pRepo), pSet->fid);
}
tsdbCompactFSetEnd(pComph);
return 0;
}
static bool tsdbShouldCompact(SCompactH *pComph) {
STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
STableCompactH *pTh;
SBlock * pBlock;
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
SDFile * pDataF = TSDB_READ_DATA_FILE(pReadh);
SDFile * pLastF = TSDB_READ_LAST_FILE(pReadh);
int tblocks = 0; // total blocks
int nSubBlocks = 0; // # of blocks with sub-blocks
int nSmallBlocks = 0; // # of blocks with rows < defaultRows
int64_t tsize = 0;
for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
for (size_t bidx = 0; bidx < pTh->pBlkIdx->numOfBlocks; bidx++) {
tblocks++;
pBlock = pTh->pInfo->blocks + bidx;
if (pBlock->numOfRows < defaultRows) {
nSmallBlocks++;
}
if (pBlock->numOfSubBlocks > 1) {
nSubBlocks++;
for (int k = 0; k < pBlock->numOfSubBlocks; k++) {
SBlock *iBlock = ((SBlock *)POINTER_SHIFT(pTh->pInfo, pBlock->offset)) + k;
tsize = tsize + iBlock->len;
}
} else if (pBlock->numOfSubBlocks == 1) {
tsize += pBlock->len;
} else {
ASSERT(0);
}
}
}
return (((nSubBlocks * 1.0 / tblocks) > 0.33) || ((nSmallBlocks * 1.0 / tblocks) > 0.33) ||
(tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85));
}
static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pComph, 0, sizeof(*pComph));
TSDB_FSET_SET_CLOSED(TSDB_COMPACT_WSET(pComph));
tsdbGetRtnSnap(pRepo, &(pComph->rtn));
tsdbFSIterInit(&(pComph->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbInitReadH(&(pComph->readh), pRepo) < 0) {
return -1;
}
if (tsdbInitCompTbArray(pComph) < 0) {
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pComph->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pComph->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
if (pComph->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
return 0;
}
static void tsdbDestroyCompactH(SCompactH *pComph) {
pComph->pDataCols = tdFreeDataCols(pComph->pDataCols);
pComph->aSupBlk = taosArrayDestroy(pComph->aSupBlk);
pComph->aBlkIdx = taosArrayDestroy(pComph->aBlkIdx);
tsdbDestroyCompTbArray(pComph);
tsdbDestroyReadH(&(pComph->readh));
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
}
static int tsdbInitCompTbArray(SCompactH *pComph) { // Init pComp->tbArray
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
pComph->tbArray = taosArrayInit(pMeta->maxTables, sizeof(STableCompactH));
if (pComph->tbArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
// Note here must start from 0
for (int i = 0; i < pMeta->maxTables; i++) {
STableCompactH ch = {0};
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
ch.pTable = pMeta->tables[i];
}
if (taosArrayPush(pComph->tbArray, &ch) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
return 0;
}
static void tsdbDestroyCompTbArray(SCompactH *pComph) {
STableCompactH *pTh;
if (pComph->tbArray == NULL) return;
for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
if (pTh->pTable) {
tsdbUnRefTable(pTh->pTable);
}
pTh->pInfo = taosTZfree(pTh->pInfo);
}
pComph->tbArray = taosArrayDestroy(pComph->tbArray);
}
static int tsdbCacheFSetIndex(SCompactH *pComph) {
SReadH *pReadH = &(pComph->readh);
if (tsdbLoadBlockIdx(pReadH) < 0) {
return -1;
}
for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
pTh->pBlkIdx = NULL;
if (pTh->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pTh->pTable) < 0) {
return -1;
}
if (pReadH->pBlkIdx == NULL) continue;
pTh->bindex = *(pReadH->pBlkIdx);
pTh->pBlkIdx = &(pTh->bindex);
if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) {
return -1;
}
if (tsdbLoadBlockInfo(pReadH, (void *)(pTh->pInfo)) < 0) {
return -1;
}
}
return 0;
}
static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet) {
taosArrayClear(pComph->aBlkIdx);
taosArrayClear(pComph->aSupBlk);
if (tsdbSetAndOpenReadFSet(&(pComph->readh), pSet) < 0) {
return -1;
}
if (tsdbCacheFSetIndex(pComph) < 0) {
tsdbCloseAndUnsetFSet(&(pComph->readh));
return -1;
}
return 0;
}
static void tsdbCompactFSetEnd(SCompactH *pComph) { tsdbCloseAndUnsetFSet(&(pComph->readh)); }
static int tsdbCompactFSetImpl(SCompactH *pComph) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
SBlockIdx blkIdx;
void ** ppBuf = &(TSDB_COMPACT_BUF(pComph));
void ** ppCBuf = &(TSDB_COMPACT_COMP_BUF(pComph));
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear(pComph->aBlkIdx);
for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
STSchema * pSchema;
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1);
taosArrayClear(pComph->aSupBlk);
if ((tdInitDataCols(pComph->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tdFreeSchema(pSchema);
// Loop to compact each block data
for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) {
SBlock *pBlock = pTh->pInfo->blocks + i;
// Load the block data
if (tsdbLoadBlockData(pReadh, pBlock, pTh->pInfo) < 0) {
return -1;
}
// Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file
if (pComph->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) {
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf) < 0) {
return -1;
}
} else {
int ridx = 0;
while (true) {
if (pReadh->pDCols[0]->numOfRows - ridx == 0) break;
int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pComph->pDataCols->numOfRows);
tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx);
if (pComph->pDataCols->numOfRows < defaultRows) {
break;
}
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
return -1;
}
tdResetDataCols(pComph->pDataCols);
}
}
}
if (pComph->pDataCols->numOfRows > 0 &&
tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
return -1;
}
if (tsdbWriteBlockInfoImpl(TSDB_COMPACT_HEAD_FILE(pComph), pTh->pTable, pComph->aSupBlk, NULL, ppBuf, &blkIdx) <
0) {
return -1;
}
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pComph->aBlkIdx, (void *)(&blkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
if (tsdbWriteBlockIdx(TSDB_COMPACT_HEAD_FILE(pComph), pComph->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
}
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile;
bool isLast;
SBlock block;
ASSERT(pDataCols->numOfRows > 0);
if (pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMPACT_LAST_FILE(pComph);
isLast = true;
} else {
pDFile = TSDB_COMPACT_DATA_FILE(pComph);
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, pDataCols, &block, isLast, true, ppBuf, ppCBuf) < 0) {
return -1;
}
if (taosArrayPush(pComph->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}
#endif

View File

@ -195,6 +195,8 @@ STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo) { return NULL; }
int tsdbGetState(STsdbRepo *repo) { return repo->state; }
bool tsdbInCompact(STsdbRepo *repo) { return repo->inCompact; }
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage) {
ASSERT(repo != NULL);
STsdbRepo *pRepo = repo;
@ -533,6 +535,7 @@ static STsdbRepo *tsdbNewRepo(STsdbCfg *pCfg, STsdbAppH *pAppH) {
pRepo->state = TSDB_STATE_OK;
pRepo->code = TSDB_CODE_SUCCESS;
pRepo->inCompact = false;
pRepo->config = *pCfg;
if (pAppH) {
pRepo->appH = *pAppH;

View File

@ -218,11 +218,6 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(pQueryHandle->activeIndex < size && pQueryHandle->activeIndex >= 0 && size >= 1);
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int64_t rows = 0;
SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pMemRef == NULL) { return rows; }
@ -233,6 +228,10 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
SMemTable* pMemT = pMemRef->snapshot.mem;
SMemTable* pIMemT = pMemRef->snapshot.imem;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < size; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid];
rows += (pMem && pMem->uid == pCheckInfo->tableId.uid) ? pMem->numOfRows : 0;
@ -241,7 +240,7 @@ int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
pIMem = pIMemT->tData[pCheckInfo->tableId.tid];
rows += (pIMem && pIMem->uid == pCheckInfo->tableId.uid) ? pIMem->numOfRows : 0;
}
}
return rows;
}
@ -1088,7 +1087,11 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
assert(cur->pos >= 0 && cur->pos <= binfo.rows);
TSKEY key = (row != NULL)? dataRowKey(row):TSKEY_INITIAL_VAL;
if (key != TSKEY_INITIAL_VAL) {
tsdbDebug("%p key in mem:%"PRId64", 0x%"PRIx64, pQueryHandle, key, pQueryHandle->qId);
} else {
tsdbDebug("%p no data in mem, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId);
}
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key <= binfo.window.ekey)) ||
(!ASCENDING_TRAVERSE(pQueryHandle->order) && (key != TSKEY_INITIAL_VAL && key >= binfo.window.skey))) {
@ -1152,8 +1155,14 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
}
assert(cur->blockCompleted);
tsdbDebug("create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %p",
cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pQueryHandle);
if (cur->rows == binfo.rows) {
tsdbDebug("%p whole file block qualified, brange:%"PRId64"-%"PRId64", rows:%d, lastKey:%"PRId64", %"PRIx64,
pQueryHandle, cur->win.skey, cur->win.ekey, cur->rows, cur->lastKey, pQueryHandle->qId);
} else {
tsdbDebug("%p create data block from remain file block, brange:%"PRId64"-%"PRId64", rows:%d, total:%d, lastKey:%"PRId64", %"PRIx64,
pQueryHandle, cur->win.skey, cur->win.ekey, cur->rows, binfo.rows, cur->lastKey, pQueryHandle->qId);
}
}
return code;

View File

@ -26,6 +26,7 @@ extern "C" {
#include "taosdef.h"
int32_t strdequote(char *src);
int32_t strRmquote(char *z, int32_t len);
size_t strtrim(char *src);
char * strnchr(char *haystack, char needle, int32_t len, bool skipquote);
char ** strsplit(char *src, const char *delim, int32_t *num);

View File

@ -52,6 +52,36 @@ int32_t strdequote(char *z) {
return j + 1; // only one quote, do nothing
}
int32_t strRmquote(char *z, int32_t len){
// delete escape character: \\, \', \"
char delim = z[0];
if (delim != '\'' && delim != '\"') {
return len;
}
int32_t cnt = 0;
int32_t j = 0;
for (uint32_t k = 1; k < len - 1; ++k) {
if (z[k] == '\\' || (z[k] == delim && z[k + 1] == delim)) {
z[j] = z[k + 1];
cnt++;
j++;
k++;
continue;
}
z[j] = z[k];
j++;
}
z[j] = 0;
return len - 2 - cnt;
}
size_t strtrim(char *z) {
int32_t i = 0;
int32_t j = 0;

View File

@ -216,13 +216,13 @@
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<version>1.0.13</version>
<version>1.2.0</version>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.0.13</version>
<version>1.2.0</version>
</dependency>
<dependency>

View File

@ -86,7 +86,7 @@ static int print_result(TAOS_RES* res, int blockFetch) {
}
} else {
while ((row = taos_fetch_row(res))) {
char temp[256];
char temp[256] = {0};
taos_print_row(temp, row, fields, num_fields);
puts(temp);
nRows++;
@ -391,10 +391,10 @@ void verify_prepare(TAOS* taos) {
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256];
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
@ -614,10 +614,10 @@ void verify_prepare2(TAOS* taos) {
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256];
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);
@ -866,12 +866,10 @@ void verify_prepare3(TAOS* taos) {
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256] = {0};
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
memset(temp, 0, sizeof(temp)/sizeof(temp[0]));
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);

View File

@ -116,12 +116,12 @@ void Test(TAOS *taos, char *qstr, int index) {
int rows = 0;
int num_fields = taos_field_count(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[1024];
printf("num_fields = %d\n", num_fields);
printf("select * from table, result:\n");
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[1024] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);

View File

@ -184,10 +184,10 @@ int main(int argc, char *argv[])
int rows = 0;
int num_fields = taos_num_fields(result);
TAOS_FIELD *fields = taos_fetch_fields(result);
char temp[256];
// fetch the records row by row
while ((row = taos_fetch_row(result))) {
char temp[256] = {0};
rows++;
taos_print_row(temp, row, fields, num_fields);
printf("%s\n", temp);

View File

@ -14,8 +14,6 @@ void print_result(TAOS_RES* res, int blockFetch) {
int num_fields = taos_num_fields(res);
TAOS_FIELD* fields = taos_fetch_fields(res);
int nRows = 0;
char buf[4096];
if (blockFetch) {
nRows = taos_fetch_block(res, &row);
@ -25,6 +23,7 @@ void print_result(TAOS_RES* res, int blockFetch) {
//}
} else {
while ((row = taos_fetch_row(res))) {
char buf[4096] = {0};
taos_print_row(buf, row, fields, num_fields);
puts(buf);
nRows++;

View File

@ -21,7 +21,7 @@ def pre_test(){
cmake .. > /dev/null
make > /dev/null
make install > /dev/null
pip3 install ${WKC}/src/connector/python/linux/python3/
pip3 install ${WKC}/src/connector/python/ || echo 0
'''
return 1
}

View File

@ -23,6 +23,7 @@ class Node:
self.hostIP = hostIP
self.hostName = hostName
self.homeDir = homeDir
self.corePath = '/coredump'
self.conn = Connection("{}@{}".format(username, hostName), connect_kwargs={"password": "{}".format(password)})
def buildTaosd(self):
@ -127,21 +128,37 @@ class Node:
print("remove taosd error for node %d " % self.index)
logging.exception(e)
def detectCoredumpFile(self):
try:
result = self.conn.run("find /coredump -name 'core_*' ", hide=True)
output = result.stdout
print("output: %s" % output)
return output
except Exception as e:
print("find coredump file error on node %d " % self.index)
logging.exception(e)
class Nodes:
def __init__(self):
self.tdnodes = []
self.tdnodes.append(Node(0, 'root', '52.143.103.7', 'node1', 'a', '/root/'))
self.tdnodes.append(Node(1, 'root', '52.250.48.222', 'node2', 'a', '/root/'))
self.tdnodes.append(Node(2, 'root', '51.141.167.23', 'node3', 'a', '/root/'))
self.tdnodes.append(Node(3, 'root', '52.247.207.173', 'node4', 'a', '/root/'))
self.tdnodes.append(Node(4, 'root', '51.141.166.100', 'node5', 'a', '/root/'))
self.tdnodes.append(Node(0, 'root', '192.168.17.194', 'taosdata', 'r', '/root/'))
# self.tdnodes.append(Node(1, 'root', '52.250.48.222', 'node2', 'a', '/root/'))
# self.tdnodes.append(Node(2, 'root', '51.141.167.23', 'node3', 'a', '/root/'))
# self.tdnodes.append(Node(3, 'root', '52.247.207.173', 'node4', 'a', '/root/'))
# self.tdnodes.append(Node(4, 'root', '51.141.166.100', 'node5', 'a', '/root/'))
def stopOneNode(self, index):
self.tdnodes[index].stopTaosd()
self.tdnodes[index].forceStopOneTaosd()
def startOneNode(self, index):
self.tdnodes[index].startOneTaosd()
def detectCoredumpFile(self, index):
return self.tdnodes[index].detectCoredumpFile()
def stopAllTaosd(self):
for i in range(len(self.tdnodes)):
self.tdnodes[i].stopTaosd()
@ -166,14 +183,32 @@ class Nodes:
for i in range(len(self.tdnodes)):
self.tdnodes[i].removeData()
class Test:
def __init__(self):
self.nodes = Nodes()
# kill taosd randomly every 10 mins
nodes = Nodes()
def randomlyKillDnode(self):
loop = 0
while True:
loop = loop + 1
index = random.randint(0, 4)
print("loop: %d, kill taosd on node%d" %(loop, index))
nodes.stopOneNode(index)
self.nodes.stopOneNode(index)
time.sleep(60)
nodes.startOneNode(index)
self.nodes.startOneNode(index)
time.sleep(600)
loop = loop + 1
def detectCoredump(self):
loop = 0
while True:
for i in range(len(self.nodes.tdnodes)):
result = self.nodes.detectCoredumpFile(i)
print("core file path is %s" % result)
if result and not result.isspace():
self.nodes.stopAllTaosd()
print("sleep for 10 mins")
time.sleep(600)
test = Test()
test.detectCoredump()

View File

@ -229,7 +229,7 @@ python3 ./test.py -f query/queryFilterTswithDateUnit.py
python3 ./test.py -f query/queryTscomputWithNow.py
python3 ./test.py -f query/computeErrorinWhere.py
python3 ./test.py -f query/queryTsisNull.py
python3 ./test.py -f query/subqueryFilter.py
#stream

View File

@ -25,7 +25,7 @@ class TDTestCase:
def run(self):
tdSql.query("show variables")
tdSql.checkData(51, 1, 864000)
tdSql.checkData(53, 1, 864000)
def stop(self):
tdSql.close()

View File

@ -141,7 +141,7 @@ class TDTestCase:
def run(self):
tdSql.prepare()
print("============== last_row_cache_0.sim")
print("============== Step1: last_row_cache_0.sim")
tdSql.execute("create database test1 cachelast 0")
tdSql.execute("use test1")
self.insertData()
@ -149,43 +149,48 @@ class TDTestCase:
self.insertData2()
self.executeQueries2()
print("============== alter last cache")
print("============== Step2: alter database test1 cachelast 1")
tdSql.execute("alter database test1 cachelast 1")
self.executeQueries2()
print("============== Step3: alter database test1 cachelast 2")
tdSql.execute("alter database test1 cachelast 2")
self.executeQueries2()
print("============== Step4: alter database test1 cachelast 3")
tdSql.execute("alter database test1 cachelast 3")
self.executeQueries2()
print("============== alter last cache")
print("============== Step5: alter database test1 cachelast 0 and restart taosd")
tdSql.execute("alter database test1 cachelast 0")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== Step6: alter database test1 cachelast 1 and restart taosd")
tdSql.execute("alter database test1 cachelast 1")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== Step7: alter database test1 cachelast 2 and restart taosd")
tdSql.execute("alter database test1 cachelast 2")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== Step8: alter database test1 cachelast 3 and restart taosd")
tdSql.execute("alter database test1 cachelast 3")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== last_row_cache_1.sim")
print("============== Step9: create database test2 cachelast 1")
tdSql.execute("create database test2 cachelast 1")
tdSql.execute("use test2")
self.insertData()
@ -196,42 +201,51 @@ class TDTestCase:
tdDnodes.start(1)
self.executeQueries2()
print("============== Step8: alter database test2 cachelast 0")
tdSql.execute("alter database test2 cachelast 0")
self.executeQueries2()
print("============== Step9: alter database test2 cachelast 1")
tdSql.execute("alter database test2 cachelast 1")
self.executeQueries2()
print("============== Step10: alter database test2 cachelast 2")
tdSql.execute("alter database test2 cachelast 2")
self.executeQueries2()
print("============== Step11: alter database test2 cachelast 3")
tdSql.execute("alter database test2 cachelast 3")
self.executeQueries2()
print("============== Step12: alter database test2 cachelast 0 and restart taosd")
tdSql.execute("alter database test2 cachelast 0")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== Step13: alter database test2 cachelast 1 and restart taosd")
tdSql.execute("alter database test2 cachelast 1")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== Step14: alter database test2 cachelast 2 and restart taosd")
tdSql.execute("alter database test2 cachelast 2")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== Step15: alter database test2 cachelast 3 and restart taosd")
tdSql.execute("alter database test2 cachelast 3")
self.executeQueries2()
tdDnodes.stop(1)
tdDnodes.start(1)
self.executeQueries2()
print("============== Step16: select last_row(*) from st group by tbname")
tdSql.query("select last_row(*) from st group by tbname")
tdSql.checkRows(10)

View File

@ -0,0 +1,126 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import *
from util.cases import *
from util.sql import *
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
print("==============step1")
tdSql.execute(
"create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50), tag2 binary(16))")
tdSql.execute(
'CREATE TABLE if not exists dev_001 using st tags("dev_01", "tag_01")')
tdSql.execute(
'CREATE TABLE if not exists dev_002 using st tags("dev_02", "tag_02")')
print("==============step2")
tdSql.execute(
"""INSERT INTO dev_001 VALUES('2020-05-13 10:00:00.000', 1)('2020-05-13 10:00:00.005', 2)('2020-05-13 10:00:00.011', 3)
('2020-05-13 10:00:01.011', 4)('2020-05-13 10:00:01.611', 5)('2020-05-13 10:00:02.612', 6)
('2020-05-13 10:01:02.612', 7)('2020-05-13 10:02:02.612', 8)('2020-05-13 10:03:02.613', 9)
('2020-05-13 11:00:00.000', 10)('2020-05-13 12:00:00.000', 11)('2020-05-13 13:00:00.001', 12)
('2020-05-14 13:00:00.001', 13)('2020-05-15 14:00:00.000', 14)('2020-05-20 10:00:00.000', 15)
('2020-05-27 10:00:00.001', 16) dev_002 VALUES('2020-05-13 10:00:00.000', 1)('2020-05-13 10:00:00.005', 2)('2020-05-13 10:00:00.009', 3)('2020-05-13 10:00:00.0021', 4)
('2020-05-13 10:00:00.031', 5)('2020-05-13 10:00:00.036', 6)('2020-05-13 10:00:00.51', 7)
""")
# session(ts,5a)
tdSql.query("select count(*) from dev_001 session(ts,5a)")
tdSql.checkRows(15)
tdSql.checkData(0, 1, 2)
# session(ts,1s)
tdSql.query("select count(*) from dev_001 session(ts,1s)")
tdSql.checkRows(12)
tdSql.checkData(0, 1, 5)
tdSql.query("select count(*) from dev_001 session(ts,1000a)")
tdSql.checkRows(12)
tdSql.checkData(0, 1, 5)
# session(ts,1m)
tdSql.query("select count(*) from dev_001 session(ts,1m)")
tdSql.checkRows(9)
tdSql.checkData(0, 1, 8)
# session(ts,1h)
tdSql.query("select count(*) from dev_001 session(ts,1h)")
tdSql.checkRows(6)
tdSql.checkData(0, 1, 11)
# session(ts,1d)
tdSql.query("select count(*) from dev_001 session(ts,1d)")
tdSql.checkRows(4)
tdSql.checkData(0, 1, 13)
# session(ts,1w)
tdSql.query("select count(*) from dev_001 session(ts,1w)")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 15)
# session with where
tdSql.query("select count(*),first(tagtype),last(tagtype),avg(tagtype),sum(tagtype),min(tagtype),max(tagtype),leastsquares(tagtype, 1, 1),spread(tagtype) from dev_001 where ts <'2020-05-20 0:0:0' session(ts,1d)")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 13)
tdSql.checkData(0, 2, 1)
tdSql.checkData(0, 3, 13)
tdSql.checkData(0, 4, 7)
tdSql.checkData(0, 5, 91)
tdSql.checkData(0, 6, 1)
tdSql.checkData(0, 7, 13)
tdSql.checkData(0, 8, '{slop:1.000000, intercept:0.000000}')
tdSql.checkData(0, 9, 12)
# tdsql err
tdSql.error("select * from dev_001 session(ts,1w)")
tdSql.error("select count(*) from st session(ts,1w)")
tdSql.error("select count(*) from dev_001 group by tagtype session(ts,1w) ")
tdSql.error("select count(*) from dev_001 session(ts,1n)")
tdSql.error("select count(*) from dev_001 session(ts,1y)")
tdSql.error("select count(*) from dev_001 session(ts,0s)")
tdSql.error("select count(*) from dev_001 session(i,1y)")
tdSql.error("select count(*) from dev_001 session(ts,1d) where ts <'2020-05-20 0:0:0'")
#test precision us
tdSql.execute("create database test precision 'us'")
tdSql.execute("use test")
tdSql.execute("create table dev_001 (ts timestamp ,i timestamp ,j int)")
tdSql.execute("insert into dev_001 values(1623046993681000,now,1)(1623046993681001,now+1s,2)(1623046993681002,now+2s,3)(1623046993681004,now+5s,4)")
# session(ts,1u)
tdSql.query("select count(*) from dev_001 session(ts,1u)")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 3)
tdSql.error("select count(*) from dev_001 session(i,1s)")
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -0,0 +1,123 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import sys
import taos
from util.log import tdLog
from util.cases import tdCases
from util.sql import tdSql
from util.dnodes import tdDnodes
class TDTestCase:
def init(self, conn, logSql):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)
self.ts = 1601481600000
self.tables = 10
self.perfix = 'dev'
def insertData(self):
print("==============step1")
tdSql.execute(
"create table if not exists st (ts timestamp, tagtype int) tags(dev nchar(50))")
for i in range(self.tables):
tdSql.execute("create table %s%d using st tags(%d)" % (self.perfix, i, i))
rows = 15 + i
for j in range(rows):
tdSql.execute("insert into %s%d values(%d, %d)" %(self.perfix, i, self.ts + i * 20 * 10000 + j * 10000, j))
def run(self):
tdSql.prepare()
self.insertData()
tdSql.query("select count(*) val from st group by tbname")
tdSql.checkRows(10)
tdSql.query("select * from (select count(*) val from st group by tbname)")
tdSql.checkRows(10)
tdSql.query("select * from (select count(*) val from st group by tbname) a where a.val < 20")
tdSql.checkRows(5)
tdSql.query("select * from (select count(*) val from st group by tbname) a where a.val > 20")
tdSql.checkRows(4)
tdSql.query("select * from (select count(*) val from st group by tbname) a where a.val = 20")
tdSql.checkRows(1)
tdSql.query("select * from (select count(*) val from st group by tbname) a where a.val <= 20")
tdSql.checkRows(6)
tdSql.query("select * from (select count(*) val from st group by tbname) a where a.val >= 20")
tdSql.checkRows(5)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val > 20")
tdSql.checkRows(1)
tdSql.checkData(0, 0, 1)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val >= 20")
tdSql.checkData(0, 0, 2)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val < 20")
tdSql.checkData(0, 0, 63)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val <= 20")
tdSql.checkData(0, 0, 64)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val = 20")
tdSql.checkData(0, 0, 1)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val > 20")
tdSql.checkData(0, 0, 1)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val >= 20")
tdSql.checkData(0, 0, 2)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val < 20")
tdSql.checkData(0, 0, 63)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val <= 20")
tdSql.checkData(0, 0, 64)
tdSql.query("select count(*) from (select first(tagtype) val from st interval(30s)) a where a.val = 20")
tdSql.checkData(0, 0, 1)
tdSql.query("select count(*) from (select last(tagtype) val from st interval(30s)) a where a.val > 20")
tdSql.checkData(0, 0, 3)
tdSql.query("select count(*) from (select last(tagtype) val from st interval(30s)) a where a.val >= 20")
tdSql.checkData(0, 0, 5)
tdSql.query("select count(*) from (select last(tagtype) val from st interval(30s)) a where a.val < 20")
tdSql.checkData(0, 0, 60)
tdSql.query("select count(*) from (select last(tagtype) val from st interval(30s)) a where a.val <= 20")
tdSql.checkData(0, 0, 62)
tdSql.query("select count(*) from (select last(tagtype) val from st interval(30s)) a where a.val = 20")
tdSql.checkData(0, 0, 2)
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -177,7 +177,7 @@ sql alter database db blocks 20
sql alter database db blocks 10
sql_error alter database db blocks 2
sql_error alter database db blocks 1
sql alter database db blocks 0
sql_error alter database db blocks 0
sql_error alter database db blocks -1
sql_error alter database db blocks 10001

View File

@ -455,7 +455,7 @@ sql alter database db blocks 20
sql alter database db blocks 10
sql_error alter database db blocks 2
sql_error alter database db blocks 1
sql alter database db blocks 0
sql_error alter database db blocks 0
sql_error alter database db blocks -1
sql_error alter database db blocks 10001

View File

@ -932,3 +932,125 @@ if $data32 != 0.000144445 then
return -1
endi
print ===========================> derivative
sql drop table t1
sql drop table tx;
sql drop table m1;
sql drop table if exists tm0;
sql drop table if exists tm1;
sql create table tm0(ts timestamp, k double)
sql insert into tm0 values('2015-08-18T00:00:00Z', 2.064) ('2015-08-18T00:06:00Z', 2.116) ('2015-08-18T00:12:00Z', 2.028)
sql insert into tm0 values('2015-08-18T00:18:00Z', 2.126) ('2015-08-18T00:24:00Z', 2.041) ('2015-08-18T00:30:00Z', 2.051)
sql_error select derivative(ts) from tm0;
sql_error select derivative(k) from tm0;
sql_error select derivative(k, 0, 0) from tm0;
sql_error select derivative(k, 1, 911) from tm0;
sql_error select derivative(kx, 1s, 1) from tm0;
sql_error select derivative(k, -20s, 1) from tm0;
sql_error select derivative(k, 20a, 0) from tm0;
sql_error select derivative(k, 200a, 0) from tm0;
sql_error select derivative(k, 999a, 0) from tm0;
sql_error select derivative(k, 20s, -12) from tm0;
sql select derivative(k, 1s, 0) from tm0
if $rows != 5 then
return -1
endi
if $data00 != @15-08-18 08:06:00.000@ then
return -1
endi
if $data01 != 0.000144444 then
print expect 0.000144444, actual: $data01
return -1
endi
if $data10 != @15-08-18 08:12:00.000@ then
return -1
endi
if $data11 != -0.000244444 then
return -1
endi
if $data20 != @15-08-18 08:18:00.000@ then
return -1
endi
if $data21 != 0.000272222 then
print expect 0.000272222, actual: $data21
return -1
endi
if $data30 != @15-08-18 08:24:00.000@ then
return -1
endi
if $data31 != -0.000236111 then
print expect 0.000236111, actual: $data31
return -1
endi
sql select derivative(k, 6m, 0) from tm0;
if $rows != 5 then
return -1
endi
if $data00 != @15-08-18 08:06:00.000@ then
return -1
endi
if $data01 != 0.052000000 then
print expect 0.052000000, actual: $data01
return -1
endi
if $data10 != @15-08-18 08:12:00.000@ then
return -1
endi
if $data11 != -0.088000000 then
return -1
endi
if $data20 != @15-08-18 08:18:00.000@ then
return -1
endi
if $data21 != 0.098000000 then
return -1
endi
if $data30 != @15-08-18 08:24:00.000@ then
return -1
endi
if $data31 != -0.085000000 then
return -1
endi
sql select derivative(k, 12m, 0) from tm0;
if $rows != 5 then
return -1
endi
if $data00 != @15-08-18 08:06:00.000@ then
return -1
endi
if $data01 != 0.104000000 then
print expect 0.104000000, actual: $data01
return -1
endi
sql select derivative(k, 6m, 1) from tm0;
if $rows != 3 then
return -1
endi
sql_error select derivative(k, 6m, 1) from tm0 interval(1s);
sql_error select derivative(k, 6m, 1) from tm0 session(ts, 1s);
sql_error select derivative(k, 6m, 1) from tm0 group by k;
sql_error select derivative(k, 6m, 1) from

View File

@ -147,8 +147,57 @@ if $data02 != @nest_tb0@ then
endi
print ===================> nest query interval
sql_error select ts, avg(c1) from (select ts, c1 from nest_tb0);
sql select avg(c1) from (select * from nest_tb0) interval(3d)
if $rows != 3 then
return -1
endi
if $data00 != @20-09-14 00:00:00.000@ then
return -1
endi
if $data01 != 49.222222222 then
return -1
endi
if $data10 != @20-09-17 00:00:00.000@ then
print expect 20-09-17 00:00:00.000, actual: $data10
return -1
endi
if $data11 != 49.685185185 then
return -1
endi
if $data20 != @20-09-20 00:00:00.000@ then
return -1
endi
if $data21 != 49.500000000 then
return -1
endi
#define TSDB_FUNC_APERCT 7
#define TSDB_FUNC_LAST_ROW 10
#define TSDB_FUNC_TWA 14
#define TSDB_FUNC_LEASTSQR 15
#define TSDB_FUNC_ARITHM 23
#define TSDB_FUNC_DIFF 24
#define TSDB_FUNC_INTERP 28
#define TSDB_FUNC_RATE 29
#define TSDB_FUNC_IRATE 30
#define TSDB_FUNC_DERIVATIVE 32
sql_error select stddev(c1) from (select c1 from nest_tb0);
sql_error select percentile(c1, 20) from (select * from nest_tb0);
sql select avg(c1),sum(c2), max(c3), min(c4), count(*), first(c7), last(c7),spread(c6) from (select * from nest_tb0) interval(1d);
sql select top(x, 20) from (select c1 x from nest_tb0);
sql select bottom(x, 20) from (select c1 x from nest_tb0)
print ===================> complex query