Merge branch 'develop' into test/TD-4463
This commit is contained in:
commit
8119f72b58
|
@ -10,3 +10,6 @@
|
|||
[submodule "tests/examples/rust"]
|
||||
path = tests/examples/rust
|
||||
url = https://github.com/songtianyi/tdengine-rust-bindings.git
|
||||
[submodule "deps/jemalloc"]
|
||||
path = deps/jemalloc
|
||||
url = https://github.com/jemalloc/jemalloc
|
||||
|
|
|
@ -59,6 +59,11 @@ IF (TD_LINUX_64)
|
|||
MESSAGE(STATUS "linux64 is defined")
|
||||
SET(COMMON_FLAGS "-std=gnu99 -Wall -Werror -fPIC -gdwarf-2 -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE")
|
||||
ADD_DEFINITIONS(-DUSE_LIBICONV)
|
||||
|
||||
IF (JEMALLOC_ENABLED)
|
||||
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
|
||||
ENDIF ()
|
||||
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_LINUX_32)
|
||||
|
|
|
@ -72,3 +72,8 @@ IF (${RANDOM_NETWORK_FAIL} MATCHES "true")
|
|||
SET(TD_RANDOM_NETWORK_FAIL TRUE)
|
||||
MESSAGE(STATUS "build with random-network-fail enabled")
|
||||
ENDIF ()
|
||||
|
||||
IF (${JEMALLOC_ENABLED} MATCHES "true")
|
||||
SET(TD_JEMALLOC_ENABLED TRUE)
|
||||
MESSAGE(STATUS "build with jemalloc enabled")
|
||||
ENDIF ()
|
||||
|
|
|
@ -18,3 +18,16 @@ ENDIF ()
|
|||
IF (TD_DARWIN AND TD_MQTT)
|
||||
ADD_SUBDIRECTORY(MQTT-C)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||
MESSAGE("setup dpes/jemalloc, current source dir:" ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
MESSAGE("binary dir:" ${CMAKE_BINARY_DIR})
|
||||
include(ExternalProject)
|
||||
ExternalProject_Add(jemalloc
|
||||
PREFIX "jemalloc"
|
||||
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
|
||||
BUILD_IN_SOURCE 1
|
||||
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
|
||||
BUILD_COMMAND ${MAKE}
|
||||
)
|
||||
ENDIF ()
|
||||
|
|
|
@ -0,0 +1 @@
|
|||
Subproject commit ea6b3e973b477b8061e0076bb257dbd7f3faa756
|
|
@ -22,11 +22,12 @@ cpuType=x64 # [aarch32 | aarch64 | x64 | x86 | mips64 ...]
|
|||
osType=Linux # [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...]
|
||||
pagMode=full # [full | lite]
|
||||
soMode=dynamic # [static | dynamic]
|
||||
allocator=glibc # [glibc | jemalloc]
|
||||
dbName=taos # [taos | power]
|
||||
verNumber=""
|
||||
verNumberComp="2.0.0.0"
|
||||
|
||||
while getopts "hv:V:c:o:l:s:d:n:m:" arg
|
||||
while getopts "hv:V:c:o:l:s:d:a:n:m:" arg
|
||||
do
|
||||
case $arg in
|
||||
v)
|
||||
|
@ -53,6 +54,10 @@ do
|
|||
#echo "dbName=$OPTARG"
|
||||
dbName=$(echo $OPTARG)
|
||||
;;
|
||||
a)
|
||||
#echo "allocator=$OPTARG"
|
||||
allocator=$(echo $OPTARG)
|
||||
;;
|
||||
n)
|
||||
#echo "verNumber=$OPTARG"
|
||||
verNumber=$(echo $OPTARG)
|
||||
|
@ -71,6 +76,7 @@ do
|
|||
echo " -o [Linux | Kylin | Alpine | Raspberrypi | Darwin | Windows | Ningsi60 | Ningsi80 |...] "
|
||||
echo " -V [stable | beta] "
|
||||
echo " -l [full | lite] "
|
||||
echo " -a [glibc | jemalloc] "
|
||||
echo " -s [static | dynamic] "
|
||||
echo " -d [taos | power] "
|
||||
echo " -n [version number] "
|
||||
|
@ -84,7 +90,7 @@ do
|
|||
esac
|
||||
done
|
||||
|
||||
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} verNumber=${verNumber} verNumberComp=${verNumberComp}"
|
||||
echo "verMode=${verMode} verType=${verType} cpuType=${cpuType} osType=${osType} pagMode=${pagMode} soMode=${soMode} dbName=${dbName} allocator=${allocator} verNumber=${verNumber} verNumberComp=${verNumberComp}"
|
||||
|
||||
curr_dir=$(pwd)
|
||||
|
||||
|
@ -180,12 +186,18 @@ else
|
|||
fi
|
||||
cd ${compile_dir}
|
||||
|
||||
if [[ "$allocator" == "jemalloc" ]]; then
|
||||
allocator_macro="-DJEMALLOC_ENABLED=true"
|
||||
else
|
||||
allocator_macro=""
|
||||
fi
|
||||
|
||||
# check support cpu type
|
||||
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]] ; then
|
||||
if [ "$verMode" != "cluster" ]; then
|
||||
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode}
|
||||
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} ${allocator_macro}
|
||||
else
|
||||
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp}
|
||||
cmake ../../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} ${allocator_macro}
|
||||
fi
|
||||
else
|
||||
echo "input cpuType=${cpuType} error!!!"
|
||||
|
|
|
@ -6,7 +6,7 @@
|
|||
set -e
|
||||
# set -x
|
||||
|
||||
# -----------------------Variables definition---------------------
|
||||
# -----------------------Variables definition
|
||||
source_dir=$1
|
||||
binary_dir=$2
|
||||
osType=$3
|
||||
|
@ -176,6 +176,49 @@ function install_bin() {
|
|||
[ -x ${install_main_dir}/bin/remove_client.sh ] && ${csudo} ln -s ${install_main_dir}/bin/remove_client.sh ${bin_link_dir}/rmtaos || :
|
||||
fi
|
||||
}
|
||||
function install_jemalloc() {
|
||||
if [ "$osType" != "Darwin" ]; then
|
||||
/usr/bin/install -c -d /usr/local/bin
|
||||
|
||||
if [ -f ${binary_dir}/build/bin/jemalloc-config ]; then
|
||||
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc-config /usr/local/bin
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/bin/jemalloc.sh ]; then
|
||||
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jemalloc.sh /usr/local/bin
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/bin/jeprof ]; then
|
||||
/usr/bin/install -c -m 755 ${binary_dir}/build/bin/jeprof /usr/local/bin
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/include/jemalloc/jemalloc.h ]; then
|
||||
/usr/bin/install -c -d /usr/local/include/jemalloc
|
||||
/usr/bin/install -c -m 644 ${binary_dir}/build/include/jemalloc/jemalloc.h /usr/local/include/jemalloc
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/lib/libjemalloc.so.2 ]; then
|
||||
/usr/bin/install -c -d /usr/local/lib
|
||||
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib
|
||||
ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
|
||||
/usr/bin/install -c -d /usr/local/lib
|
||||
if [ -f ${binary_dir}/build/lib/libjemalloc.a ]; then
|
||||
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ]; then
|
||||
/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc_pic.a /usr/local/lib
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/lib/libjemalloc_pic.a ]; then
|
||||
/usr/bin/install -c -d /usr/local/lib/pkgconfig
|
||||
/usr/bin/install -c -m 644 ${binary_dir}/build/lib/pkgconfig/jemalloc.pc /usr/local/lib/pkgconfig
|
||||
fi
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/share/doc/jemalloc/jemalloc.html ]; then
|
||||
/usr/bin/install -c -d /usr/local/share/doc/jemalloc
|
||||
/usr/bin/install -c -m 644 ${binary_dir}/build/share/doc/jemalloc/jemalloc.html /usr/local/share/doc/jemalloc
|
||||
fi
|
||||
if [ -f ${binary_dir}/build/share/man/man3/jemalloc.3 ]; then
|
||||
/usr/bin/install -c -d /usr/local/share/man/man3
|
||||
/usr/bin/install -c -m 644 ${binary_dir}/build/share/man/man3/jemalloc.3 /usr/local/share/man/man3
|
||||
fi
|
||||
fi
|
||||
}
|
||||
|
||||
function install_lib() {
|
||||
# Remove links
|
||||
|
@ -199,6 +242,8 @@ function install_lib() {
|
|||
${csudo} ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib
|
||||
fi
|
||||
|
||||
install_jemalloc
|
||||
|
||||
if [ "$osType" != "Darwin" ]; then
|
||||
${csudo} ldconfig
|
||||
fi
|
||||
|
|
|
@ -123,6 +123,8 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
|
|||
*/
|
||||
bool tscIsPointInterpQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsTWAQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo);
|
||||
bool tscIsSecondStageQuery(SQueryInfo* pQueryInfo);
|
||||
bool tsIsArithmeticQueryOnAggResult(SQueryInfo* pQueryInfo);
|
||||
bool tscGroupbyColumn(SQueryInfo* pQueryInfo);
|
||||
bool tscIsTopBotQuery(SQueryInfo* pQueryInfo);
|
||||
|
|
|
@ -90,6 +90,7 @@ static int32_t validateGroupbyNode(SQueryInfo* pQueryInfo, SArray* pList, SSqlCm
|
|||
static int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode);
|
||||
static int32_t parseIntervalOffset(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* offsetToken);
|
||||
static int32_t parseSlidingClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SStrToken* pSliding);
|
||||
static int32_t validateStateWindowNode(SSqlCmd* pSql, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable);
|
||||
|
||||
static int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, tSqlExprItem* pItem);
|
||||
|
||||
|
@ -851,6 +852,59 @@ int32_t validateIntervalNode(SSqlObj* pSql, SQueryInfo* pQueryInfo, SSqlNode* pS
|
|||
// The following part is used to check for the invalid query expression.
|
||||
return checkInvalidExprForTimeWindow(pCmd, pQueryInfo);
|
||||
}
|
||||
static int32_t validateStateWindowNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, bool isStable) {
|
||||
|
||||
const char* msg1 = "invalid column name";
|
||||
const char* msg3 = "not support state_window with group by ";
|
||||
const char* msg4 = "function not support for super table query";
|
||||
|
||||
SStrToken *col = &(pSqlNode->windowstateVal.col) ;
|
||||
if (col->z == NULL || col->n <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pQueryInfo->colList == NULL) {
|
||||
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
pQueryInfo->groupbyExpr.numOfGroupCols = 1;
|
||||
|
||||
//TODO(dengyihao): check tag column
|
||||
if (isStable) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
int32_t numOfCols = tscGetNumOfColumns(pTableMeta);
|
||||
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX || index.columnIndex >= numOfCols) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
SGroupbyExpr* pGroupExpr = &pQueryInfo->groupbyExpr;
|
||||
if (pGroupExpr->columnInfo == NULL) {
|
||||
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
|
||||
}
|
||||
|
||||
SSchema* pSchema = tscGetTableColumnSchema(pTableMeta, index.columnIndex);
|
||||
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || pSchema->type == TSDB_DATA_TYPE_FLOAT || pSchema->type == TSDB_DATA_TYPE_DOUBLE) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
|
||||
}
|
||||
|
||||
tscColumnListInsert(pQueryInfo->colList, index.columnIndex, pTableMeta->id.uid, pSchema);
|
||||
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
|
||||
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
|
||||
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
|
||||
pQueryInfo->stateWindow = true;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pSqlNode) {
|
||||
const char* msg1 = "gap should be fixed time window";
|
||||
|
@ -885,11 +939,17 @@ int32_t validateSessionNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode * pS
|
|||
if (pQueryInfo->sessionWindow.gap != 0 && pQueryInfo->interval.interval != 0) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
if (pQueryInfo->sessionWindow.gap == 0) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
|
||||
}
|
||||
|
||||
SColumnIndex index = COLUMN_INDEX_INITIALIZER;
|
||||
if (getColumnIndexByName(pCmd, col, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
|
||||
|
||||
|
@ -2896,6 +2956,9 @@ bool hasUnsupportFunctionsForSTableQuery(SSqlCmd* pCmd, SQueryInfo* pQueryInfo)
|
|||
return true;
|
||||
}
|
||||
}
|
||||
} else if (tscIsSessionWindowQuery(pQueryInfo)) {
|
||||
invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
|
@ -6156,7 +6219,7 @@ static int32_t doTagFunctionCheck(SQueryInfo* pQueryInfo) {
|
|||
int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
||||
const char* msg1 = "functions/columns not allowed in group by query";
|
||||
const char* msg2 = "projection query on columns not allowed";
|
||||
const char* msg3 = "group by not allowed on projection query";
|
||||
const char* msg3 = "group by/session/state_window not allowed on projection query";
|
||||
const char* msg4 = "retrieve tags not compatible with group by or interval query";
|
||||
const char* msg5 = "functions can not be mixed up";
|
||||
|
||||
|
@ -6172,6 +6235,9 @@ int32_t doFunctionsCompatibleCheck(SSqlCmd* pCmd, SQueryInfo* pQueryInfo) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
if (tscIsProjectionQuery(pQueryInfo) && tscIsSessionWindowQuery(pQueryInfo)) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
|
||||
}
|
||||
|
||||
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
|
||||
// check if all the tags prj columns belongs to the group by columns
|
||||
|
@ -6741,6 +6807,7 @@ int32_t doCheckForStream(SSqlObj* pSql, SSqlInfo* pInfo) {
|
|||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
|
||||
if (isTimeWindowQuery(pQueryInfo) && (validateFunctionsInIntervalOrGroupbyQuery(pCmd, pQueryInfo) != TSDB_CODE_SUCCESS)) {
|
||||
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
|
||||
}
|
||||
|
@ -7529,7 +7596,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
// parse the window_state
|
||||
if (validateStateWindowNode(pCmd, pQueryInfo, pSqlNode, isSTable) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
// set order by info
|
||||
if (validateOrderbyNode(pCmd, pQueryInfo, pSqlNode, tscGetTableSchema(pTableMetaInfo->pTableMeta)) !=
|
||||
TSDB_CODE_SUCCESS) {
|
||||
|
@ -7570,6 +7640,10 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
* transfer sql functions that need secondary merge into another format
|
||||
* in dealing with super table queries such as: count/first/last
|
||||
*/
|
||||
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
if (isSTable) {
|
||||
tscTansformFuncForSTableQuery(pQueryInfo);
|
||||
if (hasUnsupportFunctionsForSTableQuery(pCmd, pQueryInfo)) {
|
||||
|
@ -7577,10 +7651,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
|
|||
}
|
||||
}
|
||||
|
||||
if (validateSessionNode(pCmd, pQueryInfo, pSqlNode) != TSDB_CODE_SUCCESS) {
|
||||
return TSDB_CODE_TSC_INVALID_OPERATION;
|
||||
}
|
||||
|
||||
// no result due to invalid query time range
|
||||
if (pQueryInfo->window.skey > pQueryInfo->window.ekey) {
|
||||
pQueryInfo->command = TSDB_SQL_RETRIEVE_EMPTY_RESULT;
|
||||
|
|
|
@ -857,6 +857,7 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
pQueryMsg->simpleAgg = query.simpleAgg;
|
||||
pQueryMsg->pointInterpQuery = query.pointInterpQuery;
|
||||
pQueryMsg->needReverseScan = query.needReverseScan;
|
||||
pQueryMsg->stateWindow = query.stateWindow;
|
||||
|
||||
pQueryMsg->numOfTags = htonl(numOfTags);
|
||||
pQueryMsg->sqlstrLen = htonl(sqlLen);
|
||||
|
|
|
@ -434,6 +434,9 @@ bool tscIsTWAQuery(SQueryInfo* pQueryInfo) {
|
|||
|
||||
return false;
|
||||
}
|
||||
bool tscIsSessionWindowQuery(SQueryInfo* pQueryInfo) {
|
||||
return pQueryInfo->sessionWindow.gap > 0;
|
||||
}
|
||||
|
||||
bool tscNeedReverseScan(SQueryInfo* pQueryInfo) {
|
||||
size_t numOfExprs = tscNumOfExprs(pQueryInfo);
|
||||
|
@ -4202,11 +4205,13 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->simpleAgg = isSimpleAggregate(pQueryInfo);
|
||||
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
|
||||
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
|
||||
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
|
||||
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
|
||||
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
|
||||
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
|
||||
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
|
||||
pQueryAttr->distinctTag = pQueryInfo->distinctTag;
|
||||
pQueryAttr->sw = pQueryInfo->sessionWindow;
|
||||
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
|
||||
|
||||
pQueryAttr->numOfCols = numOfCols;
|
||||
pQueryAttr->numOfOutput = numOfOutput;
|
||||
|
@ -4214,9 +4219,9 @@ int32_t tscCreateQueryFromQueryInfo(SQueryInfo* pQueryInfo, SQueryAttr* pQueryAt
|
|||
pQueryAttr->slimit = pQueryInfo->slimit;
|
||||
pQueryAttr->order = pQueryInfo->order;
|
||||
pQueryAttr->fillType = pQueryInfo->fillType;
|
||||
pQueryAttr->groupbyColumn = tscGroupbyColumn(pQueryInfo);
|
||||
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
|
||||
|
||||
|
||||
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
|
||||
pQueryAttr->window = pQueryInfo->window;
|
||||
} else {
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 050667e5b4d0eafa5387e4283e713559b421203f
|
||||
Subproject commit 8ce6d86558afc8c0b50c10f990fd2b4270cf06fc
|
|
@ -1 +1 @@
|
|||
Subproject commit 32e2c97a4cf7bedaa99f5d6dd8cb036e7f4470df
|
||||
Subproject commit 3530c6df097134a410bacec6b3cd013ef38a61aa
|
|
@ -10,8 +10,15 @@ INCLUDE_DIRECTORIES(${TD_ENTERPRISE_DIR}/src/inc)
|
|||
INCLUDE_DIRECTORIES(inc)
|
||||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
|
||||
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
|
||||
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
|
||||
ELSE ()
|
||||
SET(LINK_JEMALLOC "")
|
||||
ENDIF ()
|
||||
|
||||
ADD_EXECUTABLE(taosd ${SRC})
|
||||
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync)
|
||||
TARGET_LINK_LIBRARIES(taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync ${LINK_JEMALLOC})
|
||||
|
||||
IF (TD_SOMODE_STATIC)
|
||||
TARGET_LINK_LIBRARIES(taosd taos_static)
|
||||
|
|
|
@ -473,6 +473,7 @@ typedef struct {
|
|||
bool simpleAgg;
|
||||
bool pointInterpQuery; // point interpolation query
|
||||
bool needReverseScan; // need reverse scan
|
||||
bool stateWindow; // state window flag
|
||||
|
||||
STimeWindow window;
|
||||
int32_t numOfTables;
|
||||
|
|
|
@ -136,74 +136,74 @@
|
|||
#define TK_VARIABLE 117
|
||||
#define TK_INTERVAL 118
|
||||
#define TK_SESSION 119
|
||||
#define TK_FILL 120
|
||||
#define TK_SLIDING 121
|
||||
#define TK_ORDER 122
|
||||
#define TK_BY 123
|
||||
#define TK_ASC 124
|
||||
#define TK_DESC 125
|
||||
#define TK_GROUP 126
|
||||
#define TK_HAVING 127
|
||||
#define TK_LIMIT 128
|
||||
#define TK_OFFSET 129
|
||||
#define TK_SLIMIT 130
|
||||
#define TK_SOFFSET 131
|
||||
#define TK_WHERE 132
|
||||
#define TK_NOW 133
|
||||
#define TK_RESET 134
|
||||
#define TK_QUERY 135
|
||||
#define TK_SYNCDB 136
|
||||
#define TK_ADD 137
|
||||
#define TK_COLUMN 138
|
||||
#define TK_TAG 139
|
||||
#define TK_CHANGE 140
|
||||
#define TK_SET 141
|
||||
#define TK_KILL 142
|
||||
#define TK_CONNECTION 143
|
||||
#define TK_STREAM 144
|
||||
#define TK_COLON 145
|
||||
#define TK_ABORT 146
|
||||
#define TK_AFTER 147
|
||||
#define TK_ATTACH 148
|
||||
#define TK_BEFORE 149
|
||||
#define TK_BEGIN 150
|
||||
#define TK_CASCADE 151
|
||||
#define TK_CLUSTER 152
|
||||
#define TK_CONFLICT 153
|
||||
#define TK_COPY 154
|
||||
#define TK_DEFERRED 155
|
||||
#define TK_DELIMITERS 156
|
||||
#define TK_DETACH 157
|
||||
#define TK_EACH 158
|
||||
#define TK_END 159
|
||||
#define TK_EXPLAIN 160
|
||||
#define TK_FAIL 161
|
||||
#define TK_FOR 162
|
||||
#define TK_IGNORE 163
|
||||
#define TK_IMMEDIATE 164
|
||||
#define TK_INITIALLY 165
|
||||
#define TK_INSTEAD 166
|
||||
#define TK_MATCH 167
|
||||
#define TK_KEY 168
|
||||
#define TK_OF 169
|
||||
#define TK_RAISE 170
|
||||
#define TK_REPLACE 171
|
||||
#define TK_RESTRICT 172
|
||||
#define TK_ROW 173
|
||||
#define TK_STATEMENT 174
|
||||
#define TK_TRIGGER 175
|
||||
#define TK_VIEW 176
|
||||
#define TK_SEMI 177
|
||||
#define TK_NONE 178
|
||||
#define TK_PREV 179
|
||||
#define TK_LINEAR 180
|
||||
#define TK_IMPORT 181
|
||||
#define TK_TBNAME 182
|
||||
#define TK_JOIN 183
|
||||
#define TK_INSERT 184
|
||||
#define TK_INTO 185
|
||||
#define TK_VALUES 186
|
||||
|
||||
#define TK_STATE_WINDOW 120
|
||||
#define TK_FILL 121
|
||||
#define TK_SLIDING 122
|
||||
#define TK_ORDER 123
|
||||
#define TK_BY 124
|
||||
#define TK_ASC 125
|
||||
#define TK_DESC 126
|
||||
#define TK_GROUP 127
|
||||
#define TK_HAVING 128
|
||||
#define TK_LIMIT 129
|
||||
#define TK_OFFSET 130
|
||||
#define TK_SLIMIT 131
|
||||
#define TK_SOFFSET 132
|
||||
#define TK_WHERE 133
|
||||
#define TK_NOW 134
|
||||
#define TK_RESET 135
|
||||
#define TK_QUERY 136
|
||||
#define TK_SYNCDB 137
|
||||
#define TK_ADD 138
|
||||
#define TK_COLUMN 139
|
||||
#define TK_TAG 140
|
||||
#define TK_CHANGE 141
|
||||
#define TK_SET 142
|
||||
#define TK_KILL 143
|
||||
#define TK_CONNECTION 144
|
||||
#define TK_STREAM 145
|
||||
#define TK_COLON 146
|
||||
#define TK_ABORT 147
|
||||
#define TK_AFTER 148
|
||||
#define TK_ATTACH 149
|
||||
#define TK_BEFORE 150
|
||||
#define TK_BEGIN 151
|
||||
#define TK_CASCADE 152
|
||||
#define TK_CLUSTER 153
|
||||
#define TK_CONFLICT 154
|
||||
#define TK_COPY 155
|
||||
#define TK_DEFERRED 156
|
||||
#define TK_DELIMITERS 157
|
||||
#define TK_DETACH 158
|
||||
#define TK_EACH 159
|
||||
#define TK_END 160
|
||||
#define TK_EXPLAIN 161
|
||||
#define TK_FAIL 162
|
||||
#define TK_FOR 163
|
||||
#define TK_IGNORE 164
|
||||
#define TK_IMMEDIATE 165
|
||||
#define TK_INITIALLY 166
|
||||
#define TK_INSTEAD 167
|
||||
#define TK_MATCH 168
|
||||
#define TK_KEY 169
|
||||
#define TK_OF 170
|
||||
#define TK_RAISE 171
|
||||
#define TK_REPLACE 172
|
||||
#define TK_RESTRICT 173
|
||||
#define TK_ROW 174
|
||||
#define TK_STATEMENT 175
|
||||
#define TK_TRIGGER 176
|
||||
#define TK_VIEW 177
|
||||
#define TK_SEMI 178
|
||||
#define TK_NONE 179
|
||||
#define TK_PREV 180
|
||||
#define TK_LINEAR 181
|
||||
#define TK_IMPORT 182
|
||||
#define TK_TBNAME 183
|
||||
#define TK_JOIN 184
|
||||
#define TK_INSERT 185
|
||||
#define TK_INTO 186
|
||||
#define TK_VALUES 187
|
||||
|
||||
#define TK_SPACE 300
|
||||
#define TK_COMMENT 301
|
||||
|
|
|
@ -11,10 +11,17 @@ IF (TD_LINUX)
|
|||
LIST(REMOVE_ITEM SRC ./src/shellDarwin.c)
|
||||
ADD_EXECUTABLE(shell ${SRC})
|
||||
|
||||
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
|
||||
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
|
||||
ELSE ()
|
||||
SET(LINK_JEMALLOC "")
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_SOMODE_STATIC)
|
||||
TARGET_LINK_LIBRARIES(shell taos_static)
|
||||
TARGET_LINK_LIBRARIES(shell taos_static ${LINK_JEMALLOC})
|
||||
ELSE ()
|
||||
TARGET_LINK_LIBRARIES(shell taos)
|
||||
TARGET_LINK_LIBRARIES(shell taos ${LINK_JEMALLOC})
|
||||
ENDIF ()
|
||||
|
||||
SET_TARGET_PROPERTIES(shell PROPERTIES OUTPUT_NAME taos)
|
||||
|
|
|
@ -55,14 +55,21 @@ ENDIF ()
|
|||
MESSAGE("TD_VERSION_NUMBER is:" ${TD_VERSION_NUMBER})
|
||||
ADD_DEFINITIONS(-DTD_VERNUMBER="${TD_VERSION_NUMBER}")
|
||||
|
||||
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
|
||||
ADD_DEFINITIONS(-DTD_JEMALLOC_ENABLED -I${CMAKE_BINARY_DIR}/build/include -L${CMAKE_BINARY_DIR}/build/lib -Wl,-rpath,${CMAKE_BINARY_DIR}/build/lib -ljemalloc)
|
||||
SET(LINK_JEMALLOC "-L${CMAKE_BINARY_DIR}/build/lib -ljemalloc")
|
||||
ELSE ()
|
||||
SET(LINK_JEMALLOC "")
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_LINUX)
|
||||
AUX_SOURCE_DIRECTORY(. SRC)
|
||||
ADD_EXECUTABLE(taosdemo ${SRC})
|
||||
|
||||
IF (TD_SOMODE_STATIC)
|
||||
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson)
|
||||
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson ${LINK_JEMALLOC})
|
||||
ELSE ()
|
||||
TARGET_LINK_LIBRARIES(taosdemo taos cJson)
|
||||
TARGET_LINK_LIBRARIES(taosdemo taos cJson ${LINK_JEMALLOC})
|
||||
ENDIF ()
|
||||
ELSEIF (TD_WINDOWS)
|
||||
AUX_SOURCE_DIRECTORY(. SRC)
|
||||
|
@ -71,7 +78,7 @@ ELSEIF (TD_WINDOWS)
|
|||
IF (TD_SOMODE_STATIC)
|
||||
TARGET_LINK_LIBRARIES(taosdemo taos_static cJson)
|
||||
ELSE ()
|
||||
TARGET_LINK_LIBRARIES(taosdemo taos cJson})
|
||||
TARGET_LINK_LIBRARIES(taosdemo taos cJson)
|
||||
ENDIF ()
|
||||
ELSEIF (TD_DARWIN)
|
||||
# missing a few dependencies, such as <argp.h>
|
||||
|
|
|
@ -22,6 +22,10 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#ifdef TD_JEMALLOC_ENABLED
|
||||
#include <jemalloc/jemalloc.h>
|
||||
#endif
|
||||
|
||||
typedef enum {
|
||||
TAOS_ALLOC_MODE_DEFAULT = 0,
|
||||
TAOS_ALLOC_MODE_RANDOM_FAIL = 1,
|
||||
|
|
|
@ -189,6 +189,7 @@ typedef struct SQueryAttr {
|
|||
bool pointInterpQuery; // point interpolation query
|
||||
bool needReverseScan; // need reverse scan
|
||||
bool distinctTag; // distinct tag query
|
||||
bool stateWindow; // window State on sub/normal table
|
||||
int32_t interBufSize; // intermediate buffer sizse
|
||||
|
||||
int32_t havingNum; // having expr number
|
||||
|
@ -296,6 +297,7 @@ enum OPERATOR_TYPE_E {
|
|||
OP_Filter = 19,
|
||||
OP_Distinct = 20,
|
||||
OP_Join = 21,
|
||||
OP_StateWindow = 22,
|
||||
};
|
||||
|
||||
typedef struct SOperatorInfo {
|
||||
|
@ -460,6 +462,16 @@ typedef struct SSWindowOperatorInfo {
|
|||
int32_t start; // start row index
|
||||
} SSWindowOperatorInfo;
|
||||
|
||||
typedef struct SStateWindowOperatorInfo {
|
||||
SOptrBasicInfo binfo;
|
||||
STimeWindow curWindow; // current time window
|
||||
int32_t numOfRows; // number of rows
|
||||
int32_t colIndex; // start row index
|
||||
int32_t start;
|
||||
char* prevData; // previous data
|
||||
|
||||
} SStateWindowOperatorInfo ;
|
||||
|
||||
typedef struct SDistinctOperatorInfo {
|
||||
SHashObj *pSet;
|
||||
SSDataBlock *pRes;
|
||||
|
@ -509,6 +521,7 @@ SOperatorInfo* createDistinctOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperat
|
|||
SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv);
|
||||
SOperatorInfo* createMultiwaySortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput,
|
||||
int32_t numOfRows, void* merger, bool groupMix);
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput);
|
||||
SOperatorInfo* createGlobalAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* param);
|
||||
SOperatorInfo* createSLimitOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger);
|
||||
SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr,
|
||||
|
|
|
@ -89,6 +89,10 @@ typedef struct SSessionWindowVal {
|
|||
SStrToken gap;
|
||||
} SSessionWindowVal;
|
||||
|
||||
typedef struct SWindowStateVal {
|
||||
SStrToken col;
|
||||
} SWindowStateVal;
|
||||
|
||||
struct SRelationInfo;
|
||||
|
||||
typedef struct SSqlNode {
|
||||
|
@ -100,6 +104,7 @@ typedef struct SSqlNode {
|
|||
SArray *fillType; // fill type[optional], SArray<tVariantListItem>
|
||||
SIntervalVal interval; // (interval, interval_offset) [optional]
|
||||
SSessionWindowVal sessionVal; // session window [optional]
|
||||
SWindowStateVal windowstateVal; // window_state(col) [optional]
|
||||
SStrToken sliding; // sliding window [optional]
|
||||
SLimitVal limit; // limit offset [optional]
|
||||
SLimitVal slimit; // group limit offset [optional]
|
||||
|
@ -275,7 +280,7 @@ SArray *tSqlExprListAppend(SArray *pList, tSqlExpr *pNode, SStrToken *pDistinc
|
|||
void tSqlExprListDestroy(SArray *pList);
|
||||
|
||||
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
|
||||
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps,
|
||||
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *ps, SWindowStateVal *pw,
|
||||
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *pgLimit, tSqlExpr *pHaving);
|
||||
int32_t tSqlExprCompare(tSqlExpr *left, tSqlExpr *right);
|
||||
|
||||
|
|
|
@ -138,6 +138,7 @@ typedef struct SQueryInfo {
|
|||
bool hasFilter;
|
||||
bool onlyTagQuery;
|
||||
bool orderProjectQuery;
|
||||
bool stateWindow;
|
||||
} SQueryInfo;
|
||||
|
||||
/**
|
||||
|
|
|
@ -456,8 +456,8 @@ tagitem(A) ::= PLUS(X) FLOAT(Y). {
|
|||
//////////////////////// The SELECT statement /////////////////////////////////
|
||||
%type select {SSqlNode*}
|
||||
%destructor select {destroySqlNode($$);}
|
||||
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
|
||||
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &S, F, &L, &G, N);
|
||||
select(A) ::= SELECT(T) selcollist(W) from(X) where_opt(Y) interval_opt(K) session_option(H) windowstate_option(D) fill_opt(F) sliding_opt(S) groupby_opt(P) orderby_opt(Z) having_opt(N) slimit_opt(G) limit_opt(L). {
|
||||
A = tSetQuerySqlNode(&T, W, X, Y, P, Z, &K, &H, &D, &S, F, &L, &G, N);
|
||||
}
|
||||
|
||||
select(A) ::= LP select(B) RP. {A = B;}
|
||||
|
@ -475,7 +475,7 @@ cmd ::= union(X). { setSqlInfo(pInfo, X, NULL, TSDB_SQL_SELECT); }
|
|||
// select client_version()
|
||||
// select server_state()
|
||||
select(A) ::= SELECT(T) selcollist(W). {
|
||||
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||
A = tSetQuerySqlNode(&T, W, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL, NULL);
|
||||
}
|
||||
|
||||
// selcollist is a list of expressions that are to become the return
|
||||
|
@ -558,6 +558,11 @@ session_option(X) ::= SESSION LP ids(V) cpxName(Z) COMMA tmvar(Y) RP. {
|
|||
X.col = V;
|
||||
X.gap = Y;
|
||||
}
|
||||
%type windowstate_option {SWindowStateVal}
|
||||
windowstate_option(X) ::= . {X.col.n = 0;}
|
||||
windowstate_option(X) ::= STATE_WINDOW LP ids(V) RP. {
|
||||
X.col = V;
|
||||
}
|
||||
|
||||
%type fill_opt {SArray*}
|
||||
%destructor fill_opt {taosArrayDestroy($$);}
|
||||
|
|
|
@ -3299,8 +3299,12 @@ static void col_project_function(SQLFunctionCtx *pCtx) {
|
|||
if (pCtx->numOfParams == 2) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pCtx->param[0].i64 == 1) {
|
||||
SET_VAL(pCtx, pCtx->size, 1);
|
||||
} else {
|
||||
INC_INIT_VAL(pCtx, pCtx->size);
|
||||
}
|
||||
|
||||
|
||||
char *pData = GET_INPUT_DATA_LIST(pCtx);
|
||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
||||
|
|
|
@ -189,12 +189,16 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
|
|||
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyArithOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
|
||||
static void destroyOperatorInfo(SOperatorInfo* pOperator);
|
||||
|
||||
|
||||
static int32_t doCopyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock);
|
||||
|
||||
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
|
||||
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
|
||||
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
|
||||
|
||||
static void initCtxOutputBuffer(SQLFunctionCtx* pCtx, int32_t size);
|
||||
static void getAlignQueryTimeWindow(SQueryAttr *pQueryAttr, int64_t key, int64_t keyFirst, int64_t keyLast, STimeWindow *win);
|
||||
|
@ -731,7 +735,6 @@ static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
|
|||
if (pCtx[k].preAggVals.isSet && forwardStep < numOfTotal) {
|
||||
pCtx[k].preAggVals.isSet = false;
|
||||
}
|
||||
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||
aAggs[functionId].xFunction(&pCtx[k]);
|
||||
}
|
||||
|
@ -1299,7 +1302,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
|
|||
}
|
||||
|
||||
int32_t ret =
|
||||
setGroupResultOutputBuf(pRuntimeEnv, pInfo, pOperator->numOfOutput, val, type, bytes, item->groupIndex);
|
||||
setGroupResultOutputBuf(pRuntimeEnv, &(pInfo->binfo), pOperator->numOfOutput, val, type, bytes, item->groupIndex);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
@ -1338,12 +1341,16 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|||
pInfo->start = j;
|
||||
} else if (tsList[j] - pInfo->prevTs <= gap) {
|
||||
pInfo->curWindow.ekey = tsList[j];
|
||||
pInfo->prevTs = tsList[j];
|
||||
//pInfo->prevTs = tsList[j];
|
||||
pInfo->numOfRows += 1;
|
||||
pInfo->start = j;
|
||||
if (j == 0 && pInfo->start != 0) {
|
||||
pInfo->numOfRows = 1;
|
||||
pInfo->start = 0;
|
||||
}
|
||||
} else { // start a new session window
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
@ -1364,6 +1371,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|||
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
@ -1391,12 +1399,12 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SGroupbyOperatorInfo *pInfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
|
||||
static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) {
|
||||
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||
|
||||
int32_t *rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
||||
SResultRowInfo *pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
||||
SQLFunctionCtx *pCtx = pInfo->binfo.pCtx;
|
||||
int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset;
|
||||
SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo;
|
||||
SQLFunctionCtx *pCtx = binfo->pCtx;
|
||||
|
||||
// not assign result buffer yet, add new result buffer, TODO remove it
|
||||
char* d = pData;
|
||||
|
@ -1767,6 +1775,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|||
}
|
||||
break;
|
||||
}
|
||||
case OP_StateWindow: {
|
||||
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
||||
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
|
||||
break;
|
||||
}
|
||||
|
||||
case OP_Limit: {
|
||||
pRuntimeEnv->proot = createLimitOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot);
|
||||
|
@ -2109,6 +2122,8 @@ static bool onlyFirstQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQu
|
|||
|
||||
static bool onlyLastQuery(SQueryAttr *pQueryAttr) { return onlyOneQueryType(pQueryAttr, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
|
||||
|
||||
static bool notContainSessionOrStateWindow(SQueryAttr *pQueryAttr) { return !(pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow); }
|
||||
|
||||
static int32_t updateBlockLoadStatus(SQueryAttr *pQuery, int32_t status) {
|
||||
bool hasFirstLastFunc = false;
|
||||
bool hasOtherFunc = false;
|
||||
|
@ -2212,7 +2227,7 @@ static void changeExecuteScanOrder(SQInfo *pQInfo, SQueryTableMsg* pQueryMsg, bo
|
|||
}
|
||||
|
||||
pQueryAttr->order.order = TSDB_ORDER_ASC;
|
||||
} else if (onlyLastQuery(pQueryAttr)) {
|
||||
} else if (onlyLastQuery(pQueryAttr) && notContainSessionOrStateWindow(pQueryAttr)) {
|
||||
if (QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
||||
qDebug(msg, pQInfo, "only-last", pQueryAttr->order.order, TSDB_ORDER_DESC, pQueryAttr->window.skey,
|
||||
pQueryAttr->window.ekey, pQueryAttr->window.ekey, pQueryAttr->window.skey);
|
||||
|
@ -3204,7 +3219,7 @@ void finalizeQueryResult(SOperatorInfo* pOperator, SQLFunctionCtx* pCtx, SResult
|
|||
SQueryAttr *pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
int32_t numOfOutput = pOperator->numOfOutput;
|
||||
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0) {
|
||||
if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
|
||||
// for each group result, call the finalize function for each column
|
||||
if (pQueryAttr->groupbyColumn) {
|
||||
closeAllResultRows(pResultRowInfo);
|
||||
|
@ -4514,6 +4529,12 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf
|
|||
} else if (pDownstream->operatorType == OP_SessionWindow) {
|
||||
SSWindowOperatorInfo* pInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
||||
} else if (pDownstream->operatorType == OP_StateWindow) {
|
||||
SStateWindowOperatorInfo* pInfo = pDownstream->info;
|
||||
|
||||
pTableScanInfo->pCtx = pInfo->binfo.pCtx;
|
||||
pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo;
|
||||
pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset;
|
||||
|
@ -4625,7 +4646,6 @@ static void destroyGlobalAggOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
tfree(pInfo->prevRow);
|
||||
tfree(pInfo->currentGroupColData);
|
||||
}
|
||||
|
||||
static void destroySlimitOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSLimitOperatorInfo *pInfo = (SSLimitOperatorInfo*) param;
|
||||
taosArrayDestroy(pInfo->orderColumnList);
|
||||
|
@ -5131,13 +5151,83 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
|||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
|
||||
|
||||
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
STableQueryInfo* item = pRuntimeEnv->current;
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pSDataBlock->pDataBlock, pInfo->colIndex);
|
||||
|
||||
SOptrBasicInfo* pBInfo = &pInfo->binfo;
|
||||
|
||||
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
||||
int16_t bytes = pColInfoData->info.bytes;
|
||||
int16_t type = pColInfoData->info.type;
|
||||
|
||||
SColumnInfoData* pTsColInfoData = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||
TSKEY* tsList = (TSKEY*)pTsColInfoData->pData;
|
||||
|
||||
pInfo->numOfRows = 0;
|
||||
for (int32_t j = 0; j < pSDataBlock->info.rows; ++j) {
|
||||
char* val = ((char*)pColInfoData->pData) + bytes * j;
|
||||
if (isNull(val, type)) {
|
||||
continue;
|
||||
}
|
||||
if (pInfo->prevData == NULL) {
|
||||
pInfo->prevData = malloc(bytes);
|
||||
memcpy(pInfo->prevData, val, bytes);
|
||||
pInfo->numOfRows = 1;
|
||||
pInfo->curWindow.skey = tsList[j];
|
||||
pInfo->curWindow.ekey = tsList[j];
|
||||
pInfo->start = j;
|
||||
|
||||
} else if (memcmp(pInfo->prevData, val, bytes) == 0) {
|
||||
pInfo->curWindow.ekey = tsList[j];
|
||||
pInfo->numOfRows += 1;
|
||||
//pInfo->start = j;
|
||||
if (j == 0 && pInfo->start != 0) {
|
||||
pInfo->numOfRows = 1;
|
||||
pInfo->start = 0;
|
||||
}
|
||||
} else {
|
||||
SResultRow* pResult = NULL;
|
||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
|
||||
pSDataBlock->info.rows, pOperator->numOfOutput);
|
||||
|
||||
pInfo->curWindow.skey = tsList[j];
|
||||
pInfo->curWindow.ekey = tsList[j];
|
||||
memcpy(pInfo->prevData, val, bytes);
|
||||
pInfo->numOfRows = 1;
|
||||
pInfo->start = j;
|
||||
|
||||
}
|
||||
}
|
||||
SResultRow* pResult = NULL;
|
||||
|
||||
pInfo->curWindow.ekey = pInfo->curWindow.skey;
|
||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, &pBInfo->resultRowInfo, &pInfo->curWindow, masterScan,
|
||||
&pResult, item->groupIndex, pBInfo->pCtx, pOperator->numOfOutput,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
doApplyFunctions(pRuntimeEnv, pBInfo->pCtx, &pInfo->curWindow, pInfo->start, pInfo->numOfRows, tsList,
|
||||
pSDataBlock->info.rows, pOperator->numOfOutput);
|
||||
}
|
||||
static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSWindowOperatorInfo* pWindowInfo = pOperator->info;
|
||||
SStateWindowOperatorInfo* pWindowInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
|
||||
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
@ -5154,6 +5244,62 @@ static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
|
|||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
int32_t order = pQueryAttr->order.order;
|
||||
STimeWindow win = pQueryAttr->window;
|
||||
SOperatorInfo* upstream = pOperator->upstream[0];
|
||||
while (1) {
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order);
|
||||
if (pWindowInfo->colIndex == -1) {
|
||||
pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock);
|
||||
}
|
||||
doStateWindowAggImpl(pOperator, pWindowInfo, pBlock);
|
||||
}
|
||||
|
||||
// restore the value
|
||||
pQueryAttr->order.order = order;
|
||||
pQueryAttr->window = win;
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
|
||||
}
|
||||
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SSWindowOperatorInfo* pWindowInfo = pOperator->info;
|
||||
SOptrBasicInfo* pBInfo = &pWindowInfo->binfo;
|
||||
|
||||
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pBInfo->pRes;
|
||||
}
|
||||
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
//pQueryAttr->order.order = TSDB_ORDER_ASC;
|
||||
int32_t order = pQueryAttr->order.order;
|
||||
STimeWindow win = pQueryAttr->window;
|
||||
|
||||
SOperatorInfo* upstream = pOperator->upstream[0];
|
||||
|
||||
|
@ -5389,7 +5535,7 @@ SOperatorInfo* createAggregateOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpera
|
|||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
|
||||
pOperator->exec = doAggregate;
|
||||
pOperator->cleanup = destroyBasicOperatorInfo;
|
||||
pOperator->cleanup = destroyAggOperatorInfo;
|
||||
appendUpstream(pOperator, upstream);
|
||||
|
||||
return pOperator;
|
||||
|
@ -5409,6 +5555,19 @@ static void destroyBasicOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
SOptrBasicInfo* pInfo = (SOptrBasicInfo*) param;
|
||||
doDestroyBasicInfo(pInfo, numOfOutput);
|
||||
}
|
||||
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SStateWindowOperatorInfo* pInfo = (SStateWindowOperatorInfo*) param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
tfree(pInfo->prevData);
|
||||
}
|
||||
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SAggOperatorInfo* pInfo = (SAggOperatorInfo*) param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
}
|
||||
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param;
|
||||
doDestroyBasicInfo(&pInfo->binfo, numOfOutput);
|
||||
}
|
||||
|
||||
static void destroySFillOperatorInfo(void* param, int32_t numOfOutput) {
|
||||
SFillOperatorInfo* pInfo = (SFillOperatorInfo*) param;
|
||||
|
@ -5463,7 +5622,7 @@ SOperatorInfo* createMultiTableAggOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SO
|
|||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
|
||||
pOperator->exec = doSTableAggregate;
|
||||
pOperator->cleanup = destroyBasicOperatorInfo;
|
||||
pOperator->cleanup = destroyAggOperatorInfo;
|
||||
appendUpstream(pOperator, upstream);
|
||||
|
||||
return pOperator;
|
||||
|
@ -5589,7 +5748,29 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
|
|||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
}
|
||||
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
|
||||
pInfo->colIndex = -1;
|
||||
pInfo->binfo.pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset);
|
||||
pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
||||
initResultRowInfo(&pInfo->binfo.resultRowInfo, 8, TSDB_DATA_TYPE_INT);
|
||||
|
||||
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
||||
pOperator->name = "StateWindowOperator";
|
||||
pOperator->operatorType = OP_StateWindow;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->exec = doStateWindowAgg;
|
||||
pOperator->cleanup = destroyStateWindowOperatorInfo;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
|
||||
}
|
||||
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
||||
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
|
||||
|
||||
|
@ -5609,7 +5790,7 @@ SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperato
|
|||
pOperator->info = pInfo;
|
||||
pOperator->pRuntimeEnv = pRuntimeEnv;
|
||||
pOperator->exec = doSessionWindowAgg;
|
||||
pOperator->cleanup = destroyBasicOperatorInfo;
|
||||
pOperator->cleanup = destroySWindowOperatorInfo;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
return pOperator;
|
||||
|
@ -6901,6 +7082,7 @@ SQInfo* createQInfoImpl(SQueryTableMsg* pQueryMsg, SGroupbyExpr* pGroupbyExpr, S
|
|||
pQueryAttr->simpleAgg = pQueryMsg->simpleAgg;
|
||||
pQueryAttr->pointInterpQuery = pQueryMsg->pointInterpQuery;
|
||||
pQueryAttr->needReverseScan = pQueryMsg->needReverseScan;
|
||||
pQueryAttr->stateWindow = pQueryMsg->stateWindow;
|
||||
pQueryAttr->vgId = vgId;
|
||||
|
||||
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SSingleColumnFilterInfo));
|
||||
|
|
|
@ -592,6 +592,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
|
|||
op = OP_SessionWindow;
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
if (pQueryAttr->pExpr2 != NULL) {
|
||||
op = OP_Arithmetic;
|
||||
taosArrayPush(plan, &op);
|
||||
}
|
||||
} else if (pQueryAttr->stateWindow) {
|
||||
op = OP_StateWindow;
|
||||
taosArrayPush(plan, &op);
|
||||
|
||||
if (pQueryAttr->pExpr2 != NULL) {
|
||||
op = OP_Arithmetic;
|
||||
taosArrayPush(plan, &op);
|
||||
|
|
|
@ -726,9 +726,9 @@ void tSetColumnType(TAOS_FIELD *pField, SStrToken *type) {
|
|||
* extract the select info out of sql string
|
||||
*/
|
||||
SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelationInfo *pFrom, tSqlExpr *pWhere,
|
||||
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval, SSessionWindowVal *pSession,
|
||||
SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit, SLimitVal *psLimit,
|
||||
tSqlExpr *pHaving) {
|
||||
SArray *pGroupby, SArray *pSortOrder, SIntervalVal *pInterval,
|
||||
SSessionWindowVal *pSession, SWindowStateVal *pWindowStateVal, SStrToken *pSliding, SArray *pFill, SLimitVal *pLimit,
|
||||
SLimitVal *psLimit, tSqlExpr *pHaving) {
|
||||
assert(pSelNodeList != NULL);
|
||||
|
||||
SSqlNode *pSqlNode = calloc(1, sizeof(SSqlNode));
|
||||
|
@ -779,6 +779,12 @@ SSqlNode *tSetQuerySqlNode(SStrToken *pSelectToken, SArray *pSelNodeList, SRelat
|
|||
TPARSER_SET_NONE_TOKEN(pSqlNode->sessionVal.col);
|
||||
}
|
||||
|
||||
if (pWindowStateVal != NULL) {
|
||||
pSqlNode->windowstateVal = *pWindowStateVal;
|
||||
} else {
|
||||
TPARSER_SET_NONE_TOKEN(pSqlNode->windowstateVal.col);
|
||||
}
|
||||
|
||||
return pSqlNode;
|
||||
}
|
||||
|
||||
|
|
2302
src/query/src/sql.c
2302
src/query/src/sql.c
File diff suppressed because it is too large
Load Diff
|
@ -141,6 +141,7 @@ static SKeyword keywordTable[] = {
|
|||
{"VARIABLE", TK_VARIABLE},
|
||||
{"INTERVAL", TK_INTERVAL},
|
||||
{"SESSION", TK_SESSION},
|
||||
{"STATE_WINDOW", TK_STATE_WINDOW},
|
||||
{"FILL", TK_FILL},
|
||||
{"SLIDING", TK_SLIDING},
|
||||
{"ORDER", TK_ORDER},
|
||||
|
|
|
@ -6,9 +6,9 @@ events {
|
|||
}
|
||||
|
||||
http {
|
||||
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;/blah/?.lua;;';
|
||||
lua_package_path '$prefix/lua/?.lua;$prefix/rest/?.lua;$prefix/rest/?/init.lua;;';
|
||||
lua_package_cpath "$prefix/so/?.so;;";
|
||||
lua_code_cache off;
|
||||
lua_code_cache on;
|
||||
server {
|
||||
listen 7000;
|
||||
server_name restapi;
|
||||
|
|
|
@ -0,0 +1,10 @@
|
|||
local config = {
|
||||
host = "127.0.0.1",
|
||||
port = 6030,
|
||||
database = "",
|
||||
user = "root",
|
||||
password = "taosdata",
|
||||
max_packet_size = 1024 * 1024 ,
|
||||
connection_pool_size = 64
|
||||
}
|
||||
return config
|
|
@ -0,0 +1,72 @@
|
|||
local _M = {}
|
||||
local driver = require "luaconnector51"
|
||||
local water_mark = 0
|
||||
local occupied = 0
|
||||
local connection_pool = {}
|
||||
|
||||
function _M.new(o,config)
|
||||
o = o or {}
|
||||
o.connection_pool = connection_pool
|
||||
o.water_mark = water_mark
|
||||
o.occupied = occupied
|
||||
if #connection_pool == 0 then
|
||||
|
||||
for i = 1, config.connection_pool_size do
|
||||
local res = driver.connect(config)
|
||||
if res.code ~= 0 then
|
||||
ngx.log(ngx.ERR, "connect--- failed:"..res.error)
|
||||
return nil
|
||||
else
|
||||
local object = {obj = res.conn, state = 0}
|
||||
table.insert(o.connection_pool,i, object)
|
||||
ngx.log(ngx.INFO, "add connection, now pool size:"..#(o.connection_pool))
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
return setmetatable(o, { __index = _M })
|
||||
end
|
||||
|
||||
function _M:get_connection()
|
||||
|
||||
local connection_obj
|
||||
|
||||
for i = 1, #connection_pool do
|
||||
connection_obj = connection_pool[i]
|
||||
if connection_obj.state == 0 then
|
||||
connection_obj.state = 1
|
||||
occupied = occupied +1
|
||||
if occupied > water_mark then
|
||||
water_mark = occupied
|
||||
end
|
||||
return connection_obj["obj"]
|
||||
end
|
||||
end
|
||||
|
||||
ngx.log(ngx.ERR,"ALERT! NO FREE CONNECTION.")
|
||||
|
||||
return nil
|
||||
end
|
||||
|
||||
function _M:get_water_mark()
|
||||
|
||||
return water_mark
|
||||
end
|
||||
|
||||
function _M:release_connection(conn)
|
||||
|
||||
local connection_obj
|
||||
|
||||
for i = 1, #connection_pool do
|
||||
connection_obj = connection_pool[i]
|
||||
|
||||
if connection_obj["obj"] == conn then
|
||||
connection_obj["state"] = 0
|
||||
occupied = occupied -1
|
||||
return
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
return _M
|
|
@ -1,26 +1,11 @@
|
|||
local driver = require "luaconnector51"
|
||||
local cjson = require "cjson"
|
||||
local Pool = require "tdpool"
|
||||
local config = require "config"
|
||||
ngx.say("start time:"..os.time())
|
||||
|
||||
|
||||
local config = {
|
||||
host = "127.0.0.1",
|
||||
port = 6030,
|
||||
database = "",
|
||||
user = "root",
|
||||
password = "taosdata",
|
||||
max_packet_size = 1024 * 1024
|
||||
}
|
||||
|
||||
local conn
|
||||
local res = driver.connect(config)
|
||||
if res.code ~=0 then
|
||||
ngx.say("connect--- failed: "..res.error)
|
||||
return
|
||||
else
|
||||
conn = res.conn
|
||||
ngx.say("connect--- pass.")
|
||||
end
|
||||
local pool = Pool.new(Pool,config)
|
||||
local conn = pool:get_connection()
|
||||
|
||||
local res = driver.query(conn,"drop database if exists nginx")
|
||||
if res.code ~=0 then
|
||||
|
@ -51,7 +36,7 @@ else
|
|||
ngx.say("create table--- pass.")
|
||||
end
|
||||
|
||||
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001',0,'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
|
||||
res = driver.query(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 0, 'robotspace'), ('2019-09-01 00:00:00.002',1,'Hilink'),('2019-09-01 00:00:00.003',2,'Harmony')")
|
||||
if res.code ~=0 then
|
||||
ngx.say("insert records failed: "..res.error)
|
||||
return
|
||||
|
@ -77,7 +62,29 @@ else
|
|||
end
|
||||
|
||||
end
|
||||
driver.close(conn)
|
||||
ngx.say("end time:"..os.time())
|
||||
--ngx.log(ngx.ERR,"in test file.")
|
||||
|
||||
local flag = false
|
||||
function query_callback(res)
|
||||
if res.code ~=0 then
|
||||
ngx.say("async_query_callback--- failed:"..res.error)
|
||||
else
|
||||
if(res.affected == 3) then
|
||||
ngx.say("async_query_callback, insert records--- pass")
|
||||
else
|
||||
ngx.say("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
|
||||
end
|
||||
end
|
||||
flag = true
|
||||
end
|
||||
|
||||
driver.query_a(conn,"insert into m1 values ('2019-09-01 00:00:00.001', 3, 'robotspace'),('2019-09-01 00:00:00.006', 4, 'Hilink'),('2019-09-01 00:00:00.007', 6, 'Harmony')", query_callback)
|
||||
|
||||
while not flag do
|
||||
-- ngx.say("i am here once...")
|
||||
ngx.sleep(0.001) -- time unit is second
|
||||
end
|
||||
|
||||
ngx.say("pool water_mark:"..pool:get_water_mark())
|
||||
|
||||
pool:release_connection(conn)
|
||||
ngx.say("end time:"..os.time())
|
||||
|
|
Binary file not shown.
|
@ -13,6 +13,11 @@ struct cb_param{
|
|||
void * stream;
|
||||
};
|
||||
|
||||
struct async_query_callback_param{
|
||||
lua_State* state;
|
||||
int callback;
|
||||
};
|
||||
|
||||
static int l_connect(lua_State *L){
|
||||
TAOS * taos=NULL;
|
||||
const char* host;
|
||||
|
@ -23,7 +28,7 @@ static int l_connect(lua_State *L){
|
|||
|
||||
luaL_checktype(L, 1, LUA_TTABLE);
|
||||
|
||||
lua_getfield(L,-1,"host");
|
||||
lua_getfield(L, 1,"host");
|
||||
if (lua_isstring(L,-1)){
|
||||
host = lua_tostring(L, -1);
|
||||
// printf("host = %s\n", host);
|
||||
|
@ -178,6 +183,58 @@ static int l_query(lua_State *L){
|
|||
return 1;
|
||||
}
|
||||
|
||||
void async_query_callback(void *param, TAOS_RES *result, int code){
|
||||
struct async_query_callback_param* p = (struct async_query_callback_param*) param;
|
||||
|
||||
//printf("\nin c,numfields:%d\n", numFields);
|
||||
//printf("\nin c, code:%d\n", code);
|
||||
|
||||
lua_State *L = p->state;
|
||||
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
if( code < 0){
|
||||
printf("failed, reason:%s\n", taos_errstr(result));
|
||||
lua_pushinteger(L, -1);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
|
||||
lua_setfield(L, table_index, "error");
|
||||
}else{
|
||||
//printf("success to async query.\n");
|
||||
const int affectRows = taos_affected_rows(result);
|
||||
//printf(" affect rows:%d\r\n", affectRows);
|
||||
lua_pushinteger(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushinteger(L, affectRows);
|
||||
lua_setfield(L, table_index, "affected");
|
||||
}
|
||||
|
||||
lua_call(L, 1, 0);
|
||||
}
|
||||
|
||||
static int l_async_query(lua_State *L){
|
||||
int r = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||
TAOS * taos = (TAOS*)lua_topointer(L,1);
|
||||
const char * sqlstr = lua_tostring(L,2);
|
||||
// int stime = luaL_checknumber(L,3);
|
||||
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
|
||||
p->state = L;
|
||||
p->callback=r;
|
||||
// printf("r:%d, L:%d\n",r,L);
|
||||
taos_query_a(taos,sqlstr,async_query_callback,p);
|
||||
|
||||
lua_pushnumber(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L, "ok");
|
||||
lua_setfield(L, table_index, "error");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
|
||||
struct cb_param* p = (struct cb_param*) param;
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
|
@ -308,6 +365,7 @@ static int l_close(lua_State *L){
|
|||
static const struct luaL_Reg lib[] = {
|
||||
{"connect", l_connect},
|
||||
{"query", l_query},
|
||||
{"query_a",l_async_query},
|
||||
{"close", l_close},
|
||||
{"open_stream", l_open_stream},
|
||||
{"close_stream", l_close_stream},
|
||||
|
|
|
@ -13,6 +13,11 @@ struct cb_param{
|
|||
void * stream;
|
||||
};
|
||||
|
||||
struct async_query_callback_param{
|
||||
lua_State* state;
|
||||
int callback;
|
||||
};
|
||||
|
||||
static int l_connect(lua_State *L){
|
||||
TAOS * taos=NULL;
|
||||
const char* host;
|
||||
|
@ -56,6 +61,7 @@ static int l_connect(lua_State *L){
|
|||
lua_settop(L,0);
|
||||
|
||||
taos_init();
|
||||
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
|
@ -177,6 +183,58 @@ static int l_query(lua_State *L){
|
|||
return 1;
|
||||
}
|
||||
|
||||
void async_query_callback(void *param, TAOS_RES *result, int code){
|
||||
struct async_query_callback_param* p = (struct async_query_callback_param*) param;
|
||||
|
||||
//printf("\nin c,numfields:%d\n", numFields);
|
||||
//printf("\nin c, code:%d\n", code);
|
||||
|
||||
lua_State *L = p->state;
|
||||
lua_rawgeti(L, LUA_REGISTRYINDEX, p->callback);
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
if( code < 0){
|
||||
printf("failed, reason:%s\n", taos_errstr(result));
|
||||
lua_pushinteger(L, -1);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L,"something is wrong");// taos_errstr(taos));
|
||||
lua_setfield(L, table_index, "error");
|
||||
}else{
|
||||
//printf("success to async query.\n");
|
||||
const int affectRows = taos_affected_rows(result);
|
||||
//printf(" affect rows:%d\r\n", affectRows);
|
||||
lua_pushinteger(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushinteger(L, affectRows);
|
||||
lua_setfield(L, table_index, "affected");
|
||||
}
|
||||
|
||||
lua_call(L, 1, 0);
|
||||
}
|
||||
|
||||
static int l_async_query(lua_State *L){
|
||||
int r = luaL_ref(L, LUA_REGISTRYINDEX);
|
||||
TAOS * taos = (TAOS*)lua_topointer(L,1);
|
||||
const char * sqlstr = lua_tostring(L,2);
|
||||
// int stime = luaL_checknumber(L,3);
|
||||
|
||||
lua_newtable(L);
|
||||
int table_index = lua_gettop(L);
|
||||
|
||||
struct async_query_callback_param *p = malloc(sizeof(struct async_query_callback_param));
|
||||
p->state = L;
|
||||
p->callback=r;
|
||||
// printf("r:%d, L:%d\n",r,L);
|
||||
taos_query_a(taos,sqlstr,async_query_callback,p);
|
||||
|
||||
lua_pushnumber(L, 0);
|
||||
lua_setfield(L, table_index, "code");
|
||||
lua_pushstring(L, "ok");
|
||||
lua_setfield(L, table_index, "error");
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void stream_cb(void *param, TAOS_RES *result, TAOS_ROW row){
|
||||
struct cb_param* p = (struct cb_param*) param;
|
||||
TAOS_FIELD *fields = taos_fetch_fields(result);
|
||||
|
@ -307,6 +365,7 @@ static int l_close(lua_State *L){
|
|||
static const struct luaL_Reg lib[] = {
|
||||
{"connect", l_connect},
|
||||
{"query", l_query},
|
||||
{"query_a",l_async_query},
|
||||
{"close", l_close},
|
||||
{"open_stream", l_open_stream},
|
||||
{"close_stream", l_close_stream},
|
||||
|
|
|
@ -110,7 +110,25 @@ else
|
|||
end
|
||||
end
|
||||
|
||||
function callback(t)
|
||||
function async_query_callback(res)
|
||||
if res.code ~=0 then
|
||||
print("async_query_callback--- failed:"..res.error)
|
||||
return
|
||||
else
|
||||
|
||||
if(res.affected == 3) then
|
||||
print("async_query_callback, insert records--- pass")
|
||||
else
|
||||
print("async_query_callback, insert records---failed: expect 3 affected records, actually affected "..res.affected)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
||||
driver.query_a(conn,"INSERT INTO therm1 VALUES ('2019-09-01 00:00:00.005', 100),('2019-09-01 00:00:00.006', 101),('2019-09-01 00:00:00.007', 102)", async_query_callback)
|
||||
|
||||
|
||||
function stream_callback(t)
|
||||
print("------------------------")
|
||||
print("continuous query result:")
|
||||
for key, value in pairs(t) do
|
||||
|
@ -119,7 +137,7 @@ function callback(t)
|
|||
end
|
||||
|
||||
local stream
|
||||
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0,callback)
|
||||
res = driver.open_stream(conn,"SELECT COUNT(*) as count, AVG(degree) as avg, MAX(degree) as max, MIN(degree) as min FROM thermometer interval(2s) sliding(2s);)",0, stream_callback)
|
||||
if res.code ~=0 then
|
||||
print("open stream--- failed:"..res.error)
|
||||
return
|
||||
|
@ -146,4 +164,5 @@ while loop_index < 30 do
|
|||
end
|
||||
|
||||
driver.close_stream(stream)
|
||||
|
||||
driver.close(conn)
|
||||
|
|
|
@ -29,8 +29,8 @@ pipeline {
|
|||
agent none
|
||||
environment{
|
||||
|
||||
WK = '/var/lib/jenkins/workspace/TDinternal'
|
||||
WKC= '/var/lib/jenkins/workspace/TDinternal/community'
|
||||
WK = '/data/lib/jenkins/workspace/TDinternal'
|
||||
WKC= '/data/lib/jenkins/workspace/TDinternal/community'
|
||||
}
|
||||
|
||||
stages {
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
import sys
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug("start to execute %s" % __file__)
|
||||
tdSql.init(conn.cursor(), logSql)
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def isLuaInstalled(self):
|
||||
if not which('lua'):
|
||||
tdLog.exit("Lua not found!")
|
||||
return False
|
||||
else:
|
||||
return True
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
# tdLog.info("Check if Lua installed")
|
||||
# if not self.isLuaInstalled():
|
||||
# sys.exit(1)
|
||||
|
||||
buildPath = self.getBuildPath()
|
||||
if (buildPath == ""):
|
||||
tdLog.exit("taosd not found!")
|
||||
else:
|
||||
tdLog.info("taosd found in %s" % buildPath)
|
||||
|
||||
targetPath = buildPath + "/../tests/examples/lua"
|
||||
tdLog.info(targetPath)
|
||||
currentPath = os.getcwd()
|
||||
os.chdir(targetPath)
|
||||
os.system('./build.sh')
|
||||
os.system('lua test.lua')
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success("%s successfully executed" % __file__)
|
||||
|
||||
|
||||
#tdCases.addWindows(__file__, TDTestCase())
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
|
@ -82,6 +82,8 @@ class TDTestCase:
|
|||
tdSql.execute("import into tbx file \'%s\'"%(self.csvfile))
|
||||
tdSql.query('select * from tbx')
|
||||
tdSql.checkRows(self.rows)
|
||||
#TD-4447 import the same csv twice
|
||||
tdSql.execute("import into tbx file \'%s\'"%(self.csvfile))
|
||||
|
||||
def stop(self):
|
||||
self.destroyCSVFile()
|
||||
|
|
|
@ -35,3 +35,5 @@ python3.8 ./test.py $1 -s && sleep 1
|
|||
python3.8 ./test.py $1 -f client/client.py
|
||||
python3.8 ./test.py $1 -s && sleep 1
|
||||
|
||||
# connector
|
||||
python3.8 ./test.py $1 -f connector/lua.py
|
||||
|
|
Loading…
Reference in New Issue