From f3b38edb980c7fd6ab545865e4888ae5e8dc0569 Mon Sep 17 00:00:00 2001
From: 54liuyao <54liuyao@163.com>
Date: Thu, 26 Dec 2024 14:33:14 +0800
Subject: [PATCH 1/2] add stream event notify
---
include/common/tcommon.h | 1 +
source/libs/executor/inc/executorInt.h | 6 +-
source/libs/executor/inc/streamexecutorInt.h | 3 +-
.../executor/src/streameventwindowoperator.c | 56 ++++++++++++++++++-
source/libs/executor/src/streamexecutorInt.c | 15 ++++-
.../src/streamintervalsliceoperator.c | 3 +-
.../executor/src/streamtimesliceoperator.c | 3 +-
7 files changed, 79 insertions(+), 8 deletions(-)
diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 0450766535..3f76239ce5 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -160,6 +160,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
+ STREAM_EVENT_OPEN_WINDOW,
} EStreamType;
#pragma pack(push, 1)
diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index 48afa78251..04e7884020 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -450,8 +450,10 @@ typedef struct STimeWindowAggSupp {
} STimeWindowAggSupp;
typedef struct SSteamOpBasicInfo {
- int32_t primaryPkIndex;
- bool updateOperatorInfo;
+ int32_t primaryPkIndex;
+ bool updateOperatorInfo;
+ SSDataBlock* pEventRes;
+ SArray* pEventInfo;
} SSteamOpBasicInfo;
typedef struct SStreamFillSupporter {
diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h
index 0a69080314..0c0ea0d6fc 100644
--- a/source/libs/executor/inc/streamexecutorInt.h
+++ b/source/libs/executor/inc/streamexecutorInt.h
@@ -57,7 +57,8 @@ typedef struct SSlicePoint {
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type);
bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo);
void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo);
-void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
+int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
+void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo);
int64_t getDeleteMarkFromOption(SStreamNodeOption* pOption);
void removeDeleteResults(SSHashObj* pUpdatedMap, SArray* pDelWins);
diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c
index fa6008eba7..a9a47580dc 100644
--- a/source/libs/executor/src/streameventwindowoperator.c
+++ b/source/libs/executor/src/streameventwindowoperator.c
@@ -12,6 +12,8 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
+
+#include "cmdnodes.h"
#include "executorInt.h"
#include "filter.h"
#include "function.h"
@@ -53,6 +55,8 @@ void destroyStreamEventOperatorInfo(void* param) {
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
}
+
+ destroyStreamBasicInfo(&pInfo->basic);
destroyStreamAggSupporter(&pInfo->streamAggSup);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
@@ -121,7 +125,7 @@ void reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) {
}
int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd,
- int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) {
+ int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey, int32_t* pWinCode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t winCode = TSDB_CODE_SUCCESS;
@@ -143,6 +147,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin);
if (inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag)) {
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
+ (*pWinCode) = TSDB_CODE_SUCCESS;
goto _end;
}
}
@@ -156,6 +161,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
if (endi < 0 || pTs[endi] >= rightWinKey.win.skey) {
setEventWindowInfo(pAggSup, &rightWinKey, pVal, pCurWin);
pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin);
+ (*pWinCode) = TSDB_CODE_SUCCESS;
goto _end;
}
}
@@ -163,6 +169,7 @@ int32_t setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t gro
SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId};
code = pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len);
QUERY_CHECK_CODE(code, lino, _error);
+ (*pWinCode) = TSDB_CODE_FAILED;
setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin);
pCurWin->pWinFlag->startFlag = start;
@@ -303,6 +310,14 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
}
+static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) {
+ void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey);
+ if (pRes != NULL) {
+ return TSDB_CODE_SUCCESS;
+ }
+ return terrno;
+}
+
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@@ -373,10 +388,16 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
bool allEqual = true;
SEventWindowInfo curWin = {0};
SSessionKey nextWinKey = {0};
+ int32_t winCode = TSDB_CODE_SUCCESS;
code = setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin,
- &nextWinKey);
+ &nextWinKey, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) {
+ code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+
setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo);
bool rebuild = false;
code = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData,
@@ -561,12 +582,42 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
}
}
+static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ blockDataCleanup(pBasicInfo->pEventRes);
+ int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo);
+ code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size);
+ QUERY_CHECK_CODE(code, lino, _end);
+ for (int32_t i = 0; i < size; i++) {
+ SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i);
+ uint64_t uid = 0;
+ code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+ taosArrayClear(pBasicInfo->pEventInfo);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
+ }
+}
+
+
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
+ buildEventNotifyResult(&pInfo->basic);
+ if (pInfo->basic.pEventRes->info.rows > 0) {
+ printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
+ (*ppRes) = pInfo->basic.pEventRes;
+ return code;
+ }
+
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@@ -957,6 +1008,7 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
pInfo->pPkDeleted = tSimpleHashInit(64, hashFn);
QUERY_CHECK_NULL(pInfo->pPkDeleted, code, lino, _error, terrno);
pInfo->destHasPrimaryKey = pEventNode->window.destHasPrimaryKey;
+ initStreamBasicInfo(&pInfo->basic);
pInfo->pOperator = pOperator;
setOperatorInfo(pOperator, "StreamEventAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT, true, OP_NOT_OPENED,
diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c
index b94798934c..1e7fbfa446 100644
--- a/source/libs/executor/src/streamexecutorInt.c
+++ b/source/libs/executor/src/streamexecutorInt.c
@@ -14,6 +14,7 @@
*/
#include "executorInt.h"
+#include "tdatablock.h"
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
@@ -29,7 +30,19 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}
-void initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
+int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->primaryPkIndex = -1;
pBasicInfo->updateOperatorInfo = false;
+ pBasicInfo->pEventInfo = taosArrayInit(4, sizeof(SSessionKey));
+ if (pBasicInfo->pEventInfo == NULL) {
+ return terrno;
+ }
+ return createSpecialDataBlock(STREAM_EVENT_OPEN_WINDOW, &pBasicInfo->pEventRes);
+}
+
+void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
+ blockDataDestroy(pBasicInfo->pEventRes);
+ pBasicInfo->pEventRes = NULL;
+ taosArrayDestroy(pBasicInfo->pEventInfo);
+ pBasicInfo->pEventInfo = NULL;
}
diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c
index d038e4d82c..45707e670e 100644
--- a/source/libs/executor/src/streamintervalsliceoperator.c
+++ b/source/libs/executor/src/streamintervalsliceoperator.c
@@ -651,7 +651,8 @@ int32_t createStreamIntervalSliceOperatorInfo(SOperatorInfo* downstream, SPhysiN
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamIntervalSliceReleaseState, streamIntervalSliceReloadState);
- initStreamBasicInfo(&pInfo->basic);
+ code = initStreamBasicInfo(&pInfo->basic);
+ QUERY_CHECK_CODE(code, lino, _error);
if (downstream) {
code = initIntervalSliceDownStream(downstream, &pInfo->streamAggSup, pPhyNode->type, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic, &pInfo->interval, pInfo->hasInterpoFunc);
diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c
index 44004a4c6b..9ec6063486 100644
--- a/source/libs/executor/src/streamtimesliceoperator.c
+++ b/source/libs/executor/src/streamtimesliceoperator.c
@@ -2201,7 +2201,8 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
setOperatorStreamStateFn(pOperator, streamTimeSliceReleaseState, streamTimeSliceReloadState);
- initStreamBasicInfo(&pInfo->basic);
+ code = initStreamBasicInfo(&pInfo->basic);
+ QUERY_CHECK_CODE(code, lino, _error);
if (downstream) {
code = initTimeSliceDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex,
&pInfo->twAggSup, &pInfo->basic, pInfo->pFillSup);
From eb5d46349071aaef64a7ce90a7bb65cc1799b09f Mon Sep 17 00:00:00 2001
From: Jinqing Kuang
Date: Thu, 16 Jan 2025 08:43:47 +0800
Subject: [PATCH 2/2] feat(stream)[TS-5469]. add support for window event
notifications in stream processing
- Introduce new syntax to specify notification type and destination address
- Collect relevant event information during window computations
- Implement websocket-based notification delivery to the specified address
---
cmake/addr2line_CMakeLists.txt.in | 2 +-
cmake/curl_CMakeLists.txt.in | 2 +-
cmake/ssl_CMakeLists.txt.in | 6 +-
contrib/CMakeLists.txt | 16 +-
include/common/tcommon.h | 5 +-
include/common/tdatablock.h | 2 +
include/common/tmsg.h | 6 +
include/libs/executor/executor.h | 3 +
include/libs/nodes/cmdnodes.h | 49 +-
include/libs/stream/tstream.h | 22 +-
include/util/tdef.h | 1 +
include/util/tlog.h | 3 +
source/common/CMakeLists.txt | 21 +-
source/common/src/msg/tmsg.c | 35 ++
source/common/src/tdatablock.c | 27 +
source/dnode/mnode/impl/src/mndStream.c | 79 +++
source/dnode/vnode/CMakeLists.txt | 1 +
source/dnode/vnode/src/inc/tq.h | 5 +
source/dnode/vnode/src/inc/vnodeInt.h | 5 +
source/dnode/vnode/src/tq/tqSink.c | 16 +-
source/dnode/vnode/src/tq/tqStreamNotify.c | 445 +++++++++++++++
source/dnode/vnode/src/tqCommon/tqCommon.c | 10 +-
source/dnode/vnode/src/vnd/vnodeOpen.c | 10 +
source/libs/executor/inc/executorInt.h | 16 +-
source/libs/executor/inc/querytask.h | 4 +
source/libs/executor/inc/streamexecutorInt.h | 10 +
source/libs/executor/src/executor.c | 22 +
source/libs/executor/src/querytask.c | 2 +
.../executor/src/streameventwindowoperator.c | 84 +--
source/libs/executor/src/streamexecutorInt.c | 519 +++++++++++++++++-
.../src/streamintervalsliceoperator.c | 1 +
.../executor/src/streamtimesliceoperator.c | 1 +
source/libs/nodes/src/nodesCodeFuncs.c | 52 ++
source/libs/nodes/src/nodesUtilFuncs.c | 9 +
source/libs/parser/inc/parAst.h | 6 +-
source/libs/parser/inc/sql.y | 22 +-
source/libs/parser/src/parAstCreater.c | 115 +++-
source/libs/parser/src/parTokenizer.c | 3 +
source/libs/parser/src/parTranslater.c | 43 ++
source/libs/stream/src/streamDispatch.c | 2 +-
source/libs/stream/src/streamMeta.c | 2 +
source/libs/stream/src/streamTask.c | 87 ++-
source/util/src/tlog.c | 29 +
43 files changed, 1706 insertions(+), 94 deletions(-)
create mode 100644 source/dnode/vnode/src/tq/tqStreamNotify.c
diff --git a/cmake/addr2line_CMakeLists.txt.in b/cmake/addr2line_CMakeLists.txt.in
index 93fb9bb96c..7cfcb46718 100644
--- a/cmake/addr2line_CMakeLists.txt.in
+++ b/cmake/addr2line_CMakeLists.txt.in
@@ -2,7 +2,7 @@
# addr2line
ExternalProject_Add(addr2line
GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git
- GIT_TAG master
+ GIT_TAG main
SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line"
BINARY_DIR "${TD_CONTRIB_DIR}/addr2line"
CONFIGURE_COMMAND ""
diff --git a/cmake/curl_CMakeLists.txt.in b/cmake/curl_CMakeLists.txt.in
index 6494177faf..2a14018810 100644
--- a/cmake/curl_CMakeLists.txt.in
+++ b/cmake/curl_CMakeLists.txt.in
@@ -12,7 +12,7 @@ ExternalProject_Add(curl2
BUILD_IN_SOURCE TRUE
BUILD_ALWAYS 1
UPDATE_COMMAND ""
- CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug
+ CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-websockets --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug
BUILD_COMMAND make -j
INSTALL_COMMAND make install
TEST_COMMAND ""
diff --git a/cmake/ssl_CMakeLists.txt.in b/cmake/ssl_CMakeLists.txt.in
index 1098593943..81e1cb15e9 100644
--- a/cmake/ssl_CMakeLists.txt.in
+++ b/cmake/ssl_CMakeLists.txt.in
@@ -6,9 +6,9 @@ ExternalProject_Add(openssl
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/openssl"
BUILD_IN_SOURCE TRUE
- #BUILD_ALWAYS 1
- #UPDATE_COMMAND ""
- CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
+ BUILD_ALWAYS 1
+ UPDATE_COMMAND ""
+ CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
BUILD_COMMAND make -j
INSTALL_COMMAND make install_sw -j
TEST_COMMAND ""
diff --git a/contrib/CMakeLists.txt b/contrib/CMakeLists.txt
index 2304ad54aa..767df03d22 100644
--- a/contrib/CMakeLists.txt
+++ b/contrib/CMakeLists.txt
@@ -17,7 +17,6 @@ elseif(${BUILD_WITH_COS})
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/)
cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
- cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
endif(${BUILD_WITH_COS})
configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
@@ -146,11 +145,16 @@ if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
+# libcurl
+if(NOT ${TD_WINDOWS})
+ file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.2/)
+ cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
+ cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
+endif(NOT ${TD_WINDOWS})
+
# s3
if(${BUILD_WITH_S3})
- cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
- cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/azure_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_S3)
@@ -160,7 +164,6 @@ elseif(${BUILD_WITH_COS})
# cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
- # cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/cos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_COS)
endif()
@@ -199,6 +202,11 @@ endif()
# lemon
cat("${TD_SUPPORT_DIR}/lemon_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
+# Force specify CC=cc on MacOS. Because the default CC setting in the generated Makefile has issues finding standard library headers
+IF(${TD_DARWIN})
+ SET(CONTRIB_CONFIG_ENV "CC=cc")
+ENDIF()
+
# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .
diff --git a/include/common/tcommon.h b/include/common/tcommon.h
index 3f76239ce5..c30f2ab4ec 100644
--- a/include/common/tcommon.h
+++ b/include/common/tcommon.h
@@ -160,7 +160,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
- STREAM_EVENT_OPEN_WINDOW,
+ STREAM_NOTIFY_EVENT,
} EStreamType;
#pragma pack(push, 1)
@@ -409,6 +409,9 @@ typedef struct STUidTagInfo {
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2
+// stream notify event block column
+#define NOTIFY_EVENT_STR_COLUMN_INDEX 0
+
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);
diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h
index 1103b89ccb..96478047ca 100644
--- a/include/common/tdatablock.h
+++ b/include/common/tdatablock.h
@@ -285,6 +285,8 @@ bool isAutoTableName(char* ctbName);
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap);
int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
+int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
+ char** dstTableName);
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index aebe09b563..82eaa2359e 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -269,6 +269,7 @@ typedef enum ENodeType {
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
QUERY_NODE_RANGE_AROUND,
+ QUERY_NODE_STREAM_NOTIFY_OPTIONS,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
@@ -2956,6 +2957,11 @@ typedef struct {
// 3.3.0.0
SArray* pCols; // array of SField
int64_t smaId;
+ // 3.3.6.0
+ SArray* pNotifyAddrUrls;
+ int32_t notifyEventTypes;
+ int32_t notifyErrorHandle;
+ int8_t notifyHistory;
} SCMCreateStreamReq;
typedef struct {
diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h
index 883c5f7b99..9a7c3912b0 100644
--- a/include/libs/executor/executor.h
+++ b/include/libs/executor/executor.h
@@ -98,6 +98,9 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
+int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
+ const char* stbFullName, bool newSubTableRule);
+
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo
diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h
index 12d77bd0c2..26482a87d4 100644
--- a/include/libs/nodes/cmdnodes.h
+++ b/include/libs/nodes/cmdnodes.h
@@ -566,19 +566,44 @@ typedef struct SStreamOptions {
int64_t setFlag;
} SStreamOptions;
+typedef enum EStreamNotifyOptionSetFlag {
+ SNOTIFY_OPT_ERROR_HANDLE_SET = BIT_FLAG_MASK(0),
+ SNOTIFY_OPT_NOTIFY_HISTORY_SET = BIT_FLAG_MASK(1),
+} EStreamNotifyOptionSetFlag;
+
+typedef enum EStreamNotifyEventType {
+ SNOTIFY_EVENT_WINDOW_OPEN = BIT_FLAG_MASK(0),
+ SNOTIFY_EVENT_WINDOW_CLOSE = BIT_FLAG_MASK(1),
+} EStreamNotifyEventType;
+
+typedef enum EStreamNotifyErrorHandleType {
+ SNOTIFY_ERROR_HANDLE_PAUSE,
+ SNOTIFY_ERROR_HANDLE_DROP,
+} EStreamNotifyErrorHandleType;
+
+typedef struct SStreamNotifyOptions {
+ ENodeType type;
+ SNodeList* pAddrUrls;
+ EStreamNotifyEventType eventTypes;
+ EStreamNotifyErrorHandleType errorHandle;
+ bool notifyHistory;
+ EStreamNotifyOptionSetFlag setFlag;
+} SStreamNotifyOptions;
+
typedef struct SCreateStreamStmt {
- ENodeType type;
- char streamName[TSDB_TABLE_NAME_LEN];
- char targetDbName[TSDB_DB_NAME_LEN];
- char targetTabName[TSDB_TABLE_NAME_LEN];
- bool ignoreExists;
- SStreamOptions* pOptions;
- SNode* pQuery;
- SNode* pPrevQuery;
- SNodeList* pTags;
- SNode* pSubtable;
- SNodeList* pCols;
- SCMCreateStreamReq* pReq;
+ ENodeType type;
+ char streamName[TSDB_TABLE_NAME_LEN];
+ char targetDbName[TSDB_DB_NAME_LEN];
+ char targetTabName[TSDB_TABLE_NAME_LEN];
+ bool ignoreExists;
+ SStreamOptions* pOptions;
+ SNode* pQuery;
+ SNode* pPrevQuery;
+ SNodeList* pTags;
+ SNode* pSubtable;
+ SNodeList* pCols;
+ SStreamNotifyOptions* pNotifyOptions;
+ SCMCreateStreamReq* pReq;
} SCreateStreamStmt;
typedef struct SDropStreamStmt {
diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index a4d89dcdcc..9cd6dd13ca 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -65,10 +65,14 @@ typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
-#define SSTREAM_TASK_VER 4
-#define SSTREAM_TASK_INCOMPATIBLE_VER 1
-#define SSTREAM_TASK_NEED_CONVERT_VER 2
-#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
+#define SSTREAM_TASK_VER 5
+#define SSTREAM_TASK_INCOMPATIBLE_VER 1
+#define SSTREAM_TASK_NEED_CONVERT_VER 2
+#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 // Append subtable name with groupId
+#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId
+#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close
+
+#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
extern int32_t streamMetaRefPool;
extern int32_t streamTaskRefPool;
@@ -427,6 +431,15 @@ typedef struct STaskCheckInfo {
TdThreadMutex checkInfoLock;
} STaskCheckInfo;
+typedef struct SNotifyInfo {
+ SArray* pNotifyAddrUrls;
+ int32_t notifyEventTypes;
+ int32_t notifyErrorHandle;
+ char* streamName;
+ char* stbFullName;
+ SSchemaWrapper* pSchemaWrapper;
+} SNotifyInfo;
+
struct SStreamTask {
int64_t ver;
SStreamTaskId id;
@@ -449,6 +462,7 @@ struct SStreamTask {
SStreamState* pState; // state backend
SUpstreamInfo upstreamInfo;
STaskCheckInfo taskCheckInfo;
+ SNotifyInfo notifyInfo;
// the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;
diff --git a/include/util/tdef.h b/include/util/tdef.h
index 0fa00bf1d2..f08697b0d4 100644
--- a/include/util/tdef.h
+++ b/include/util/tdef.h
@@ -245,6 +245,7 @@ typedef enum ELogicConditionType {
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
+#define TSDB_STREAM_NOTIFY_URL_LEN 128 // it includes the terminating '\0'
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024
diff --git a/include/util/tlog.h b/include/util/tlog.h
index f573d61e73..60ddc29288 100644
--- a/include/util/tlog.h
+++ b/include/util/tlog.h
@@ -79,6 +79,9 @@ void taosResetLog();
void taosDumpData(uint8_t *msg, int32_t len);
void taosSetNoNewFile();
+// Fast uint64_t to string conversion, equivalent to sprintf(buf, "%lu", val) but with 10x better performance.
+char *u64toaFastLut(uint64_t val, char *buf);
+
void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...)
#ifdef __GNUC__
__attribute__((format(printf, 4, 5)))
diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt
index e050eaa16d..8dccdaa016 100644
--- a/source/common/CMakeLists.txt
+++ b/source/common/CMakeLists.txt
@@ -54,6 +54,23 @@ target_link_libraries(
INTERFACE api
)
+if(NOT ${TD_WINDOWS})
+ target_include_directories(
+ common
+ PUBLIC "$ENV{HOME}/.cos-local.2/include"
+ )
+
+ find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
+ find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
+ find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
+ target_link_libraries(
+ common
+ PUBLIC ${CURL_LIBRARY}
+ PUBLIC ${SSL_LIBRARY}
+ PUBLIC ${CRYPTO_LIBRARY}
+ )
+endif()
+
if(${BUILD_S3})
if(${BUILD_WITH_S3})
target_include_directories(
@@ -65,9 +82,6 @@ if(${BUILD_S3})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
- find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
- find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
- find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
target_link_libraries(
common
@@ -87,7 +101,6 @@ if(${BUILD_S3})
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
- find_library(CURL_LIBRARY curl)
target_link_libraries(
common
diff --git a/source/common/src/msg/tmsg.c b/source/common/src/msg/tmsg.c
index a3989012f6..7a51669d46 100644
--- a/source/common/src/msg/tmsg.c
+++ b/source/common/src/msg/tmsg.c
@@ -9959,6 +9959,16 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
}
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->smaId));
+
+ int32_t addrSize = taosArrayGetSize(pReq->pNotifyAddrUrls);
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, addrSize));
+ for (int32_t i = 0; i < addrSize; ++i) {
+ const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
+ TAOS_CHECK_EXIT((tEncodeCStr(&encoder, url)));
+ }
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyEventTypes));
+ TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyErrorHandle));
+ TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->notifyHistory));
tEndEncode(&encoder);
_exit:
@@ -10093,6 +10103,30 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->smaId));
}
+ if (!tDecodeIsEnd(&decoder)) {
+ int32_t addrSize = 0;
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &addrSize));
+ pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
+ if (pReq->pNotifyAddrUrls == NULL) {
+ TAOS_CHECK_EXIT(terrno);
+ }
+ for (int32_t i = 0; i < addrSize; ++i) {
+ char *url = NULL;
+ TAOS_CHECK_EXIT(tDecodeCStr(&decoder, &url));
+ url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
+ if (url == NULL) {
+ TAOS_CHECK_EXIT(terrno);
+ }
+ if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
+ taosMemoryFree(url);
+ TAOS_CHECK_EXIT(terrno);
+ }
+ }
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyEventTypes));
+ TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyErrorHandle));
+ TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->notifyHistory));
+ }
+
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
@@ -10155,6 +10189,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->fillNullCols);
taosArrayDestroy(pReq->pVgroupVerList);
taosArrayDestroy(pReq->pCols);
+ taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
}
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index bd18c9ceb9..c3e0fff578 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -3061,6 +3061,33 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return code;
}
+int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
+ char** dstTableName) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ if (parTbName[0]) {
+ if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
+ stbFullName) {
+ *dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
+ TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
+
+ tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
+ code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
+ TSDB_CHECK_CODE(code, lino, _end);
+ } else {
+ *dstTableName = taosStrdup(parTbName);
+ TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
+ }
+ } else {
+ code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
+ TSDB_CHECK_CODE(code, lino, _end);
+ }
+
+_end:
+ return code;
+}
+
// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);
diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c
index 50018e867f..c1cf41103b 100644
--- a/source/dnode/mnode/impl/src/mndStream.c
+++ b/source/dnode/mnode/impl/src/mndStream.c
@@ -753,6 +753,77 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
return TSDB_CODE_SUCCESS;
}
+static void *notifyAddrDup(void *p) { return taosStrdup((char *)p); }
+
+static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
+ SStreamTask *pTask) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
+ TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
+ pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
+ pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
+ pTask->notifyInfo.streamName = taosStrdup(createReq->name);
+ TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
+ pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
+ TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
+ pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
+ TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
+
+static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ int32_t level = 0;
+ int32_t nTasks = 0;
+ SArray *pLevel = NULL;
+
+ TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
+ goto _end;
+ }
+
+ level = taosArrayGetSize(pStream->tasks);
+ for (int32_t i = 0; i < level; ++i) {
+ pLevel = taosArrayGetP(pStream->tasks, i);
+ nTasks = taosArrayGetSize(pLevel);
+ for (int32_t j = 0; j < nTasks; ++j) {
+ code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
+ TSDB_CHECK_CODE(code, lino, _end);
+ }
+ }
+
+ if (pStream->conf.fillHistory && createReq->notifyHistory) {
+ level = taosArrayGetSize(pStream->pHTasksList);
+ for (int32_t i = 0; i < level; ++i) {
+ pLevel = taosArrayGetP(pStream->pHTasksList, i);
+ nTasks = taosArrayGetSize(pLevel);
+ for (int32_t j = 0; j < nTasks; ++j) {
+ code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
+ TSDB_CHECK_CODE(code, lino, _end);
+ }
+ }
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code));
+ }
+ return code;
+}
+
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
@@ -850,6 +921,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
+ // add notify info into all stream tasks
+ code = addStreamNotifyInfo(&createReq, &streamObj);
+ if (code != TSDB_CODE_SUCCESS) {
+ mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
+ mndTransDrop(pTrans);
+ goto _OVER;
+ }
+
// add stream to trans
code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt
index 8f63cc8779..b90e1844ae 100644
--- a/source/dnode/vnode/CMakeLists.txt
+++ b/source/dnode/vnode/CMakeLists.txt
@@ -75,6 +75,7 @@ set(
"src/tq/tqSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"
+ "src/tq/tqStreamNotify.c"
)
aux_source_directory("src/tsdb/" TSDB_SOURCE_FILES)
diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h
index 12a803d1d8..e0bf51b333 100644
--- a/source/dnode/vnode/src/inc/tq.h
+++ b/source/dnode/vnode/src/inc/tq.h
@@ -159,6 +159,11 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
+// tq send notifications
+int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
+void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
+int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode);
+
#define TQ_ERR_GO_TO_END(c) \
do { \
code = c; \
diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h
index 940116317c..02c3b3ebe0 100644
--- a/source/dnode/vnode/src/inc/vnodeInt.h
+++ b/source/dnode/vnode/src/inc/vnodeInt.h
@@ -81,6 +81,8 @@ typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;
+typedef struct SStreamNotifyHandleMap SStreamNotifyHandleMap;
+
#define VNODE_META_TMP_DIR "meta.tmp"
#define VNODE_META_BACKUP_DIR "meta.backup"
@@ -496,6 +498,9 @@ struct SVnode {
int64_t blockSeq;
SQHandle* pQuery;
SVMonitorObj monitor;
+
+ // Notification Handles
+ SStreamNotifyHandleMap* pNotifyHandleMap;
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c
index 7ba77cf813..98ea92125c 100644
--- a/source/dnode/vnode/src/tq/tqSink.c
+++ b/source/dnode/vnode/src/tq/tqSink.c
@@ -16,8 +16,6 @@
#include "tcommon.h"
#include "tq.h"
-#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
-
typedef struct STableSinkInfo {
uint64_t uid;
tstr name;
@@ -983,7 +981,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
- } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
+ } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
}
if (code != TSDB_CODE_SUCCESS) {
@@ -1150,6 +1148,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return;
}
+ code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
+ // continue processing even if notification fails
+ }
+
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
@@ -1173,6 +1177,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
} else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
+ } else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
+ continue;
} else {
code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
}
@@ -1317,6 +1323,10 @@ void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVno
continue;
}
+ if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
+ continue;
+ }
+
hasSubmit = true;
pTask->execInfo.sink.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId;
diff --git a/source/dnode/vnode/src/tq/tqStreamNotify.c b/source/dnode/vnode/src/tq/tqStreamNotify.c
new file mode 100644
index 0000000000..46ee95d3b9
--- /dev/null
+++ b/source/dnode/vnode/src/tq/tqStreamNotify.c
@@ -0,0 +1,445 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#include "cmdnodes.h"
+#include "tq.h"
+
+#ifndef WINDOWS
+#include "curl/curl.h"
+#endif
+
+#define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50ms
+
+typedef struct SStreamNotifyHandle {
+ TdThreadMutex mutex;
+#ifndef WINDOWS
+ CURL* curl;
+#endif
+ char* url;
+} SStreamNotifyHandle;
+
+struct SStreamNotifyHandleMap {
+ TdThreadMutex gMutex;
+ SHashObj* handleMap;
+};
+
+static void stopStreamNotifyConn(SStreamNotifyHandle* pHandle) {
+#ifndef WINDOWS
+ if (pHandle == NULL || pHandle->curl == NULL) {
+ return;
+ }
+ // status code 1000 means normal closure
+ size_t len = 0;
+ uint16_t status = htons(1000);
+ CURLcode res = curl_ws_send(pHandle->curl, &status, sizeof(status), &len, 0, CURLWS_CLOSE);
+ if (res != CURLE_OK) {
+ tqWarn("failed to send ws-close msg to %s for %d", pHandle->url ? pHandle->url : "", res);
+ }
+ // TODO: add wait mechanism for peer connection close response
+ curl_easy_cleanup(pHandle->curl);
+#endif
+}
+
+static void destroyStreamNotifyHandle(void* ptr) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SStreamNotifyHandle** ppHandle = ptr;
+
+ if (ppHandle == NULL || *ppHandle == NULL) {
+ return;
+ }
+ code = taosThreadMutexDestroy(&(*ppHandle)->mutex);
+ stopStreamNotifyConn(*ppHandle);
+ taosMemoryFreeClear((*ppHandle)->url);
+ taosMemoryFreeClear(*ppHandle);
+}
+
+static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) {
+ if (ppHandle == NULL || *ppHandle == NULL) {
+ return;
+ }
+ (void)taosThreadMutexUnlock(&(*ppHandle)->mutex);
+ *ppHandle = NULL;
+}
+
+static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url,
+ SStreamNotifyHandle** ppHandle) {
+#ifndef WINDOWS
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ bool gLocked = false;
+ SStreamNotifyHandle** ppFindHandle = NULL;
+ SStreamNotifyHandle* pNewHandle = NULL;
+ CURL* newCurl = NULL;
+ CURLcode res = CURLE_OK;
+
+ TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *ppHandle = NULL;
+
+ code = taosThreadMutexLock(&pMap->gMutex);
+ TSDB_CHECK_CODE(code, lino, _end);
+ gLocked = true;
+
+ ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url));
+ if (ppFindHandle == NULL) {
+ pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle));
+ TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno);
+ code = taosThreadMutexInit(&pNewHandle->mutex, NULL);
+ TSDB_CHECK_CODE(code, lino, _end);
+ code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES);
+ TSDB_CHECK_CODE(code, lino, _end);
+ *ppHandle = pNewHandle;
+ pNewHandle = NULL;
+ } else {
+ *ppHandle = *ppFindHandle;
+ }
+
+ code = taosThreadMutexLock(&(*ppHandle)->mutex);
+ TSDB_CHECK_CODE(code, lino, _end);
+
+ (void)taosThreadMutexUnlock(&pMap->gMutex);
+ gLocked = false;
+
+ if ((*ppHandle)->curl == NULL) {
+ newCurl = curl_easy_init();
+ TSDB_CHECK_NULL(newCurl, code, lino, _end, TSDB_CODE_FAILED);
+ res = curl_easy_setopt(newCurl, CURLOPT_URL, url);
+ TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
+ res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYPEER, 0L);
+ TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
+ res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYHOST, 0L);
+ TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
+ res = curl_easy_setopt(newCurl, CURLOPT_TIMEOUT, 3L);
+ TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
+ res = curl_easy_setopt(newCurl, CURLOPT_CONNECT_ONLY, 2L);
+ TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
+ res = curl_easy_perform(newCurl);
+ TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
+ (*ppHandle)->curl = newCurl;
+ newCurl = NULL;
+ }
+
+ if ((*ppHandle)->url == NULL) {
+ (*ppHandle)->url = taosStrdup(url);
+ TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno);
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
+ if (*ppHandle) {
+ releaseStreamNotifyHandle(ppHandle);
+ }
+ *ppHandle = NULL;
+ }
+ if (newCurl) {
+ curl_easy_cleanup(newCurl);
+ }
+ if (pNewHandle) {
+ destroyStreamNotifyHandle(&pNewHandle);
+ }
+ if (gLocked) {
+ (void)taosThreadMutexUnlock(&pMap->gMutex);
+ }
+ return code;
+#else
+ tqError("stream notify events is not supported on windows");
+ return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
+#endif
+}
+
+int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SStreamNotifyHandleMap* pMap = NULL;
+
+ TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *ppMap = NULL;
+ pMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap));
+ TSDB_CHECK_NULL(pMap, code, lino, _end, terrno);
+ code = taosThreadMutexInit(&pMap->gMutex, NULL);
+ TSDB_CHECK_CODE(code, lino, _end);
+ pMap->handleMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
+ TSDB_CHECK_NULL(pMap->handleMap, code, lino, _end, terrno);
+ taosHashSetFreeFp(pMap->handleMap, destroyStreamNotifyHandle);
+ *ppMap = pMap;
+ pMap = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (pMap != NULL) {
+ tqDestroyNotifyHandleMap(&pMap);
+ }
+ return code;
+}
+
+void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ if (*ppMap == NULL) {
+ return;
+ }
+ taosHashCleanup((*ppMap)->handleMap);
+ code = taosThreadMutexDestroy(&(*ppMap)->gMutex);
+ taosMemoryFreeClear((*ppMap));
+}
+
+#define JSON_CHECK_ADD_ITEM(obj, str, item) \
+ TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
+
+static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ cJSON* obj = NULL;
+ cJSON* streams = NULL;
+ cJSON* stream = NULL;
+ char msgId[37];
+
+ TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *pHeader = NULL;
+
+ code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId));
+ TSDB_CHECK_CODE(code, lino, _end);
+
+ stream = cJSON_CreateObject();
+ TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName));
+ JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray());
+
+ streams = cJSON_CreateArray();
+ TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
+ stream = NULL;
+
+ obj = cJSON_CreateObject();
+ TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId));
+ JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs()));
+ JSON_CHECK_ADD_ITEM(obj, "streams", streams);
+ streams = NULL;
+
+ *pHeader = cJSON_PrintUnformatted(obj);
+ TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (stream != NULL) {
+ cJSON_Delete(stream);
+ }
+ if (streams != NULL) {
+ cJSON_Delete(streams);
+ }
+ if (obj != NULL) {
+ cJSON_Delete(obj);
+ }
+ return code;
+}
+
+static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg,
+ int32_t* nNotifyEvents) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ int32_t numOfBlocks = 0;
+ int32_t msgHeaderLen = 0;
+ int32_t msgTailLen = 0;
+ int32_t msgLen = 0;
+ char* msgHeader = NULL;
+ const char* msgTail = "]}]}";
+ char* msg = NULL;
+
+ TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *pMsg = NULL;
+ numOfBlocks = taosArrayGetSize(pBlocks);
+ *nNotifyEvents = 0;
+
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
+ SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
+ if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
+ continue;
+ }
+
+ SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
+ for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
+ char* val = colDataGetVarData(pEventStrCol, j);
+ msgLen += varDataLen(val) + 1;
+ }
+ *nNotifyEvents += pDataBlock->info.rows;
+ }
+
+ if (msgLen == 0) {
+ // skip since no notification events found
+ goto _end;
+ }
+
+ code = getStreamNotifyEventHeader(streamName, &msgHeader);
+ TSDB_CHECK_CODE(code, lino, _end);
+ msgHeaderLen = strlen(msgHeader);
+ msgTailLen = strlen(msgTail);
+ msgLen += msgHeaderLen;
+
+ msg = taosMemoryMalloc(msgLen);
+ TSDB_CHECK_NULL(msg, code, lino, _end, terrno);
+ char* p = msg;
+ TAOS_STRNCPY(p, msgHeader, msgHeaderLen);
+ p += msgHeaderLen - msgTailLen;
+
+ for (int32_t i = 0; i < numOfBlocks; ++i) {
+ SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
+ if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
+ continue;
+ }
+
+ SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
+ for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
+ char* val = colDataGetVarData(pEventStrCol, j);
+ TAOS_STRNCPY(p, varDataVal(val), varDataLen(val));
+ p += varDataLen(val);
+ *(p++) = ',';
+ }
+ }
+
+ p -= 1;
+ TAOS_STRNCPY(p, msgTail, msgTailLen);
+ *(p + msgTailLen) = '\0';
+
+ *pMsg = msg;
+ msg = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (msgHeader != NULL) {
+ cJSON_free(msgHeader);
+ }
+ if (msg != NULL) {
+ taosMemoryFreeClear(msg);
+ }
+ return code;
+}
+
+static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) {
+#ifndef WINDOWS
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ CURLcode res = CURLE_OK;
+ uint64_t sentLen = 0;
+ uint64_t totalLen = 0;
+ size_t nbytes = 0;
+
+ TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pHandle->curl, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ totalLen = strlen(msg);
+ while (sentLen < totalLen) {
+ res = curl_ws_send(pHandle->curl, msg + sentLen, totalLen - sentLen, &nbytes, 0, CURLWS_TEXT);
+ TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
+ sentLen += nbytes;
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
+ stopStreamNotifyConn(pHandle);
+ }
+ return code;
+#else
+ tqError("stream notify events is not supported on windows");
+ return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
+#endif
+}
+
+int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ char* msg = NULL;
+ int32_t nNotifyAddr = 0;
+ int32_t nNotifyEvents = 0;
+ SStreamNotifyHandle* pHandle = NULL;
+
+ TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls);
+ if (nNotifyAddr == 0) {
+ goto _end;
+ }
+
+ code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg, &nNotifyEvents);
+ TSDB_CHECK_CODE(code, lino, _end);
+ if (msg == NULL) {
+ goto _end;
+ }
+
+ tqDebug("stream task %s prepare to send %d notify events, total msg length: %" PRIu64, pTask->notifyInfo.streamName,
+ nNotifyEvents, (uint64_t)strlen(msg));
+
+ for (int32_t i = 0; i < nNotifyAddr; ++i) {
+ if (streamTaskShouldStop(pTask)) {
+ break;
+ }
+ const char* url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i);
+ code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle);
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("failed to get stream notify handle of %s", url);
+ if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
+ // retry for event message sending in PAUSE error handling mode
+ taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
+ --i;
+ continue;
+ } else {
+ // simply ignore the failure in DROP error handling mode
+ code = TSDB_CODE_SUCCESS;
+ continue;
+ }
+ }
+ code = sendSingleStreamNotify(pHandle, msg);
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code));
+ if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
+ // retry for event message sending in PAUSE error handling mode
+ taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
+ --i;
+ } else {
+ // simply ignore the failure in DROP error handling mode
+ code = TSDB_CODE_SUCCESS;
+ }
+ } else {
+ tqDebug("stream task %s send %d notify events to %s successfully", pTask->notifyInfo.streamName, nNotifyEvents,
+ url);
+ }
+ releaseStreamNotifyHandle(&pHandle);
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (msg) {
+ taosMemoryFreeClear(msg);
+ }
+ return code;
+}
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index 06b7b33cd8..1880156f61 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -86,6 +86,14 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
if (code) {
return code;
}
+
+ code =
+ qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
+ pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName, IS_NEW_SUBTB_RULE(pTask));
+ if (code) {
+ tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
+ return code;
+ }
}
streamSetupScheduleTrigger(pTask);
@@ -1357,4 +1365,4 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return 0;
-}
\ No newline at end of file
+}
diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c
index 6de5298728..280ee527f7 100644
--- a/source/dnode/vnode/src/vnd/vnodeOpen.c
+++ b/source/dnode/vnode/src/vnd/vnodeOpen.c
@@ -15,6 +15,7 @@
#include "sync.h"
#include "tcs.h"
+#include "tq.h"
#include "tsdb.h"
#include "vnd.h"
@@ -483,6 +484,14 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
ret = taosRealPath(tdir, NULL, sizeof(tdir));
TAOS_UNUSED(ret);
+ // init handle map for stream event notification
+ ret = tqInitNotifyHandleMap(&pVnode->pNotifyHandleMap);
+ if (ret != TSDB_CODE_SUCCESS) {
+ vError("vgId:%d, failed to init StreamNotifyHandleMap", TD_VID(pVnode));
+ terrno = ret;
+ goto _err;
+ }
+
// open query
vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
if (vnodeQueryOpen(pVnode)) {
@@ -555,6 +564,7 @@ void vnodeClose(SVnode *pVnode) {
vnodeAWait(&pVnode->commitTask);
vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode);
+ tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap);
tqClose(pVnode->pTq);
walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);
diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h
index 04e7884020..84eba69acb 100644
--- a/source/libs/executor/inc/executorInt.h
+++ b/source/libs/executor/inc/executorInt.h
@@ -449,11 +449,17 @@ typedef struct STimeWindowAggSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
+typedef struct SStreamNotifyEventSupp {
+ SArray* pWindowEvents; // Array of SStreamNotifyEvent, storing window events and trigger values.
+ SHashObj* pTableNameHashMap; // Hash map from groupid to the dest child table name.
+ SHashObj* pResultHashMap; // Hash map from groupid+skey to the window agg result.
+ SSDataBlock* pEventBlock; // The datablock contains all window events and results.
+} SStreamNotifyEventSupp;
+
typedef struct SSteamOpBasicInfo {
- int32_t primaryPkIndex;
- bool updateOperatorInfo;
- SSDataBlock* pEventRes;
- SArray* pEventInfo;
+ int32_t primaryPkIndex;
+ bool updateOperatorInfo;
+ SStreamNotifyEventSupp windowEventSup;
} SSteamOpBasicInfo;
typedef struct SStreamFillSupporter {
@@ -769,6 +775,8 @@ typedef struct SStreamEventAggOperatorInfo {
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
+ SNodeList* pStartCondCols;
+ SNodeList* pEndCondCols;
} SStreamEventAggOperatorInfo;
typedef struct SStreamCountAggOperatorInfo {
diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h
index f726e4300f..86ee6f4124 100644
--- a/source/libs/executor/inc/querytask.h
+++ b/source/libs/executor/inc/querytask.h
@@ -71,6 +71,10 @@ typedef struct {
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
SStreamState* pState;
+ int32_t eventTypes; // event types to notify
+ SSchemaWrapper* notifyResultSchema; // agg result to notify
+ char* stbFullName; // used to generate dest child table name
+ bool newSubTableRule; // used to generate dest child table name
} SStreamTaskInfo;
struct SExecTaskInfo {
diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h
index 0c0ea0d6fc..7b3c828351 100644
--- a/source/libs/executor/inc/streamexecutorInt.h
+++ b/source/libs/executor/inc/streamexecutorInt.h
@@ -19,7 +19,10 @@
extern "C" {
#endif
+#include "cJSON.h"
+#include "cmdnodes.h"
#include "executorInt.h"
+#include "querytask.h"
#include "tutil.h"
#define FILL_POS_INVALID 0
@@ -107,6 +110,13 @@ int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
TSKEY compareTs(void* pKey);
+int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
+ const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
+ SStreamNotifyEventSupp* sup);
+int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper,
+ SStreamNotifyEventSupp* sup);
+int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup);
+
#ifdef __cplusplus
}
#endif
diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c
index 1386b0b82f..39bef9c95f 100644
--- a/source/libs/executor/src/executor.c
+++ b/source/libs/executor/src/executor.c
@@ -250,6 +250,28 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code;
}
+int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
+ const char* stbFullName, bool newSubTableRule) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ SStreamTaskInfo *pStreamInfo = NULL;
+
+ if (tinfo == 0 || eventTypes == 0 || pSchemaWrapper == NULL || stbFullName == NULL) {
+ goto _end;
+ }
+
+ pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo;
+ pStreamInfo->eventTypes = eventTypes;
+ pStreamInfo->notifyResultSchema = tCloneSSchemaWrapper(pSchemaWrapper);
+ if (pStreamInfo->notifyResultSchema == NULL) {
+ code = terrno;
+ }
+ pStreamInfo->stbFullName = taosStrdup(stbFullName);
+ pStreamInfo->newSubTableRule = newSubTableRule;
+
+_end:
+ return code;
+}
+
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;
diff --git a/source/libs/executor/src/querytask.c b/source/libs/executor/src/querytask.c
index c6a1900b41..20c80df4fa 100644
--- a/source/libs/executor/src/querytask.c
+++ b/source/libs/executor/src/querytask.c
@@ -262,6 +262,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSchemaWrapper(pStreamInfo->schema);
tOffsetDestroy(&pStreamInfo->currentOffset);
+ tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
+ taosMemoryFree(pStreamInfo->stbFullName);
}
static void freeBlock(void* pParam) {
diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c
index a9a47580dc..5f4d6b30fa 100644
--- a/source/libs/executor/src/streameventwindowoperator.c
+++ b/source/libs/executor/src/streameventwindowoperator.c
@@ -93,6 +93,16 @@ void destroyStreamEventOperatorInfo(void* param) {
pInfo->pEndCondInfo = NULL;
}
+ if (pInfo->pStartCondCols != NULL) {
+ nodesDestroyList(pInfo->pStartCondCols);
+ pInfo->pStartCondCols = NULL;
+ }
+
+ if (pInfo->pEndCondCols != NULL) {
+ nodesDestroyList(pInfo->pEndCondCols);
+ pInfo->pEndCondCols = NULL;
+ }
+
taosMemoryFreeClear(param);
}
@@ -310,14 +320,6 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
}
-static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) {
- void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey);
- if (pRes != NULL) {
- return TSDB_CODE_SUCCESS;
- }
- return terrno;
-}
-
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@@ -393,8 +395,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&nextWinKey, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
- if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) {
- code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin);
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) &&
+ *(bool*)colDataGetNumData(pColStart, i) && winCode != TSDB_CODE_SUCCESS) {
+ code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &curWin.winInfo.sessionWin, pSDataBlock,
+ pInfo->pStartCondCols, i, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}
@@ -464,6 +468,12 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
+
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
+ code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_CLOSE, &curWin.winInfo.sessionWin, pSDataBlock,
+ pInfo->pEndCondCols, i + winRows - 1, &pInfo->basic.windowEventSup);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
}
_end:
@@ -582,42 +592,13 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
}
}
-static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) {
- int32_t code = TSDB_CODE_SUCCESS;
- int32_t lino = 0;
-
- blockDataCleanup(pBasicInfo->pEventRes);
- int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo);
- code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size);
- QUERY_CHECK_CODE(code, lino, _end);
- for (int32_t i = 0; i < size; i++) {
- SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i);
- uint64_t uid = 0;
- code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL);
- QUERY_CHECK_CODE(code, lino, _end);
- }
- taosArrayClear(pBasicInfo->pEventInfo);
-
-_end:
- if (code != TSDB_CODE_SUCCESS) {
- qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
- }
-}
-
-
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
- buildEventNotifyResult(&pInfo->basic);
- if (pInfo->basic.pEventRes->info.rows > 0) {
- printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
- (*ppRes) = pInfo->basic.pEventRes;
- return code;
- }
-
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@@ -628,10 +609,27 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
+ if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
+ code = addAggResultNotifyEvent(pBInfo->pRes, pTaskInfo->streamInfo.notifyResultSchema, &pInfo->basic.windowEventSup);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
(*ppRes) = pBInfo->pRes;
return code;
}
+
+ code = buildNotifyEventBlock(pTaskInfo, &pInfo->basic.windowEventSup);
+ QUERY_CHECK_CODE(code, lino, _end);
+ if (pInfo->basic.windowEventSup.pEventBlock->info.rows > 0) {
+ printDataBlock(pInfo->basic.windowEventSup.pEventBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
+ (*ppRes) = pInfo->basic.windowEventSup.pEventBlock;
+ return code;
+ }
+
+_end:
(*ppRes) = NULL;
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
+ }
return code;
}
@@ -1041,6 +1039,12 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);
+ code =
+ nodesCollectColumnsFromNode((SNode*)pEventNode->pStartCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pStartCondCols);
+ QUERY_CHECK_CODE(code, lino, _error);
+ code = nodesCollectColumnsFromNode((SNode*)pEventNode->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pEndCondCols);
+ QUERY_CHECK_CODE(code, lino, _error);
+
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;
diff --git a/source/libs/executor/src/streamexecutorInt.c b/source/libs/executor/src/streamexecutorInt.c
index 1e7fbfa446..9cafdfff0c 100644
--- a/source/libs/executor/src/streamexecutorInt.c
+++ b/source/libs/executor/src/streamexecutorInt.c
@@ -13,9 +13,20 @@
* along with this program. If not, see .
*/
+#include "streamexecutorInt.h"
+
#include "executorInt.h"
#include "tdatablock.h"
+#define NOTIFY_EVENT_NAME_CACHE_LIMIT_MB 16
+
+typedef struct SStreamNotifyEvent {
+ uint64_t gid;
+ TSKEY skey;
+ char* content;
+ bool isEnd;
+} SStreamNotifyEvent;
+
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
pBasicInfo->updateOperatorInfo = true;
@@ -30,19 +41,509 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}
+static void destroyStreamWindowEvent(void* ptr) {
+ SStreamNotifyEvent* pEvent = ptr;
+ if (pEvent == NULL || pEvent->content == NULL) return;
+ cJSON_free(pEvent->content);
+}
+
+static void destroyStreamNotifyEventSupp(SStreamNotifyEventSupp* sup) {
+ if (sup == NULL) return;
+ taosArrayDestroyEx(sup->pWindowEvents, destroyStreamWindowEvent);
+ taosHashCleanup(sup->pTableNameHashMap);
+ taosHashCleanup(sup->pResultHashMap);
+ blockDataDestroy(sup->pEventBlock);
+ *sup = (SStreamNotifyEventSupp){0};
+}
+
+static int32_t initStreamNotifyEventSupp(SStreamNotifyEventSupp *sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SSDataBlock* pBlock = NULL;
+ SColumnInfoData infoData = {0};
+
+ if (sup == NULL) {
+ goto _end;
+ }
+
+ code = createDataBlock(&pBlock);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ pBlock->info.type = STREAM_NOTIFY_EVENT;
+ pBlock->info.watermark = INT64_MIN;
+
+ infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
+ infoData.info.bytes = tDataTypes[infoData.info.type].bytes;
+ code = blockDataAppendColInfo(pBlock, &infoData);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ sup->pWindowEvents = taosArrayInit(0, sizeof(SStreamNotifyEvent));
+ QUERY_CHECK_NULL(sup->pWindowEvents, code, lino, _end, terrno);
+ sup->pTableNameHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
+ QUERY_CHECK_NULL(sup->pTableNameHashMap, code, lino, _end, terrno);
+ sup->pResultHashMap = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
+ QUERY_CHECK_NULL(sup->pResultHashMap, code, lino, _end, terrno);
+ taosHashSetFreeFp(sup->pResultHashMap, destroyStreamWindowEvent);
+ sup->pEventBlock = pBlock;
+ pBlock = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ if (sup) {
+ destroyStreamNotifyEventSupp(sup);
+ }
+ }
+ if (pBlock != NULL) {
+ blockDataDestroy(pBlock);
+ }
+ return code;
+}
+
int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->primaryPkIndex = -1;
pBasicInfo->updateOperatorInfo = false;
- pBasicInfo->pEventInfo = taosArrayInit(4, sizeof(SSessionKey));
- if (pBasicInfo->pEventInfo == NULL) {
- return terrno;
- }
- return createSpecialDataBlock(STREAM_EVENT_OPEN_WINDOW, &pBasicInfo->pEventRes);
+ return initStreamNotifyEventSupp(&pBasicInfo->windowEventSup);
}
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
- blockDataDestroy(pBasicInfo->pEventRes);
- pBasicInfo->pEventRes = NULL;
- taosArrayDestroy(pBasicInfo->pEventInfo);
- pBasicInfo->pEventInfo = NULL;
+ destroyStreamNotifyEventSupp(&pBasicInfo->windowEventSup);
+}
+
+static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char *buf) {
+ uint64_t hash = 0;
+ uint64_t ar[2];
+
+ ar[0] = pSessionKey->groupId;
+ ar[1] = pSessionKey->win.skey;
+ hash = MurmurHash3_64((char*)ar, sizeof(ar));
+ buf = u64toaFastLut(hash, buf);
+}
+
+#define JSON_CHECK_ADD_ITEM(obj, str, item) \
+ QUERY_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
+
+static int32_t jsonAddColumnField(const char* colName, const SColumnInfoData* pColData, int32_t ri, cJSON* obj) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ char* temp = NULL;
+
+ QUERY_CHECK_NULL(colName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pColData, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ if (colDataIsNull_s(pColData, ri)) {
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNull());
+ goto _end;
+ }
+
+ switch (pColData->info.type) {
+ case TSDB_DATA_TYPE_BOOL: {
+ bool val = *(bool*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateBool(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_TINYINT: {
+ int8_t val = *(int8_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_SMALLINT: {
+ int16_t val = *(int16_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_INT: {
+ int32_t val = *(int32_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_BIGINT:
+ case TSDB_DATA_TYPE_TIMESTAMP: {
+ int64_t val = *(int64_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_FLOAT: {
+ float val = *(float*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_DOUBLE: {
+ double val = *(double*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_VARCHAR:
+ case TSDB_DATA_TYPE_NCHAR: {
+ // cJSON requires null-terminated strings, but this data is not null-terminated,
+ // so we need to manually copy the string and add null termination.
+ const char* src = varDataVal(colDataGetVarData(pColData, ri));
+ int32_t len = varDataLen(colDataGetVarData(pColData, ri));
+ temp = cJSON_malloc(len + 1);
+ QUERY_CHECK_NULL(temp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ memcpy(temp, src, len);
+ temp[len] = '\0';
+
+ cJSON* item = cJSON_CreateStringReference(temp);
+ JSON_CHECK_ADD_ITEM(obj, colName, item);
+
+ // let the cjson object to free memory later
+ item->type &= ~cJSON_IsReference;
+ temp = NULL;
+ break;
+ }
+
+ case TSDB_DATA_TYPE_UTINYINT: {
+ uint8_t val = *(uint8_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_USMALLINT: {
+ uint16_t val = *(uint16_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_UINT: {
+ uint32_t val = *(uint32_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ case TSDB_DATA_TYPE_UBIGINT: {
+ uint64_t val = *(uint64_t*)colDataGetNumData(pColData, ri);
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
+ break;
+ }
+
+ default: {
+ JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateStringReference(""));
+ break;
+ }
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (temp) {
+ cJSON_free(temp);
+ }
+ return code;
+}
+
+int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
+ const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
+ SStreamNotifyEventSupp* sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SNode* node = NULL;
+ cJSON* event = NULL;
+ cJSON* fields = NULL;
+ cJSON* cond = NULL;
+ SStreamNotifyEvent item = {0};
+ char windowId[32];
+
+ QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pInputBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pInputBlock->pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pCondCols, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ qDebug("add stream notify event from event window, type: %s, start: %" PRId64 ", end: %" PRId64,
+ (eventType == SNOTIFY_EVENT_WINDOW_OPEN) ? "WINDOW_OPEN" : "WINDOW_CLOSE", pSessionKey->win.skey,
+ pSessionKey->win.ekey);
+
+ event = cJSON_CreateObject();
+ QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+
+ // add basic info
+ streamNotifyGetEventWindowId(pSessionKey, windowId);
+ if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) {
+ JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_OPEN"));
+ } else if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
+ JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_CLOSE"));
+ }
+ JSON_CHECK_ADD_ITEM(event, "eventTime", cJSON_CreateNumber(taosGetTimestampMs()));
+ JSON_CHECK_ADD_ITEM(event, "windowId", cJSON_CreateStringReference(windowId));
+ JSON_CHECK_ADD_ITEM(event, "windowType", cJSON_CreateStringReference("Event"));
+ JSON_CHECK_ADD_ITEM(event, "windowStart", cJSON_CreateNumber(pSessionKey->win.skey));
+ if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
+ JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey));
+ }
+
+ // create fields object to store matched column values
+ fields = cJSON_CreateObject();
+ QUERY_CHECK_NULL(fields, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ FOREACH(node, pCondCols) {
+ SColumnNode* pColDef = (SColumnNode*)node;
+ SColumnInfoData* pColData = taosArrayGet(pInputBlock->pDataBlock, pColDef->slotId);
+ code = jsonAddColumnField(pColDef->colName, pColData, ri, fields);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+
+ // add trigger condition
+ cond = cJSON_CreateObject();
+ QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0));
+ JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields);
+ fields = NULL;
+ JSON_CHECK_ADD_ITEM(event, "triggerConditions", cond);
+ cond = NULL;
+
+ // convert json object to string value
+ item.gid = pSessionKey->groupId;
+ item.skey = pSessionKey->win.skey;
+ item.isEnd = (eventType == SNOTIFY_EVENT_WINDOW_CLOSE);
+ item.content = cJSON_PrintUnformatted(event);
+ QUERY_CHECK_NULL(taosArrayPush(sup->pWindowEvents, &item), code, lino, _end, terrno);
+ item.content = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ destroyStreamWindowEvent(&item);
+ if (cond != NULL) {
+ cJSON_Delete(cond);
+ }
+ if (fields != NULL) {
+ cJSON_Delete(fields);
+ }
+ if (event != NULL) {
+ cJSON_Delete(event);
+ }
+ return code;
+}
+
+int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper,
+ SStreamNotifyEventSupp* sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SNode * node = NULL;
+ cJSON* event = NULL;
+ cJSON* result = NULL;
+ SStreamNotifyEvent item = {0};
+ SColumnInfoData* pWstartCol = NULL;
+
+ QUERY_CHECK_NULL(pResultBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pSchemaWrapper, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ qDebug("add %" PRId64 " stream notify results from window agg", pResultBlock->info.rows);
+
+ pWstartCol = taosArrayGet(pResultBlock->pDataBlock, 0);
+ for (int32_t i = 0; i< pResultBlock->info.rows; ++i) {
+ event = cJSON_CreateObject();
+ QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+
+ // convert the result row into json
+ result = cJSON_CreateObject();
+ QUERY_CHECK_NULL(result, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
+ for (int32_t j = 0; j < pSchemaWrapper->nCols; ++j) {
+ SSchema *pCol = pSchemaWrapper->pSchema + j;
+ SColumnInfoData *pColData = taosArrayGet(pResultBlock->pDataBlock, pCol->colId - 1);
+ code = jsonAddColumnField(pCol->name, pColData, i, result);
+ QUERY_CHECK_CODE(code, lino, _end);
+ }
+ JSON_CHECK_ADD_ITEM(event, "result", result);
+ result = NULL;
+
+ item.gid = pResultBlock->info.id.groupId;
+ item.skey = *(uint64_t*)colDataGetNumData(pWstartCol, i);
+ item.content = cJSON_PrintUnformatted(event);
+ code = taosHashPut(sup->pResultHashMap, &item.gid, sizeof(item.gid) + sizeof(item.skey), &item, sizeof(item));
+ TSDB_CHECK_CODE(code, lino, _end);
+ item.content = NULL;
+
+ cJSON_Delete(event);
+ event = NULL;
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ destroyStreamWindowEvent(&item);
+ if (result != NULL) {
+ cJSON_Delete(result);
+ }
+ if (event != NULL) {
+ cJSON_Delete(event);
+ }
+ return code;
+}
+
+static int32_t streamNotifyGetDestTableName(const SExecTaskInfo* pTaskInfo, uint64_t gid, char** pTableName) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ const SStorageAPI* pAPI = NULL;
+ void* tbname = NULL;
+ int32_t winCode = TSDB_CODE_SUCCESS;
+ char parTbName[TSDB_TABLE_NAME_LEN];
+ const SStreamTaskInfo* pStreamInfo = NULL;
+
+ QUERY_CHECK_NULL(pTaskInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pTableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+
+ *pTableName = NULL;
+
+ pAPI = &pTaskInfo->storageAPI;
+ code = pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, gid, &tbname, false, &winCode);
+ QUERY_CHECK_CODE(code, lino, _end);
+ if (winCode != TSDB_CODE_SUCCESS) {
+ parTbName[0] = '\0';
+ } else {
+ tstrncpy(parTbName, tbname, sizeof(parTbName));
+ }
+ pAPI->stateStore.streamStateFreeVal(tbname);
+
+ pStreamInfo = &pTaskInfo->streamInfo;
+ code = buildSinkDestTableName(parTbName, pStreamInfo->stbFullName, gid, pStreamInfo->newSubTableRule, pTableName);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
+
+static int32_t streamNotifyFillTableName(const char* tableName, const SStreamNotifyEvent* pEvent,
+ const SStreamNotifyEvent* pResult, char** pVal) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ static const char* prefix = "{\"tableName\":\"";
+ uint64_t prefixLen = 0;
+ uint64_t nameLen = 0;
+ uint64_t eventLen = 0;
+ uint64_t resultLen = 0;
+ uint64_t valLen = 0;
+ char* val = NULL;
+ char* p = NULL;
+
+ QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pEvent, code, lino , _end, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(pVal, code, lino , _end, TSDB_CODE_INVALID_PARA);
+
+ *pVal = NULL;
+ prefixLen = strlen(prefix);
+ nameLen = strlen(tableName);
+ eventLen = strlen(pEvent->content);
+
+ if (pResult != NULL) {
+ resultLen = strlen(pResult->content);
+ valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + resultLen;
+ } else {
+ valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + 1;
+ }
+ val = taosMemoryMalloc(valLen);
+ QUERY_CHECK_NULL(val, code, lino, _end, terrno);
+ varDataSetLen(val, valLen - VARSTR_HEADER_SIZE);
+
+ p = varDataVal(val);
+ TAOS_STRNCPY(p, prefix, prefixLen);
+ p += prefixLen;
+ TAOS_STRNCPY(p, tableName, nameLen);
+ p += nameLen;
+ *(p++) = '\"';
+ TAOS_STRNCPY(p, pEvent->content, eventLen);
+ *p = ',';
+
+ if (pResult != NULL) {
+ p += eventLen - 1;
+ TAOS_STRNCPY(p, pResult->content, resultLen);
+ *p = ',';
+ }
+ *pVal = val;
+ val = NULL;
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (val != NULL) {
+ taosMemoryFreeClear(val);
+ }
+ return code;
+}
+
+int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+ SColumnInfoData* pEventStrCol = NULL;
+ int32_t nWindowEvents = 0;
+ int32_t nWindowResults = 0;
+ char* val = NULL;
+
+ if (pTaskInfo == NULL || sup == NULL) {
+ goto _end;
+ }
+
+ QUERY_CHECK_NULL(sup->pEventBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
+ blockDataCleanup(sup->pEventBlock);
+ nWindowEvents = taosArrayGetSize(sup->pWindowEvents);
+ nWindowResults = taosHashGetSize(sup->pResultHashMap);
+ qDebug("start to build stream notify event block, nWindowEvents: %d, nWindowResults: %d", nWindowEvents,
+ nWindowResults);
+ if (nWindowEvents == 0) {
+ goto _end;
+ }
+
+ code = blockDataEnsureCapacity(sup->pEventBlock, nWindowEvents);
+ QUERY_CHECK_CODE(code, lino, _end);
+
+ pEventStrCol = taosArrayGet(sup->pEventBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
+ QUERY_CHECK_NULL(pEventStrCol, code, lino, _end, terrno);
+
+ for (int32_t i = 0; i < nWindowEvents; ++i) {
+ SStreamNotifyEvent* pResult = NULL;
+ SStreamNotifyEvent* pEvent = taosArrayGet(sup->pWindowEvents, i);
+ char* tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
+ if (tableName == NULL) {
+ code = streamNotifyGetDestTableName(pTaskInfo, pEvent->gid, &tableName);
+ QUERY_CHECK_CODE(code, lino, _end);
+ code = taosHashPut(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid), tableName, strlen(tableName) + 1);
+ taosMemoryFreeClear(tableName);
+ QUERY_CHECK_CODE(code, lino, _end);
+ tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
+ QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
+ }
+ if (pEvent->isEnd) {
+ pResult = taosHashGet(sup->pResultHashMap, &pEvent->gid, sizeof(pEvent->gid) + sizeof(pEvent->skey));
+ QUERY_CHECK_NULL(pResult, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
+ }
+ code = streamNotifyFillTableName(tableName, pEvent, pResult, &val);
+ QUERY_CHECK_CODE(code, lino, _end);
+ code = colDataSetVal(pEventStrCol, i, val, false);
+ QUERY_CHECK_CODE(code, lino, _end);
+ taosMemoryFreeClear(val);
+ sup->pEventBlock->info.rows++;
+ }
+
+ if (taosHashGetMemSize(sup->pTableNameHashMap) >= NOTIFY_EVENT_NAME_CACHE_LIMIT_MB * 1024 * 1024) {
+ taosHashClear(sup->pTableNameHashMap);
+ }
+
+_end:
+ if (code != TSDB_CODE_SUCCESS) {
+ qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ if (val != NULL) {
+ taosMemoryFreeClear(val);
+ }
+ if (sup != NULL) {
+ taosArrayClearEx(sup->pWindowEvents, destroyStreamWindowEvent);
+ taosHashClear(sup->pResultHashMap);
+ }
+ return code;
}
diff --git a/source/libs/executor/src/streamintervalsliceoperator.c b/source/libs/executor/src/streamintervalsliceoperator.c
index 45707e670e..44799f193b 100644
--- a/source/libs/executor/src/streamintervalsliceoperator.c
+++ b/source/libs/executor/src/streamintervalsliceoperator.c
@@ -55,6 +55,7 @@ void destroyStreamIntervalSliceOperatorInfo(void* param) {
pInfo->pOperator = NULL;
}
+ destroyStreamBasicInfo(&pInfo->basic);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;
diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c
index 9ec6063486..4fe8efe397 100644
--- a/source/libs/executor/src/streamtimesliceoperator.c
+++ b/source/libs/executor/src/streamtimesliceoperator.c
@@ -150,6 +150,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
}
+ destroyStreamBasicInfo(&pInfo->basic);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
destroyStreamAggSupporter(&pInfo->streamAggSup);
resetPrevAndNextWindow(pInfo->pFillSup);
diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c
index bea9b96215..bfe86aa2ac 100644
--- a/source/libs/nodes/src/nodesCodeFuncs.c
+++ b/source/libs/nodes/src/nodesCodeFuncs.c
@@ -99,6 +99,8 @@ const char* nodesNodeName(ENodeType type) {
return "CountWindow";
case QUERY_NODE_ANOMALY_WINDOW:
return "AnomalyWindow";
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ return "StreamNotifyOptions";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
@@ -5812,6 +5814,45 @@ static int32_t jsonToStreamOptions(const SJson* pJson, void* pObj) {
return code;
}
+static const char* jkStreamNotifyOptionsAddrUrls = "AddrUrls";
+static const char* jkStreamNotifyOptionsEventType = "EventType";
+static const char* jkStreamNotifyOptionsErrorHandle = "ErrorHandle";
+static const char* jkStreamNotifyOptionsNotifyHistory = "NotifyHistory";
+
+static int32_t streamNotifyOptionsToJson(const void* pObj, SJson* pJson) {
+ const SStreamNotifyOptions* pNotifyOption = (const SStreamNotifyOptions*)pObj;
+ int32_t code = nodeListToJson(pJson, jkStreamNotifyOptionsAddrUrls, pNotifyOption->pAddrUrls);
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsEventType, pNotifyOption->eventTypes);
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsErrorHandle, pNotifyOption->errorHandle);
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonAddBoolToObject(pJson, jkStreamNotifyOptionsNotifyHistory, pNotifyOption->notifyHistory);
+ }
+
+ return code;
+}
+
+static int32_t jsonToStreamNotifyOptions(const SJson* pJson, void* pObj) {
+ SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pObj;
+ int32_t code = jsonToNodeList(pJson, jkStreamNotifyOptionsAddrUrls, &pNotifyOption->pAddrUrls);
+ int32_t val = 0;
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsEventType, &val);
+ pNotifyOption->eventTypes = val;
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsErrorHandle, &val);
+ pNotifyOption->errorHandle = val;
+ }
+ if (code == TSDB_CODE_SUCCESS) {
+ code = tjsonGetBoolValue(pJson, jkStreamNotifyOptionsNotifyHistory, &pNotifyOption->notifyHistory);
+ }
+ return code;
+}
+
static const char* jkWhenThenWhen = "When";
static const char* jkWhenThenThen = "Then";
@@ -7207,6 +7248,7 @@ static const char* jkCreateStreamStmtOptions = "Options";
static const char* jkCreateStreamStmtQuery = "Query";
static const char* jkCreateStreamStmtTags = "Tags";
static const char* jkCreateStreamStmtSubtable = "Subtable";
+static const char* jkCreateStreamStmtNotifyOptions = "NotifyOptions";
static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) {
const SCreateStreamStmt* pNode = (const SCreateStreamStmt*)pObj;
@@ -7233,6 +7275,9 @@ static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkCreateStreamStmtSubtable, nodeToJson, pNode->pSubtable);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = tjsonAddObject(pJson, jkCreateStreamStmtNotifyOptions, nodeToJson, pNode->pNotifyOptions);
+ }
return code;
}
@@ -7262,6 +7307,9 @@ static int32_t jsonToCreateStreamStmt(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkCreateStreamStmtSubtable, &pNode->pSubtable);
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = jsonToNodeObject(pJson, jkCreateStreamStmtNotifyOptions, (SNode**)&pNode->pNotifyOptions);
+ }
return code;
}
@@ -8029,6 +8077,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return countWindowNodeToJson(pObj, pJson);
case QUERY_NODE_ANOMALY_WINDOW:
return anomalyWindowNodeToJson(pObj, pJson);
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ return streamNotifyOptionsToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
@@ -8402,6 +8452,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToCountWindowNode(pJson, pObj);
case QUERY_NODE_ANOMALY_WINDOW:
return jsonToAnomalyWindowNode(pJson, pObj);
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ return jsonToStreamNotifyOptions(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:
diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c
index 3d4df385f7..ae5b302d2d 100644
--- a/source/libs/nodes/src/nodesUtilFuncs.c
+++ b/source/libs/nodes/src/nodesUtilFuncs.c
@@ -467,6 +467,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_WINDOW_OFFSET:
code = makeNode(type, sizeof(SWindowOffsetNode), &pNode);
break;
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
+ code = makeNode(type, sizeof(SStreamNotifyOptions), &pNode);
+ break;
case QUERY_NODE_SET_OPERATOR:
code = makeNode(type, sizeof(SSetOperator), &pNode);
break;
@@ -1267,6 +1270,11 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pAround->pTimepoint);
break;
}
+ case QUERY_NODE_STREAM_NOTIFY_OPTIONS: {
+ SStreamNotifyOptions* pNotifyOptions = (SStreamNotifyOptions*)pNode;
+ nodesDestroyList(pNotifyOptions->pAddrUrls);
+ break;
+ }
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pStmt = (SSetOperator*)pNode;
nodesDestroyList(pStmt->pProjectionList);
@@ -1479,6 +1487,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pQuery);
nodesDestroyList(pStmt->pTags);
nodesDestroyNode(pStmt->pSubtable);
+ nodesDestroyNode((SNode*)pStmt->pNotifyOptions);
tFreeSCMCreateStreamReq(pStmt->pReq);
taosMemoryFreeClear(pStmt->pReq);
break;
diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h
index dc9986ad04..387bccf358 100644
--- a/source/libs/parser/inc/parAst.h
+++ b/source/libs/parser/inc/parAst.h
@@ -296,8 +296,12 @@ SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, con
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode);
+SNode* createStreamNotifyOptions(SAstCreateContext *pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes);
+SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag,
+ SToken* pToken);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
- SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
+ SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols,
+ SNode* pNotifyOptions);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName);
diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y
index 7f383afe48..439af13d71 100644
--- a/source/libs/parser/inc/sql.y
+++ b/source/libs/parser/inc/sql.y
@@ -785,7 +785,7 @@ full_view_name(A) ::= db_name(B) NK_DOT view_name(C).
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
- AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
+ AS query_or_subquery(D) notify_opt(I). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H, I); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); }
cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); }
@@ -832,6 +832,26 @@ subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP.
ignore_opt(A) ::= . { A = false; }
ignore_opt(A) ::= IGNORE UNTREATED. { A = true; }
+notify_opt(A) ::= . { A = NULL; }
+notify_opt(A) ::= notify_def(B). { A = B; }
+
+notify_def(A) ::= NOTIFY NK_LP url_def_list(B) NK_RP ON NK_LP event_def_list(C) NK_RP. { A = createStreamNotifyOptions(pCxt, B, C); }
+notify_def(A) ::= notify_def(B) ON_FAILURE DROP(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); }
+notify_def(A) ::= notify_def(B) ON_FAILURE PAUSE(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); }
+notify_def(A) ::= notify_def(B) NOTIFY_HISTORY NK_INTEGER(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_NOTIFY_HISTORY_SET, &C); }
+
+%type url_def_list { SNodeList* }
+%destructor url_def_list { nodesDestroyList($$); }
+url_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); }
+url_def_list(A) ::= url_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); }
+
+%type event_def_list { SNodeList* }
+%destructor event_def_list { nodesDestroyList($$); }
+event_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); }
+event_def_list(A) ::= event_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); }
+
+
+
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); }
diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c
index 5b90fd601e..c875cbad05 100644
--- a/source/libs/parser/src/parAstCreater.c
+++ b/source/libs/parser/src/parAstCreater.c
@@ -1526,8 +1526,8 @@ SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhe
pCaseWhen->pCase = pCase;
pCaseWhen->pWhenThenList = pWhenThenList;
pCaseWhen->pElse = pElse;
- pCaseWhen->tz = pCxt->pQueryCxt->timezone;
- pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt;
+ pCaseWhen->tz = pCxt->pQueryCxt->timezone;
+ pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt;
return (SNode*)pCaseWhen;
_err:
nodesDestroyNode(pCase);
@@ -3657,8 +3657,115 @@ SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptions
return pOptions;
}
+static bool validateNotifyUrl(const char* url) {
+ const char* prefix[] = {"http://", "https://", "ws://", "wss://"};
+ const char* host = NULL;
+
+ if (!url || *url == '\0') return false;
+
+ for (int32_t i = 0; i < ARRAY_SIZE(prefix); ++i) {
+ if (strncasecmp(url, prefix[i], strlen(prefix[i])) == 0) {
+ host = url + strlen(prefix[i]);
+ break;
+ }
+ }
+
+ return (host != NULL) && (*host != '\0') && (*host != '/');
+}
+
+SNode* createStreamNotifyOptions(SAstCreateContext* pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes) {
+ SNode* pNode = NULL;
+ EStreamNotifyEventType eventTypes = 0;
+ const char* eWindowOpenStr = "WINDOW_OPEN";
+ const char* eWindowCloseStr = "WINDOW_CLOSE";
+
+ CHECK_PARSER_STATUS(pCxt);
+
+ if (LIST_LENGTH(pAddrUrls) == 0) {
+ pCxt->errCode =
+ generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "notification address cannot be empty");
+ goto _err;
+ }
+
+ FOREACH(pNode, pAddrUrls) {
+ char *url = ((SValueNode*)pNode)->literal;
+ if (strlen(url) >= TSDB_STREAM_NOTIFY_URL_LEN) {
+ pCxt->errCode =
+ generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "notification address \"%s\" exceed maximum length %d", url, TSDB_STREAM_NOTIFY_URL_LEN);
+ goto _err;
+ }
+ if (!validateNotifyUrl(url)) {
+ pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "invalid notification address \"%s\"", url);
+ goto _err;
+ }
+ }
+
+ if (LIST_LENGTH(pEventTypes) == 0) {
+ pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "event types must be specified for notification");
+ goto _err;
+ }
+
+ FOREACH(pNode, pEventTypes) {
+ char *eventStr = ((SValueNode *)pNode)->literal;
+ if (strncasecmp(eventStr, eWindowOpenStr, strlen(eWindowOpenStr) + 1) == 0) {
+ BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_OPEN);
+ } else if (strncasecmp(eventStr, eWindowCloseStr, strlen(eWindowCloseStr) + 1) == 0) {
+ BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
+ } else {
+ pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "invalid event type '%s' for notification", eventStr);
+ goto _err;
+ }
+ }
+
+ SStreamNotifyOptions* pNotifyOptions = NULL;
+ pCxt->errCode = nodesMakeNode(QUERY_NODE_STREAM_NOTIFY_OPTIONS, (SNode**)&pNotifyOptions);
+ CHECK_MAKE_NODE(pNotifyOptions);
+ pNotifyOptions->pAddrUrls = pAddrUrls;
+ pNotifyOptions->eventTypes = eventTypes;
+ pNotifyOptions->errorHandle = SNOTIFY_ERROR_HANDLE_PAUSE;
+ pNotifyOptions->notifyHistory = false;
+ nodesDestroyList(pEventTypes);
+ return (SNode*)pNotifyOptions;
+_err:
+ nodesDestroyList(pAddrUrls);
+ nodesDestroyList(pEventTypes);
+ return NULL;
+}
+
+SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag,
+ SToken* pToken) {
+ CHECK_PARSER_STATUS(pCxt);
+
+ SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pNode;
+ if (BIT_FLAG_TEST_MASK(pNotifyOption->setFlag, setFlag)) {
+ pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
+ "stream notify options each item can only be set once");
+ goto _err;
+ }
+ switch (setFlag) {
+ case SNOTIFY_OPT_ERROR_HANDLE_SET:
+ pNotifyOption->errorHandle = (pToken->type == TK_DROP) ? SNOTIFY_ERROR_HANDLE_DROP : SNOTIFY_ERROR_HANDLE_PAUSE;
+ break;
+ case SNOTIFY_OPT_NOTIFY_HISTORY_SET:
+ pNotifyOption->notifyHistory = taosStr2Int8(pToken->z, NULL, 10);
+ break;
+ default:
+ break;
+ }
+ BIT_FLAG_SET_MASK(pNotifyOption->setFlag, setFlag);
+ return pNode;
+_err:
+ nodesDestroyNode(pNode);
+ return NULL;
+}
+
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
- SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
+ SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols,
+ SNode* pNotifyOptions) {
CHECK_PARSER_STATUS(pCxt);
CHECK_NAME(checkStreamName(pCxt, pStreamName));
SCreateStreamStmt* pStmt = NULL;
@@ -3674,6 +3781,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken
pStmt->pTags = pTags;
pStmt->pSubtable = pSubtable;
pStmt->pCols = pCols;
+ pStmt->pNotifyOptions = (SStreamNotifyOptions*)pNotifyOptions;
return (SNode*)pStmt;
_err:
nodesDestroyNode(pRealTable);
@@ -3682,6 +3790,7 @@ _err:
nodesDestroyList(pTags);
nodesDestroyNode(pSubtable);
nodesDestroyList(pCols);
+ nodesDestroyNode(pNotifyOptions);
return NULL;
}
diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c
index ea2e9d712f..7ed438a7dc 100644
--- a/source/libs/parser/src/parTokenizer.c
+++ b/source/libs/parser/src/parTokenizer.c
@@ -355,6 +355,9 @@ static SKeyword keywordTable[] = {
{"FORCE_WINDOW_CLOSE", TK_FORCE_WINDOW_CLOSE},
{"DISK_INFO", TK_DISK_INFO},
{"AUTO", TK_AUTO},
+ {"NOTIFY", TK_NOTIFY},
+ {"ON_FAILURE", TK_ON_FAILURE},
+ {"NOTIFY_HISTORY", TK_NOTIFY_HISTORY},
};
// clang-format on
diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c
index 1d87b83e62..74dd1be614 100755
--- a/source/libs/parser/src/parTranslater.c
+++ b/source/libs/parser/src/parTranslater.c
@@ -12192,6 +12192,45 @@ static int32_t translateStreamOptions(STranslateContext* pCxt, SCreateStreamStmt
return TSDB_CODE_SUCCESS;
}
+static int32_t buildStreamNotifyOptions(STranslateContext* pCxt, SStreamNotifyOptions* pNotifyOptions,
+ SCMCreateStreamReq* pReq) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ SNode* pNode = NULL;
+
+ if (pNotifyOptions == NULL || pNotifyOptions->pAddrUrls->length == 0) {
+ return code;
+ }
+
+ pReq->pNotifyAddrUrls = taosArrayInit(pNotifyOptions->pAddrUrls->length, POINTER_BYTES);
+ if (pReq->pNotifyAddrUrls != NULL) {
+ FOREACH(pNode, pNotifyOptions->pAddrUrls) {
+ char *url = taosStrndup(((SValueNode*)pNode)->literal, TSDB_STREAM_NOTIFY_URL_LEN);
+ if (url == NULL) {
+ code = terrno;
+ break;
+ }
+ if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
+ code = terrno;
+ taosMemoryFreeClear(url);
+ break;
+ }
+ }
+ } else {
+ code = terrno;
+ }
+
+ if (code == TSDB_CODE_SUCCESS) {
+ pReq->notifyEventTypes = pNotifyOptions->eventTypes;
+ pReq->notifyErrorHandle = pNotifyOptions->errorHandle;
+ pReq->notifyHistory = pNotifyOptions->notifyHistory;
+ } else {
+ taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
+ pReq->pNotifyAddrUrls = NULL;
+ }
+
+ return code;
+}
+
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
@@ -12238,6 +12277,10 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
}
}
+ if (TSDB_CODE_SUCCESS == code) {
+ code = buildStreamNotifyOptions(pCxt, pStmt->pNotifyOptions, pReq);
+ }
+
return code;
}
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 42d7f44b62..baf36d0453 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -735,7 +735,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
!alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) {
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
code = buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId, sizeof(pDataBlock->info.parTbName));
- } else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
+ } else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER) {
code = buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName,
groupId, sizeof(pDataBlock->info.parTbName));
}
diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c
index 0de256d86d..dde7b197c4 100644
--- a/source/libs/stream/src/streamMeta.c
+++ b/source/libs/stream/src/streamMeta.c
@@ -198,6 +198,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
+ tDecoderClear(&decoder);
continue;
}
@@ -1031,6 +1032,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
+ tDecoderClear(&decoder);
continue;
}
tDecoderClear(&decoder);
diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c
index d27ed520c6..5ee8bd43f5 100644
--- a/source/libs/stream/src/streamTask.c
+++ b/source/libs/stream/src/streamTask.c
@@ -326,6 +326,11 @@ void tFreeStreamTask(void* pParam) {
streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
pTask->chkInfo.pActiveInfo = NULL;
+ taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
+ taosMemoryFreeClear(pTask->notifyInfo.streamName);
+ taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
+ tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
+
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
}
@@ -1318,6 +1323,78 @@ void streamTaskFreeRefId(int64_t* pRefId) {
metaRefMgtRemove(pRefId);
}
+static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+
+ int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls);
+ TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
+ for (int32_t i = 0; i < addrSize; ++i) {
+ const char* url = taosArrayGetP(info->pNotifyAddrUrls, i);
+ TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url));
+ }
+ TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
+ TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
+ if (addrSize > 0) {
+ TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
+ TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
+ TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
+ }
+
+_exit:
+ if (code != TSDB_CODE_SUCCESS) {
+ stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
+
+static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
+ int32_t code = TSDB_CODE_SUCCESS;
+ int32_t lino = 0;
+
+ QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+ QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
+
+ int32_t addrSize = 0;
+ TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
+ info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
+ QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);
+ for (int32_t i = 0; i < addrSize; ++i) {
+ char *url = NULL;
+ TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
+ url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
+ QUERY_CHECK_NULL(url, code, lino, _exit, terrno);
+ if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
+ taosMemoryFree(url);
+ TAOS_CHECK_EXIT(terrno);
+ }
+ }
+ TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
+ TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
+ if (addrSize > 0) {
+ char* name = NULL;
+ TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
+ info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
+ QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);
+ TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
+ info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
+ QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);
+ info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
+ if (info->pSchemaWrapper == NULL) {
+ TAOS_CHECK_EXIT(terrno);
+ }
+ TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
+ }
+
+_exit:
+ if (code != TSDB_CODE_SUCCESS) {
+ stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
+ }
+ return code;
+}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t code = 0;
@@ -1388,6 +1465,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
+ if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
+ TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
+ }
+
tEndEncode(pEncoder);
_exit:
return code;
@@ -1486,8 +1567,12 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
+ if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
+ TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
+ }
+
tEndDecode(pDecoder);
_exit:
return code;
-}
\ No newline at end of file
+}
diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c
index 4f5ca8d789..03ef00a0c0 100644
--- a/source/util/src/tlog.c
+++ b/source/util/src/tlog.c
@@ -1490,3 +1490,32 @@ bool taosAssertRelease(bool condition) {
return true;
}
#endif
+
+char* u64toaFastLut(uint64_t val, char* buf) {
+ static const char* lut =
+ "0001020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455"
+ "5657585960616263646566676869707172737475767778798081828384858687888990919293949596979899";
+
+ char temp[24];
+ char* p = temp;
+
+ while (val >= 100) {
+ strncpy(p, lut + (val % 100) * 2, 2);
+ val /= 100;
+ p += 2;
+ }
+
+ if (val >= 10) {
+ strncpy(p, lut + val * 2, 2);
+ p += 2;
+ } else if (val > 0 || p == temp) {
+ *(p++) = val + '0';
+ }
+
+ while (p != temp) {
+ *buf++ = *--p;
+ }
+
+ *buf = '\0';
+ return buf;
+}