feat(stream)[TS-5469]. add support for window event notifications in stream processing

- Introduce new syntax to specify notification type and destination address
- Collect relevant event information during window computations
- Implement websocket-based notification delivery to the specified address
This commit is contained in:
Jinqing Kuang 2025-01-16 08:43:47 +08:00
parent f3b38edb98
commit eb5d463490
43 changed files with 1706 additions and 94 deletions

View File

@ -2,7 +2,7 @@
# addr2line
ExternalProject_Add(addr2line
GIT_REPOSITORY https://github.com/davea42/libdwarf-addr2line.git
GIT_TAG master
GIT_TAG main
SOURCE_DIR "${TD_CONTRIB_DIR}/addr2line"
BINARY_DIR "${TD_CONTRIB_DIR}/addr2line"
CONFIGURE_COMMAND ""

View File

@ -12,7 +12,7 @@ ExternalProject_Add(curl2
BUILD_IN_SOURCE TRUE
BUILD_ALWAYS 1
UPDATE_COMMAND ""
CONFIGURE_COMMAND ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug
CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./configure --prefix=$ENV{HOME}/.cos-local.2 --with-ssl=$ENV{HOME}/.cos-local.2 --enable-websockets --enable-shared=no --disable-ldap --disable-ldaps --without-brotli --without-zstd --without-libidn2 --without-nghttp2 --without-libpsl #--enable-debug
BUILD_COMMAND make -j
INSTALL_COMMAND make install
TEST_COMMAND ""

View File

@ -6,9 +6,9 @@ ExternalProject_Add(openssl
DOWNLOAD_DIR "${TD_CONTRIB_DIR}/deps-download"
SOURCE_DIR "${TD_CONTRIB_DIR}/openssl"
BUILD_IN_SOURCE TRUE
#BUILD_ALWAYS 1
#UPDATE_COMMAND ""
CONFIGURE_COMMAND ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
BUILD_ALWAYS 1
UPDATE_COMMAND ""
CONFIGURE_COMMAND ${CONTRIB_CONFIG_ENV} ./Configure --prefix=$ENV{HOME}/.cos-local.2 no-shared
BUILD_COMMAND make -j
INSTALL_COMMAND make install_sw -j
TEST_COMMAND ""

View File

@ -17,7 +17,6 @@ elseif(${BUILD_WITH_COS})
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.1/)
cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE3})
endif(${BUILD_WITH_COS})
configure_file(${CONTRIB_TMP_FILE3} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
@ -146,11 +145,16 @@ if(${BUILD_WITH_SQLITE})
cat("${TD_SUPPORT_DIR}/sqlite_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(${BUILD_WITH_SQLITE})
# libcurl
if(NOT ${TD_WINDOWS})
file(MAKE_DIRECTORY $ENV{HOME}/.cos-local.2/)
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif(NOT ${TD_WINDOWS})
# s3
if(${BUILD_WITH_S3})
cat("${TD_SUPPORT_DIR}/ssl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/xml2_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/libs3_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/azure_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_S3)
@ -160,7 +164,6 @@ elseif(${BUILD_WITH_COS})
# cat("${TD_SUPPORT_DIR}/mxml_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/apr_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/apr-util_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# cat("${TD_SUPPORT_DIR}/curl_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
cat("${TD_SUPPORT_DIR}/cos_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_COS)
endif()
@ -199,6 +202,11 @@ endif()
# lemon
cat("${TD_SUPPORT_DIR}/lemon_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
# Force specify CC=cc on MacOS. Because the default CC setting in the generated Makefile has issues finding standard library headers
IF(${TD_DARWIN})
SET(CONTRIB_CONFIG_ENV "CC=cc")
ENDIF()
# download dependencies
configure_file(${CONTRIB_TMP_FILE} "${TD_CONTRIB_DIR}/deps-download/CMakeLists.txt")
execute_process(COMMAND "${CMAKE_COMMAND}" -G "${CMAKE_GENERATOR}" .

View File

@ -160,7 +160,7 @@ typedef enum EStreamType {
STREAM_PARTITION_DELETE_DATA,
STREAM_GET_RESULT,
STREAM_DROP_CHILD_TABLE,
STREAM_EVENT_OPEN_WINDOW,
STREAM_NOTIFY_EVENT,
} EStreamType;
#pragma pack(push, 1)
@ -409,6 +409,9 @@ typedef struct STUidTagInfo {
#define UD_GROUPID_COLUMN_INDEX 1
#define UD_TAG_COLUMN_INDEX 2
// stream notify event block column
#define NOTIFY_EVENT_STR_COLUMN_INDEX 0
int32_t taosGenCrashJsonMsg(int signum, char** pMsg, int64_t clusterId, int64_t startTime);
int32_t dumpConfToDataBlock(SSDataBlock* pBlock, int32_t startCol);

View File

@ -285,6 +285,8 @@ bool isAutoTableName(char* ctbName);
int32_t buildCtbNameAddGroupId(const char* stbName, char* ctbName, uint64_t groupId, size_t cap);
int32_t buildCtbNameByGroupId(const char* stbName, uint64_t groupId, char** pName);
int32_t buildCtbNameByGroupIdImpl(const char* stbName, uint64_t groupId, char* pBuf);
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName);
int32_t trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList);

View File

@ -269,6 +269,7 @@ typedef enum ENodeType {
QUERY_NODE_TSMA_OPTIONS,
QUERY_NODE_ANOMALY_WINDOW,
QUERY_NODE_RANGE_AROUND,
QUERY_NODE_STREAM_NOTIFY_OPTIONS,
// Statement nodes are used in parser and planner module.
QUERY_NODE_SET_OPERATOR = 100,
@ -2956,6 +2957,11 @@ typedef struct {
// 3.3.0.0
SArray* pCols; // array of SField
int64_t smaId;
// 3.3.6.0
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
int8_t notifyHistory;
} SCMCreateStreamReq;
typedef struct {

View File

@ -98,6 +98,9 @@ int32_t qSetTaskId(qTaskInfo_t tinfo, uint64_t taskId, uint64_t queryId);
int32_t qSetStreamOpOpen(qTaskInfo_t tinfo);
int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
const char* stbFullName, bool newSubTableRule);
/**
* Set multiple input data blocks for the stream scan.
* @param tinfo

View File

@ -566,19 +566,44 @@ typedef struct SStreamOptions {
int64_t setFlag;
} SStreamOptions;
typedef enum EStreamNotifyOptionSetFlag {
SNOTIFY_OPT_ERROR_HANDLE_SET = BIT_FLAG_MASK(0),
SNOTIFY_OPT_NOTIFY_HISTORY_SET = BIT_FLAG_MASK(1),
} EStreamNotifyOptionSetFlag;
typedef enum EStreamNotifyEventType {
SNOTIFY_EVENT_WINDOW_OPEN = BIT_FLAG_MASK(0),
SNOTIFY_EVENT_WINDOW_CLOSE = BIT_FLAG_MASK(1),
} EStreamNotifyEventType;
typedef enum EStreamNotifyErrorHandleType {
SNOTIFY_ERROR_HANDLE_PAUSE,
SNOTIFY_ERROR_HANDLE_DROP,
} EStreamNotifyErrorHandleType;
typedef struct SStreamNotifyOptions {
ENodeType type;
SNodeList* pAddrUrls;
EStreamNotifyEventType eventTypes;
EStreamNotifyErrorHandleType errorHandle;
bool notifyHistory;
EStreamNotifyOptionSetFlag setFlag;
} SStreamNotifyOptions;
typedef struct SCreateStreamStmt {
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
char targetDbName[TSDB_DB_NAME_LEN];
char targetTabName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SStreamOptions* pOptions;
SNode* pQuery;
SNode* pPrevQuery;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pCols;
SCMCreateStreamReq* pReq;
ENodeType type;
char streamName[TSDB_TABLE_NAME_LEN];
char targetDbName[TSDB_DB_NAME_LEN];
char targetTabName[TSDB_TABLE_NAME_LEN];
bool ignoreExists;
SStreamOptions* pOptions;
SNode* pQuery;
SNode* pPrevQuery;
SNodeList* pTags;
SNode* pSubtable;
SNodeList* pCols;
SStreamNotifyOptions* pNotifyOptions;
SCMCreateStreamReq* pReq;
} SCreateStreamStmt;
typedef struct SDropStreamStmt {

View File

@ -65,10 +65,14 @@ typedef struct SStreamTaskSM SStreamTaskSM;
typedef struct SStreamQueueItem SStreamQueueItem;
typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
#define SSTREAM_TASK_VER 4
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
#define SSTREAM_TASK_VER 5
#define SSTREAM_TASK_INCOMPATIBLE_VER 1
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 // Append subtable name with groupId
#define SSTREAM_TASK_APPEND_STABLE_NAME_VER 4 // Append subtable name with stableName and groupId
#define SSTREAM_TASK_ADD_NOTIFY_VER 5 // Support event notification at window open/close
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
extern int32_t streamMetaRefPool;
extern int32_t streamTaskRefPool;
@ -427,6 +431,15 @@ typedef struct STaskCheckInfo {
TdThreadMutex checkInfoLock;
} STaskCheckInfo;
typedef struct SNotifyInfo {
SArray* pNotifyAddrUrls;
int32_t notifyEventTypes;
int32_t notifyErrorHandle;
char* streamName;
char* stbFullName;
SSchemaWrapper* pSchemaWrapper;
} SNotifyInfo;
struct SStreamTask {
int64_t ver;
SStreamTaskId id;
@ -449,6 +462,7 @@ struct SStreamTask {
SStreamState* pState; // state backend
SUpstreamInfo upstreamInfo;
STaskCheckInfo taskCheckInfo;
SNotifyInfo notifyInfo;
// the followings attributes don't be serialized
SScanhistorySchedInfo schedHistoryInfo;

View File

@ -245,6 +245,7 @@ typedef enum ELogicConditionType {
#define TSDB_OFFSET_LEN 64 // it is a null-terminated string
#define TSDB_USER_CGROUP_LEN (TSDB_USER_LEN + TSDB_CGROUP_LEN) // it is a null-terminated string
#define TSDB_STREAM_NAME_LEN 193 // it is a null-terminated string
#define TSDB_STREAM_NOTIFY_URL_LEN 128 // it includes the terminating '\0'
#define TSDB_DB_NAME_LEN 65
#define TSDB_DB_FNAME_LEN (TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN)
#define TSDB_PRIVILEDGE_CONDITION_LEN 48 * 1024

View File

@ -79,6 +79,9 @@ void taosResetLog();
void taosDumpData(uint8_t *msg, int32_t len);
void taosSetNoNewFile();
// Fast uint64_t to string conversion, equivalent to sprintf(buf, "%lu", val) but with 10x better performance.
char *u64toaFastLut(uint64_t val, char *buf);
void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...)
#ifdef __GNUC__
__attribute__((format(printf, 4, 5)))

View File

@ -54,6 +54,23 @@ target_link_libraries(
INTERFACE api
)
if(NOT ${TD_WINDOWS})
target_include_directories(
common
PUBLIC "$ENV{HOME}/.cos-local.2/include"
)
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
target_link_libraries(
common
PUBLIC ${CURL_LIBRARY}
PUBLIC ${SSL_LIBRARY}
PUBLIC ${CRYPTO_LIBRARY}
)
endif()
if(${BUILD_S3})
if(${BUILD_WITH_S3})
target_include_directories(
@ -65,9 +82,6 @@ if(${BUILD_S3})
set(CMAKE_FIND_LIBRARY_SUFFIXES ".a")
set(CMAKE_PREFIX_PATH $ENV{HOME}/.cos-local.2)
find_library(S3_LIBRARY s3)
find_library(CURL_LIBRARY curl $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(SSL_LIBRARY ssl $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
find_library(CRYPTO_LIBRARY crypto $ENV{HOME}/.cos-local.2/lib64 $ENV{HOME}/.cos-local.2/lib NO_DEFAULT_PATH)
target_link_libraries(
common
@ -87,7 +101,6 @@ if(${BUILD_S3})
find_library(APR_LIBRARY apr-1 PATHS /usr/local/apr/lib/)
find_library(APR_UTIL_LIBRARY aprutil-1 PATHS /usr/local/apr/lib/)
find_library(MINIXML_LIBRARY mxml)
find_library(CURL_LIBRARY curl)
target_link_libraries(
common

View File

@ -9959,6 +9959,16 @@ int32_t tSerializeSCMCreateStreamReq(void *buf, int32_t bufLen, const SCMCreateS
}
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->smaId));
int32_t addrSize = taosArrayGetSize(pReq->pNotifyAddrUrls);
TAOS_CHECK_EXIT(tEncodeI32(&encoder, addrSize));
for (int32_t i = 0; i < addrSize; ++i) {
const char *url = taosArrayGetP(pReq->pNotifyAddrUrls, i);
TAOS_CHECK_EXIT((tEncodeCStr(&encoder, url)));
}
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyEventTypes));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->notifyErrorHandle));
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->notifyHistory));
tEndEncode(&encoder);
_exit:
@ -10093,6 +10103,30 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->smaId));
}
if (!tDecodeIsEnd(&decoder)) {
int32_t addrSize = 0;
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &addrSize));
pReq->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
if (pReq->pNotifyAddrUrls == NULL) {
TAOS_CHECK_EXIT(terrno);
}
for (int32_t i = 0; i < addrSize; ++i) {
char *url = NULL;
TAOS_CHECK_EXIT(tDecodeCStr(&decoder, &url));
url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
if (url == NULL) {
TAOS_CHECK_EXIT(terrno);
}
if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
taosMemoryFree(url);
TAOS_CHECK_EXIT(terrno);
}
}
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyEventTypes));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->notifyErrorHandle));
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->notifyHistory));
}
tEndDecode(&decoder);
_exit:
tDecoderClear(&decoder);
@ -10155,6 +10189,7 @@ void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
taosArrayDestroy(pReq->fillNullCols);
taosArrayDestroy(pReq->pVgroupVerList);
taosArrayDestroy(pReq->pCols);
taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
}
int32_t tEncodeSRSmaParam(SEncoder *pCoder, const SRSmaParam *pRSmaParam) {

View File

@ -3061,6 +3061,33 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return code;
}
int32_t buildSinkDestTableName(char* parTbName, const char* stbFullName, uint64_t gid, bool newSubTableRule,
char** dstTableName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (parTbName[0]) {
if (newSubTableRule && !isAutoTableName(parTbName) && !alreadyAddGroupId(parTbName, gid) && gid != 0 &&
stbFullName) {
*dstTableName = taosMemoryCalloc(1, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
tstrncpy(*dstTableName, parTbName, TSDB_TABLE_NAME_LEN);
code = buildCtbNameAddGroupId(stbFullName, *dstTableName, gid, TSDB_TABLE_NAME_LEN);
TSDB_CHECK_CODE(code, lino, _end);
} else {
*dstTableName = taosStrdup(parTbName);
TSDB_CHECK_NULL(*dstTableName, code, lino, _end, terrno);
}
} else {
code = buildCtbNameByGroupId(stbFullName, gid, dstTableName);
TSDB_CHECK_CODE(code, lino, _end);
}
_end:
return code;
}
// return length of encoded data, return -1 if failed
int32_t blockEncode(const SSDataBlock* pBlock, char* data, size_t dataBuflen, int32_t numOfCols) {
int32_t code = blockDataCheck(pBlock);

View File

@ -753,6 +753,77 @@ static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
return TSDB_CODE_SUCCESS;
}
static void *notifyAddrDup(void *p) { return taosStrdup((char *)p); }
static int32_t addStreamTaskNotifyInfo(const SCMCreateStreamReq *createReq, const SStreamObj *pStream,
SStreamTask *pTask) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
pTask->notifyInfo.pNotifyAddrUrls = taosArrayDup(createReq->pNotifyAddrUrls, notifyAddrDup);
TSDB_CHECK_NULL(pTask->notifyInfo.pNotifyAddrUrls, code, lino, _end, terrno);
pTask->notifyInfo.notifyEventTypes = createReq->notifyEventTypes;
pTask->notifyInfo.notifyErrorHandle = createReq->notifyErrorHandle;
pTask->notifyInfo.streamName = taosStrdup(createReq->name);
TSDB_CHECK_NULL(pTask->notifyInfo.streamName, code, lino, _end, terrno);
pTask->notifyInfo.stbFullName = taosStrdup(createReq->targetStbFullName);
TSDB_CHECK_NULL(pTask->notifyInfo.stbFullName, code, lino, _end, terrno);
pTask->notifyInfo.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema);
TSDB_CHECK_NULL(pTask->notifyInfo.pSchemaWrapper, code, lino, _end, terrno);
_end:
if (code != TSDB_CODE_SUCCESS) {
mError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t addStreamNotifyInfo(SCMCreateStreamReq *createReq, SStreamObj *pStream) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t level = 0;
int32_t nTasks = 0;
SArray *pLevel = NULL;
TSDB_CHECK_NULL(createReq, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pStream, code, lino, _end, TSDB_CODE_INVALID_PARA);
if (taosArrayGetSize(createReq->pNotifyAddrUrls) == 0) {
goto _end;
}
level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; ++i) {
pLevel = taosArrayGetP(pStream->tasks, i);
nTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < nTasks; ++j) {
code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
TSDB_CHECK_CODE(code, lino, _end);
}
}
if (pStream->conf.fillHistory && createReq->notifyHistory) {
level = taosArrayGetSize(pStream->pHTasksList);
for (int32_t i = 0; i < level; ++i) {
pLevel = taosArrayGetP(pStream->pHTasksList, i);
nTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < nTasks; ++j) {
code = addStreamTaskNotifyInfo(createReq, pStream, taosArrayGetP(pLevel, j));
TSDB_CHECK_CODE(code, lino, _end);
}
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
mError("%s for stream %s failed at line %d since %s", __func__, pStream->name, lino, tstrerror(code));
}
return code;
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
@ -850,6 +921,14 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
// add notify info into all stream tasks
code = addStreamNotifyInfo(&createReq, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
mError("stream:%s failed to add stream notify info since %s", createReq.name, tstrerror(code));
mndTransDrop(pTrans);
goto _OVER;
}
// add stream to trans
code = mndPersistStream(pTrans, &streamObj);
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -75,6 +75,7 @@ set(
"src/tq/tqSnapshot.c"
"src/tq/tqStreamStateSnap.c"
"src/tq/tqStreamTaskSnap.c"
"src/tq/tqStreamNotify.c"
)
aux_source_directory("src/tsdb/" TSDB_SOURCE_FILES)

View File

@ -159,6 +159,11 @@ int32_t buildAutoCreateTableReq(const char* stbFullName, int64_t suid, int32_t n
SArray* pTagArray, bool newSubTableRule, SVCreateTbReq** pReq);
int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, void** pRefBlock, int32_t type);
// tq send notifications
int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap);
int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode);
#define TQ_ERR_GO_TO_END(c) \
do { \
code = c; \

View File

@ -81,6 +81,8 @@ typedef struct SCommitInfo SCommitInfo;
typedef struct SCompactInfo SCompactInfo;
typedef struct SQueryNode SQueryNode;
typedef struct SStreamNotifyHandleMap SStreamNotifyHandleMap;
#define VNODE_META_TMP_DIR "meta.tmp"
#define VNODE_META_BACKUP_DIR "meta.backup"
@ -496,6 +498,9 @@ struct SVnode {
int64_t blockSeq;
SQHandle* pQuery;
SVMonitorObj monitor;
// Notification Handles
SStreamNotifyHandleMap* pNotifyHandleMap;
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)

View File

@ -16,8 +16,6 @@
#include "tcommon.h"
#include "tq.h"
#define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1))
typedef struct STableSinkInfo {
uint64_t uid;
tstr name;
@ -983,7 +981,7 @@ int32_t setDstTableDataUid(SVnode* pVnode, SStreamTask* pTask, SSDataBlock* pDat
tqDebug("s-task:%s append groupId:%" PRId64 " for generated dstTable:%s", id, groupId, dstTableName);
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
code = buildCtbNameAddGroupId(NULL, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
} else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER && stbFullName) {
} else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER && stbFullName) {
code = buildCtbNameAddGroupId(stbFullName, dstTableName, groupId, sizeof(pDataBlock->info.parTbName));
}
if (code != TSDB_CODE_SUCCESS) {
@ -1150,6 +1148,12 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
return;
}
code = tqSendAllNotifyEvents(pBlocks, pTask, pVnode);
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId: %d, s-task:%s failed to send all event notifications", vgId, id);
// continue processing even if notification fails
}
bool onlySubmitData = hasOnlySubmitData(pBlocks, numOfBlocks);
if (!onlySubmitData || pTask->subtableWithoutMd5 == 1) {
tqDebug("vgId:%d, s-task:%s write %d stream resBlock(s) into table, has delete block, submit one-by-one", vgId, id,
@ -1173,6 +1177,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
continue;
} else if (pDataBlock->info.type == STREAM_DROP_CHILD_TABLE && pTask->subtableWithoutMd5) {
code = doBuildAndSendDropTableMsg(pVnode, stbFullName, pDataBlock, pTask, suid);
} else if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
continue;
} else {
code = handleResultBlockMsg(pTask, pDataBlock, i, pVnode, earlyTs);
}
@ -1317,6 +1323,10 @@ void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVno
continue;
}
if (pDataBlock->info.type == STREAM_NOTIFY_EVENT) {
continue;
}
hasSubmit = true;
pTask->execInfo.sink.numOfBlocks += 1;
uint64_t groupId = pDataBlock->info.id.groupId;

View File

@ -0,0 +1,445 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "cmdnodes.h"
#include "tq.h"
#ifndef WINDOWS
#include "curl/curl.h"
#endif
#define STREAM_EVENT_NOTIFY_RETRY_MS 50 // 50ms
typedef struct SStreamNotifyHandle {
TdThreadMutex mutex;
#ifndef WINDOWS
CURL* curl;
#endif
char* url;
} SStreamNotifyHandle;
struct SStreamNotifyHandleMap {
TdThreadMutex gMutex;
SHashObj* handleMap;
};
static void stopStreamNotifyConn(SStreamNotifyHandle* pHandle) {
#ifndef WINDOWS
if (pHandle == NULL || pHandle->curl == NULL) {
return;
}
// status code 1000 means normal closure
size_t len = 0;
uint16_t status = htons(1000);
CURLcode res = curl_ws_send(pHandle->curl, &status, sizeof(status), &len, 0, CURLWS_CLOSE);
if (res != CURLE_OK) {
tqWarn("failed to send ws-close msg to %s for %d", pHandle->url ? pHandle->url : "", res);
}
// TODO: add wait mechanism for peer connection close response
curl_easy_cleanup(pHandle->curl);
#endif
}
static void destroyStreamNotifyHandle(void* ptr) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamNotifyHandle** ppHandle = ptr;
if (ppHandle == NULL || *ppHandle == NULL) {
return;
}
code = taosThreadMutexDestroy(&(*ppHandle)->mutex);
stopStreamNotifyConn(*ppHandle);
taosMemoryFreeClear((*ppHandle)->url);
taosMemoryFreeClear(*ppHandle);
}
static void releaseStreamNotifyHandle(SStreamNotifyHandle** ppHandle) {
if (ppHandle == NULL || *ppHandle == NULL) {
return;
}
(void)taosThreadMutexUnlock(&(*ppHandle)->mutex);
*ppHandle = NULL;
}
static int32_t acquireStreamNotifyHandle(SStreamNotifyHandleMap* pMap, const char* url,
SStreamNotifyHandle** ppHandle) {
#ifndef WINDOWS
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
bool gLocked = false;
SStreamNotifyHandle** ppFindHandle = NULL;
SStreamNotifyHandle* pNewHandle = NULL;
CURL* newCurl = NULL;
CURLcode res = CURLE_OK;
TSDB_CHECK_NULL(pMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(url, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(ppHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
*ppHandle = NULL;
code = taosThreadMutexLock(&pMap->gMutex);
TSDB_CHECK_CODE(code, lino, _end);
gLocked = true;
ppFindHandle = taosHashGet(pMap->handleMap, url, strlen(url));
if (ppFindHandle == NULL) {
pNewHandle = taosMemoryCalloc(1, sizeof(SStreamNotifyHandle));
TSDB_CHECK_NULL(pNewHandle, code, lino, _end, terrno);
code = taosThreadMutexInit(&pNewHandle->mutex, NULL);
TSDB_CHECK_CODE(code, lino, _end);
code = taosHashPut(pMap->handleMap, url, strlen(url), &pNewHandle, POINTER_BYTES);
TSDB_CHECK_CODE(code, lino, _end);
*ppHandle = pNewHandle;
pNewHandle = NULL;
} else {
*ppHandle = *ppFindHandle;
}
code = taosThreadMutexLock(&(*ppHandle)->mutex);
TSDB_CHECK_CODE(code, lino, _end);
(void)taosThreadMutexUnlock(&pMap->gMutex);
gLocked = false;
if ((*ppHandle)->curl == NULL) {
newCurl = curl_easy_init();
TSDB_CHECK_NULL(newCurl, code, lino, _end, TSDB_CODE_FAILED);
res = curl_easy_setopt(newCurl, CURLOPT_URL, url);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYPEER, 0L);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
res = curl_easy_setopt(newCurl, CURLOPT_SSL_VERIFYHOST, 0L);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
res = curl_easy_setopt(newCurl, CURLOPT_TIMEOUT, 3L);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
res = curl_easy_setopt(newCurl, CURLOPT_CONNECT_ONLY, 2L);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
res = curl_easy_perform(newCurl);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
(*ppHandle)->curl = newCurl;
newCurl = NULL;
}
if ((*ppHandle)->url == NULL) {
(*ppHandle)->url = taosStrdup(url);
TSDB_CHECK_NULL((*ppHandle)->url, code, lino, _end, terrno);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
if (*ppHandle) {
releaseStreamNotifyHandle(ppHandle);
}
*ppHandle = NULL;
}
if (newCurl) {
curl_easy_cleanup(newCurl);
}
if (pNewHandle) {
destroyStreamNotifyHandle(&pNewHandle);
}
if (gLocked) {
(void)taosThreadMutexUnlock(&pMap->gMutex);
}
return code;
#else
tqError("stream notify events is not supported on windows");
return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
#endif
}
int32_t tqInitNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamNotifyHandleMap* pMap = NULL;
TSDB_CHECK_NULL(ppMap, code, lino, _end, TSDB_CODE_INVALID_PARA);
*ppMap = NULL;
pMap = taosMemoryCalloc(1, sizeof(SStreamNotifyHandleMap));
TSDB_CHECK_NULL(pMap, code, lino, _end, terrno);
code = taosThreadMutexInit(&pMap->gMutex, NULL);
TSDB_CHECK_CODE(code, lino, _end);
pMap->handleMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
TSDB_CHECK_NULL(pMap->handleMap, code, lino, _end, terrno);
taosHashSetFreeFp(pMap->handleMap, destroyStreamNotifyHandle);
*ppMap = pMap;
pMap = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (pMap != NULL) {
tqDestroyNotifyHandleMap(&pMap);
}
return code;
}
void tqDestroyNotifyHandleMap(SStreamNotifyHandleMap** ppMap) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (*ppMap == NULL) {
return;
}
taosHashCleanup((*ppMap)->handleMap);
code = taosThreadMutexDestroy(&(*ppMap)->gMutex);
taosMemoryFreeClear((*ppMap));
}
#define JSON_CHECK_ADD_ITEM(obj, str, item) \
TSDB_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
static int32_t getStreamNotifyEventHeader(const char* streamName, char** pHeader) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON* obj = NULL;
cJSON* streams = NULL;
cJSON* stream = NULL;
char msgId[37];
TSDB_CHECK_NULL(streamName, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pHeader, code, lino, _end, TSDB_CODE_INVALID_PARA);
*pHeader = NULL;
code = taosGetSystemUUIDLimit36(msgId, sizeof(msgId));
TSDB_CHECK_CODE(code, lino, _end);
stream = cJSON_CreateObject();
TSDB_CHECK_NULL(stream, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
JSON_CHECK_ADD_ITEM(stream, "streamName", cJSON_CreateStringReference(streamName));
JSON_CHECK_ADD_ITEM(stream, "events", cJSON_CreateArray());
streams = cJSON_CreateArray();
TSDB_CHECK_CONDITION(cJSON_AddItemToArray(streams, stream), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
stream = NULL;
obj = cJSON_CreateObject();
TSDB_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
JSON_CHECK_ADD_ITEM(obj, "messageId", cJSON_CreateStringReference(msgId));
JSON_CHECK_ADD_ITEM(obj, "timestamp", cJSON_CreateNumber(taosGetTimestampMs()));
JSON_CHECK_ADD_ITEM(obj, "streams", streams);
streams = NULL;
*pHeader = cJSON_PrintUnformatted(obj);
TSDB_CHECK_NULL(*pHeader, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
_end:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (stream != NULL) {
cJSON_Delete(stream);
}
if (streams != NULL) {
cJSON_Delete(streams);
}
if (obj != NULL) {
cJSON_Delete(obj);
}
return code;
}
static int32_t packupStreamNotifyEvent(const char* streamName, const SArray* pBlocks, char** pMsg,
int32_t* nNotifyEvents) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t numOfBlocks = 0;
int32_t msgHeaderLen = 0;
int32_t msgTailLen = 0;
int32_t msgLen = 0;
char* msgHeader = NULL;
const char* msgTail = "]}]}";
char* msg = NULL;
TSDB_CHECK_NULL(pMsg, code, lino, _end, TSDB_CODE_INVALID_PARA);
*pMsg = NULL;
numOfBlocks = taosArrayGetSize(pBlocks);
*nNotifyEvents = 0;
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
continue;
}
SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
char* val = colDataGetVarData(pEventStrCol, j);
msgLen += varDataLen(val) + 1;
}
*nNotifyEvents += pDataBlock->info.rows;
}
if (msgLen == 0) {
// skip since no notification events found
goto _end;
}
code = getStreamNotifyEventHeader(streamName, &msgHeader);
TSDB_CHECK_CODE(code, lino, _end);
msgHeaderLen = strlen(msgHeader);
msgTailLen = strlen(msgTail);
msgLen += msgHeaderLen;
msg = taosMemoryMalloc(msgLen);
TSDB_CHECK_NULL(msg, code, lino, _end, terrno);
char* p = msg;
TAOS_STRNCPY(p, msgHeader, msgHeaderLen);
p += msgHeaderLen - msgTailLen;
for (int32_t i = 0; i < numOfBlocks; ++i) {
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock == NULL || pDataBlock->info.type != STREAM_NOTIFY_EVENT) {
continue;
}
SColumnInfoData* pEventStrCol = taosArrayGet(pDataBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
for (int32_t j = 0; j < pDataBlock->info.rows; ++j) {
char* val = colDataGetVarData(pEventStrCol, j);
TAOS_STRNCPY(p, varDataVal(val), varDataLen(val));
p += varDataLen(val);
*(p++) = ',';
}
}
p -= 1;
TAOS_STRNCPY(p, msgTail, msgTailLen);
*(p + msgTailLen) = '\0';
*pMsg = msg;
msg = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (msgHeader != NULL) {
cJSON_free(msgHeader);
}
if (msg != NULL) {
taosMemoryFreeClear(msg);
}
return code;
}
static int32_t sendSingleStreamNotify(SStreamNotifyHandle* pHandle, char* msg) {
#ifndef WINDOWS
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
CURLcode res = CURLE_OK;
uint64_t sentLen = 0;
uint64_t totalLen = 0;
size_t nbytes = 0;
TSDB_CHECK_NULL(pHandle, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pHandle->curl, code, lino, _end, TSDB_CODE_INVALID_PARA);
totalLen = strlen(msg);
while (sentLen < totalLen) {
res = curl_ws_send(pHandle->curl, msg + sentLen, totalLen - sentLen, &nbytes, 0, CURLWS_TEXT);
TSDB_CHECK_CONDITION(res == CURLE_OK, code, lino, _end, TSDB_CODE_FAILED);
sentLen += nbytes;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at line %d since %d, %s", __func__, lino, res, tstrerror(code));
stopStreamNotifyConn(pHandle);
}
return code;
#else
tqError("stream notify events is not supported on windows");
return TSDB_CODE_NOT_SUPPORTTED_IN_WINDOWS;
#endif
}
int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* pVnode) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
char* msg = NULL;
int32_t nNotifyAddr = 0;
int32_t nNotifyEvents = 0;
SStreamNotifyHandle* pHandle = NULL;
TSDB_CHECK_NULL(pTask, code, lino, _end, TSDB_CODE_INVALID_PARA);
TSDB_CHECK_NULL(pVnode, code, lino, _end, TSDB_CODE_INVALID_PARA);
nNotifyAddr = taosArrayGetSize(pTask->notifyInfo.pNotifyAddrUrls);
if (nNotifyAddr == 0) {
goto _end;
}
code = packupStreamNotifyEvent(pTask->notifyInfo.streamName, pBlocks, &msg, &nNotifyEvents);
TSDB_CHECK_CODE(code, lino, _end);
if (msg == NULL) {
goto _end;
}
tqDebug("stream task %s prepare to send %d notify events, total msg length: %" PRIu64, pTask->notifyInfo.streamName,
nNotifyEvents, (uint64_t)strlen(msg));
for (int32_t i = 0; i < nNotifyAddr; ++i) {
if (streamTaskShouldStop(pTask)) {
break;
}
const char* url = taosArrayGetP(pTask->notifyInfo.pNotifyAddrUrls, i);
code = acquireStreamNotifyHandle(pVnode->pNotifyHandleMap, url, &pHandle);
if (code != TSDB_CODE_SUCCESS) {
tqError("failed to get stream notify handle of %s", url);
if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
// retry for event message sending in PAUSE error handling mode
taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
--i;
continue;
} else {
// simply ignore the failure in DROP error handling mode
code = TSDB_CODE_SUCCESS;
continue;
}
}
code = sendSingleStreamNotify(pHandle, msg);
if (code != TSDB_CODE_SUCCESS) {
tqError("failed to send stream notify handle to %s since %s", url, tstrerror(code));
if (pTask->notifyInfo.notifyErrorHandle == SNOTIFY_ERROR_HANDLE_PAUSE) {
// retry for event message sending in PAUSE error handling mode
taosMsleep(STREAM_EVENT_NOTIFY_RETRY_MS);
--i;
} else {
// simply ignore the failure in DROP error handling mode
code = TSDB_CODE_SUCCESS;
}
} else {
tqDebug("stream task %s send %d notify events to %s successfully", pTask->notifyInfo.streamName, nNotifyEvents,
url);
}
releaseStreamNotifyHandle(&pHandle);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
tqError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (msg) {
taosMemoryFreeClear(msg);
}
return code;
}

View File

@ -86,6 +86,14 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) {
if (code) {
return code;
}
code =
qSetStreamNotifyInfo(pTask->exec.pExecutor, pTask->notifyInfo.notifyEventTypes,
pTask->notifyInfo.pSchemaWrapper, pTask->notifyInfo.stbFullName, IS_NEW_SUBTB_RULE(pTask));
if (code) {
tqError("s-task:%s failed to set stream notify info, code:%s", pTask->id.idStr, tstrerror(code));
return code;
}
}
streamSetupScheduleTrigger(pTask);
@ -1357,4 +1365,4 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask);
return 0;
}
}

View File

@ -15,6 +15,7 @@
#include "sync.h"
#include "tcs.h"
#include "tq.h"
#include "tsdb.h"
#include "vnd.h"
@ -483,6 +484,14 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
ret = taosRealPath(tdir, NULL, sizeof(tdir));
TAOS_UNUSED(ret);
// init handle map for stream event notification
ret = tqInitNotifyHandleMap(&pVnode->pNotifyHandleMap);
if (ret != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to init StreamNotifyHandleMap", TD_VID(pVnode));
terrno = ret;
goto _err;
}
// open query
vInfo("vgId:%d, start to open vnode query", TD_VID(pVnode));
if (vnodeQueryOpen(pVnode)) {
@ -555,6 +564,7 @@ void vnodeClose(SVnode *pVnode) {
vnodeAWait(&pVnode->commitTask);
vnodeSyncClose(pVnode);
vnodeQueryClose(pVnode);
tqDestroyNotifyHandleMap(&pVnode->pNotifyHandleMap);
tqClose(pVnode->pTq);
walClose(pVnode->pWal);
if (pVnode->pTsdb) tsdbClose(&pVnode->pTsdb);

View File

@ -449,11 +449,17 @@ typedef struct STimeWindowAggSupp {
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
} STimeWindowAggSupp;
typedef struct SStreamNotifyEventSupp {
SArray* pWindowEvents; // Array of SStreamNotifyEvent, storing window events and trigger values.
SHashObj* pTableNameHashMap; // Hash map from groupid to the dest child table name.
SHashObj* pResultHashMap; // Hash map from groupid+skey to the window agg result.
SSDataBlock* pEventBlock; // The datablock contains all window events and results.
} SStreamNotifyEventSupp;
typedef struct SSteamOpBasicInfo {
int32_t primaryPkIndex;
bool updateOperatorInfo;
SSDataBlock* pEventRes;
SArray* pEventInfo;
int32_t primaryPkIndex;
bool updateOperatorInfo;
SStreamNotifyEventSupp windowEventSup;
} SSteamOpBasicInfo;
typedef struct SStreamFillSupporter {
@ -769,6 +775,8 @@ typedef struct SStreamEventAggOperatorInfo {
SSHashObj* pPkDeleted;
bool destHasPrimaryKey;
struct SOperatorInfo* pOperator;
SNodeList* pStartCondCols;
SNodeList* pEndCondCols;
} SStreamEventAggOperatorInfo;
typedef struct SStreamCountAggOperatorInfo {

View File

@ -71,6 +71,10 @@ typedef struct {
SVersionRange fillHistoryVer;
STimeWindow fillHistoryWindow;
SStreamState* pState;
int32_t eventTypes; // event types to notify
SSchemaWrapper* notifyResultSchema; // agg result to notify
char* stbFullName; // used to generate dest child table name
bool newSubTableRule; // used to generate dest child table name
} SStreamTaskInfo;
struct SExecTaskInfo {

View File

@ -19,7 +19,10 @@
extern "C" {
#endif
#include "cJSON.h"
#include "cmdnodes.h"
#include "executorInt.h"
#include "querytask.h"
#include "tutil.h"
#define FILL_POS_INVALID 0
@ -107,6 +110,13 @@ int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY
int32_t initOffsetInfo(int32_t** ppOffset, SSDataBlock* pRes);
TSKEY compareTs(void* pKey);
int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
SStreamNotifyEventSupp* sup);
int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper,
SStreamNotifyEventSupp* sup);
int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup);
#ifdef __cplusplus
}
#endif

View File

@ -250,6 +250,28 @@ int32_t qSetStreamOpOpen(qTaskInfo_t tinfo) {
return code;
}
int32_t qSetStreamNotifyInfo(qTaskInfo_t tinfo, int32_t eventTypes, const SSchemaWrapper* pSchemaWrapper,
const char* stbFullName, bool newSubTableRule) {
int32_t code = TSDB_CODE_SUCCESS;
SStreamTaskInfo *pStreamInfo = NULL;
if (tinfo == 0 || eventTypes == 0 || pSchemaWrapper == NULL || stbFullName == NULL) {
goto _end;
}
pStreamInfo = &((SExecTaskInfo*)tinfo)->streamInfo;
pStreamInfo->eventTypes = eventTypes;
pStreamInfo->notifyResultSchema = tCloneSSchemaWrapper(pSchemaWrapper);
if (pStreamInfo->notifyResultSchema == NULL) {
code = terrno;
}
pStreamInfo->stbFullName = taosStrdup(stbFullName);
pStreamInfo->newSubTableRule = newSubTableRule;
_end:
return code;
}
int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, int32_t type) {
if (tinfo == NULL) {
return TSDB_CODE_APP_ERROR;

View File

@ -262,6 +262,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
static void cleanupStreamInfo(SStreamTaskInfo* pStreamInfo) {
tDeleteSchemaWrapper(pStreamInfo->schema);
tOffsetDestroy(&pStreamInfo->currentOffset);
tDeleteSchemaWrapper(pStreamInfo->notifyResultSchema);
taosMemoryFree(pStreamInfo->stbFullName);
}
static void freeBlock(void* pParam) {

View File

@ -93,6 +93,16 @@ void destroyStreamEventOperatorInfo(void* param) {
pInfo->pEndCondInfo = NULL;
}
if (pInfo->pStartCondCols != NULL) {
nodesDestroyList(pInfo->pStartCondCols);
pInfo->pStartCondCols = NULL;
}
if (pInfo->pEndCondCols != NULL) {
nodesDestroyList(pInfo->pEndCondCols);
pInfo->pEndCondCols = NULL;
}
taosMemoryFreeClear(param);
}
@ -310,14 +320,6 @@ void doDeleteEventWindow(SStreamAggSupporter* pAggSup, SSHashObj* pSeUpdated, SS
removeSessionResult(pAggSup, pSeUpdated, pAggSup->pResultRows, pKey);
}
static int32_t setEventData(SSteamOpBasicInfo* pBasicInfo, SSessionKey* pWinKey) {
void* pRes = taosArrayPush(pBasicInfo->pEventInfo, pWinKey);
if (pRes != NULL) {
return TSDB_CODE_SUCCESS;
}
return terrno;
}
static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, SSHashObj* pSeUpdated,
SSHashObj* pStDeleted) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -393,8 +395,10 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
&nextWinKey, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) && winCode != TSDB_CODE_SUCCESS) {
code = setEventData(&pInfo->basic, &curWin.winInfo.sessionWin);
if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_OPEN) &&
*(bool*)colDataGetNumData(pColStart, i) && winCode != TSDB_CODE_SUCCESS) {
code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_OPEN, &curWin.winInfo.sessionWin, pSDataBlock,
pInfo->pStartCondCols, i, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}
@ -464,6 +468,12 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
QUERY_CHECK_CODE(code, lino, _end);
}
if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
code = addEventAggNotifyEvent(SNOTIFY_EVENT_WINDOW_CLOSE, &curWin.winInfo.sessionWin, pSDataBlock,
pInfo->pEndCondCols, i + winRows - 1, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}
}
_end:
@ -582,42 +592,13 @@ void doStreamEventSaveCheckpoint(SOperatorInfo* pOperator) {
}
}
static void buildEventNotifyResult(SSteamOpBasicInfo* pBasicInfo) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
blockDataCleanup(pBasicInfo->pEventRes);
int32_t size = taosArrayGetSize(pBasicInfo->pEventInfo);
code = blockDataEnsureCapacity(pBasicInfo->pEventRes, size);
QUERY_CHECK_CODE(code, lino, _end);
for (int32_t i = 0; i < size; i++) {
SSessionKey* pKey = taosArrayGet(pBasicInfo->pEventInfo, i);
uint64_t uid = 0;
code = appendDataToSpecialBlock(pBasicInfo->pEventRes, &pKey->win.skey, &pKey->win.ekey, &uid, &pKey->groupId, NULL);
QUERY_CHECK_CODE(code, lino, _end);
}
taosArrayClear(pBasicInfo->pEventInfo);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s.", __func__, lino, tstrerror(code));
}
}
static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SStreamEventAggOperatorInfo* pInfo = pOperator->info;
SOptrBasicInfo* pBInfo = &pInfo->binfo;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
buildEventNotifyResult(&pInfo->basic);
if (pInfo->basic.pEventRes->info.rows > 0) {
printDataBlock(pInfo->basic.pEventRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->basic.pEventRes;
return code;
}
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
@ -628,10 +609,27 @@ static int32_t buildEventResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
doBuildSessionResult(pOperator, pInfo->streamAggSup.pState, &pInfo->groupResInfo, pBInfo->pRes);
if (pBInfo->pRes->info.rows > 0) {
printDataBlock(pBInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
if (BIT_FLAG_TEST_MASK(pTaskInfo->streamInfo.eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE)) {
code = addAggResultNotifyEvent(pBInfo->pRes, pTaskInfo->streamInfo.notifyResultSchema, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
}
(*ppRes) = pBInfo->pRes;
return code;
}
code = buildNotifyEventBlock(pTaskInfo, &pInfo->basic.windowEventSup);
QUERY_CHECK_CODE(code, lino, _end);
if (pInfo->basic.windowEventSup.pEventBlock->info.rows > 0) {
printDataBlock(pInfo->basic.windowEventSup.pEventBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
(*ppRes) = pInfo->basic.windowEventSup.pEventBlock;
return code;
}
_end:
(*ppRes) = NULL;
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
}
return code;
}
@ -1041,6 +1039,12 @@ int32_t createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0);
QUERY_CHECK_CODE(code, lino, _error);
code =
nodesCollectColumnsFromNode((SNode*)pEventNode->pStartCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pStartCondCols);
QUERY_CHECK_CODE(code, lino, _error);
code = nodesCollectColumnsFromNode((SNode*)pEventNode->pEndCond, NULL, COLLECT_COL_TYPE_ALL, &pInfo->pEndCondCols);
QUERY_CHECK_CODE(code, lino, _error);
*pOptrInfo = pOperator;
return TSDB_CODE_SUCCESS;

View File

@ -13,9 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamexecutorInt.h"
#include "executorInt.h"
#include "tdatablock.h"
#define NOTIFY_EVENT_NAME_CACHE_LIMIT_MB 16
typedef struct SStreamNotifyEvent {
uint64_t gid;
TSKEY skey;
char* content;
bool isEnd;
} SStreamNotifyEvent;
void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type) {
if (type != STREAM_GET_ALL && type != STREAM_CHECKPOINT) {
pBasicInfo->updateOperatorInfo = true;
@ -30,19 +41,509 @@ void saveStreamOperatorStateComplete(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->updateOperatorInfo = false;
}
static void destroyStreamWindowEvent(void* ptr) {
SStreamNotifyEvent* pEvent = ptr;
if (pEvent == NULL || pEvent->content == NULL) return;
cJSON_free(pEvent->content);
}
static void destroyStreamNotifyEventSupp(SStreamNotifyEventSupp* sup) {
if (sup == NULL) return;
taosArrayDestroyEx(sup->pWindowEvents, destroyStreamWindowEvent);
taosHashCleanup(sup->pTableNameHashMap);
taosHashCleanup(sup->pResultHashMap);
blockDataDestroy(sup->pEventBlock);
*sup = (SStreamNotifyEventSupp){0};
}
static int32_t initStreamNotifyEventSupp(SStreamNotifyEventSupp *sup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSDataBlock* pBlock = NULL;
SColumnInfoData infoData = {0};
if (sup == NULL) {
goto _end;
}
code = createDataBlock(&pBlock);
QUERY_CHECK_CODE(code, lino, _end);
pBlock->info.type = STREAM_NOTIFY_EVENT;
pBlock->info.watermark = INT64_MIN;
infoData.info.type = TSDB_DATA_TYPE_VARCHAR;
infoData.info.bytes = tDataTypes[infoData.info.type].bytes;
code = blockDataAppendColInfo(pBlock, &infoData);
QUERY_CHECK_CODE(code, lino, _end);
sup->pWindowEvents = taosArrayInit(0, sizeof(SStreamNotifyEvent));
QUERY_CHECK_NULL(sup->pWindowEvents, code, lino, _end, terrno);
sup->pTableNameHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
QUERY_CHECK_NULL(sup->pTableNameHashMap, code, lino, _end, terrno);
sup->pResultHashMap = taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
QUERY_CHECK_NULL(sup->pResultHashMap, code, lino, _end, terrno);
taosHashSetFreeFp(sup->pResultHashMap, destroyStreamWindowEvent);
sup->pEventBlock = pBlock;
pBlock = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
if (sup) {
destroyStreamNotifyEventSupp(sup);
}
}
if (pBlock != NULL) {
blockDataDestroy(pBlock);
}
return code;
}
int32_t initStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
pBasicInfo->primaryPkIndex = -1;
pBasicInfo->updateOperatorInfo = false;
pBasicInfo->pEventInfo = taosArrayInit(4, sizeof(SSessionKey));
if (pBasicInfo->pEventInfo == NULL) {
return terrno;
}
return createSpecialDataBlock(STREAM_EVENT_OPEN_WINDOW, &pBasicInfo->pEventRes);
return initStreamNotifyEventSupp(&pBasicInfo->windowEventSup);
}
void destroyStreamBasicInfo(SSteamOpBasicInfo* pBasicInfo) {
blockDataDestroy(pBasicInfo->pEventRes);
pBasicInfo->pEventRes = NULL;
taosArrayDestroy(pBasicInfo->pEventInfo);
pBasicInfo->pEventInfo = NULL;
destroyStreamNotifyEventSupp(&pBasicInfo->windowEventSup);
}
static void streamNotifyGetEventWindowId(const SSessionKey* pSessionKey, char *buf) {
uint64_t hash = 0;
uint64_t ar[2];
ar[0] = pSessionKey->groupId;
ar[1] = pSessionKey->win.skey;
hash = MurmurHash3_64((char*)ar, sizeof(ar));
buf = u64toaFastLut(hash, buf);
}
#define JSON_CHECK_ADD_ITEM(obj, str, item) \
QUERY_CHECK_CONDITION(cJSON_AddItemToObjectCS(obj, str, item), code, lino, _end, TSDB_CODE_OUT_OF_MEMORY)
static int32_t jsonAddColumnField(const char* colName, const SColumnInfoData* pColData, int32_t ri, cJSON* obj) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
char* temp = NULL;
QUERY_CHECK_NULL(colName, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pColData, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(obj, code, lino, _end, TSDB_CODE_INVALID_PARA);
if (colDataIsNull_s(pColData, ri)) {
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNull());
goto _end;
}
switch (pColData->info.type) {
case TSDB_DATA_TYPE_BOOL: {
bool val = *(bool*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateBool(val));
break;
}
case TSDB_DATA_TYPE_TINYINT: {
int8_t val = *(int8_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_SMALLINT: {
int16_t val = *(int16_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_INT: {
int32_t val = *(int32_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t val = *(int64_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_FLOAT: {
float val = *(float*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
double val = *(double*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_VARCHAR:
case TSDB_DATA_TYPE_NCHAR: {
// cJSON requires null-terminated strings, but this data is not null-terminated,
// so we need to manually copy the string and add null termination.
const char* src = varDataVal(colDataGetVarData(pColData, ri));
int32_t len = varDataLen(colDataGetVarData(pColData, ri));
temp = cJSON_malloc(len + 1);
QUERY_CHECK_NULL(temp, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
memcpy(temp, src, len);
temp[len] = '\0';
cJSON* item = cJSON_CreateStringReference(temp);
JSON_CHECK_ADD_ITEM(obj, colName, item);
// let the cjson object to free memory later
item->type &= ~cJSON_IsReference;
temp = NULL;
break;
}
case TSDB_DATA_TYPE_UTINYINT: {
uint8_t val = *(uint8_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_USMALLINT: {
uint16_t val = *(uint16_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_UINT: {
uint32_t val = *(uint32_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
case TSDB_DATA_TYPE_UBIGINT: {
uint64_t val = *(uint64_t*)colDataGetNumData(pColData, ri);
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateNumber(val));
break;
}
default: {
JSON_CHECK_ADD_ITEM(obj, colName, cJSON_CreateStringReference("<Unable to display this data type>"));
break;
}
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (temp) {
cJSON_free(temp);
}
return code;
}
int32_t addEventAggNotifyEvent(EStreamNotifyEventType eventType, const SSessionKey* pSessionKey,
const SSDataBlock* pInputBlock, const SNodeList* pCondCols, int32_t ri,
SStreamNotifyEventSupp* sup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SNode* node = NULL;
cJSON* event = NULL;
cJSON* fields = NULL;
cJSON* cond = NULL;
SStreamNotifyEvent item = {0};
char windowId[32];
QUERY_CHECK_NULL(pSessionKey, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pInputBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pInputBlock->pDataBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pCondCols, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
qDebug("add stream notify event from event window, type: %s, start: %" PRId64 ", end: %" PRId64,
(eventType == SNOTIFY_EVENT_WINDOW_OPEN) ? "WINDOW_OPEN" : "WINDOW_CLOSE", pSessionKey->win.skey,
pSessionKey->win.ekey);
event = cJSON_CreateObject();
QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
// add basic info
streamNotifyGetEventWindowId(pSessionKey, windowId);
if (eventType == SNOTIFY_EVENT_WINDOW_OPEN) {
JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_OPEN"));
} else if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
JSON_CHECK_ADD_ITEM(event, "eventType", cJSON_CreateStringReference("WINDOW_CLOSE"));
}
JSON_CHECK_ADD_ITEM(event, "eventTime", cJSON_CreateNumber(taosGetTimestampMs()));
JSON_CHECK_ADD_ITEM(event, "windowId", cJSON_CreateStringReference(windowId));
JSON_CHECK_ADD_ITEM(event, "windowType", cJSON_CreateStringReference("Event"));
JSON_CHECK_ADD_ITEM(event, "windowStart", cJSON_CreateNumber(pSessionKey->win.skey));
if (eventType == SNOTIFY_EVENT_WINDOW_CLOSE) {
JSON_CHECK_ADD_ITEM(event, "windowEnd", cJSON_CreateNumber(pSessionKey->win.ekey));
}
// create fields object to store matched column values
fields = cJSON_CreateObject();
QUERY_CHECK_NULL(fields, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
FOREACH(node, pCondCols) {
SColumnNode* pColDef = (SColumnNode*)node;
SColumnInfoData* pColData = taosArrayGet(pInputBlock->pDataBlock, pColDef->slotId);
code = jsonAddColumnField(pColDef->colName, pColData, ri, fields);
QUERY_CHECK_CODE(code, lino, _end);
}
// add trigger condition
cond = cJSON_CreateObject();
QUERY_CHECK_NULL(cond, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
JSON_CHECK_ADD_ITEM(cond, "conditionIndex", cJSON_CreateNumber(0));
JSON_CHECK_ADD_ITEM(cond, "fieldValues", fields);
fields = NULL;
JSON_CHECK_ADD_ITEM(event, "triggerConditions", cond);
cond = NULL;
// convert json object to string value
item.gid = pSessionKey->groupId;
item.skey = pSessionKey->win.skey;
item.isEnd = (eventType == SNOTIFY_EVENT_WINDOW_CLOSE);
item.content = cJSON_PrintUnformatted(event);
QUERY_CHECK_NULL(taosArrayPush(sup->pWindowEvents, &item), code, lino, _end, terrno);
item.content = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyStreamWindowEvent(&item);
if (cond != NULL) {
cJSON_Delete(cond);
}
if (fields != NULL) {
cJSON_Delete(fields);
}
if (event != NULL) {
cJSON_Delete(event);
}
return code;
}
int32_t addAggResultNotifyEvent(const SSDataBlock* pResultBlock, const SSchemaWrapper* pSchemaWrapper,
SStreamNotifyEventSupp* sup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SNode * node = NULL;
cJSON* event = NULL;
cJSON* result = NULL;
SStreamNotifyEvent item = {0};
SColumnInfoData* pWstartCol = NULL;
QUERY_CHECK_NULL(pResultBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pSchemaWrapper, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(sup, code, lino, _end, TSDB_CODE_INVALID_PARA);
qDebug("add %" PRId64 " stream notify results from window agg", pResultBlock->info.rows);
pWstartCol = taosArrayGet(pResultBlock->pDataBlock, 0);
for (int32_t i = 0; i< pResultBlock->info.rows; ++i) {
event = cJSON_CreateObject();
QUERY_CHECK_NULL(event, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
// convert the result row into json
result = cJSON_CreateObject();
QUERY_CHECK_NULL(result, code, lino, _end, TSDB_CODE_OUT_OF_MEMORY);
for (int32_t j = 0; j < pSchemaWrapper->nCols; ++j) {
SSchema *pCol = pSchemaWrapper->pSchema + j;
SColumnInfoData *pColData = taosArrayGet(pResultBlock->pDataBlock, pCol->colId - 1);
code = jsonAddColumnField(pCol->name, pColData, i, result);
QUERY_CHECK_CODE(code, lino, _end);
}
JSON_CHECK_ADD_ITEM(event, "result", result);
result = NULL;
item.gid = pResultBlock->info.id.groupId;
item.skey = *(uint64_t*)colDataGetNumData(pWstartCol, i);
item.content = cJSON_PrintUnformatted(event);
code = taosHashPut(sup->pResultHashMap, &item.gid, sizeof(item.gid) + sizeof(item.skey), &item, sizeof(item));
TSDB_CHECK_CODE(code, lino, _end);
item.content = NULL;
cJSON_Delete(event);
event = NULL;
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
destroyStreamWindowEvent(&item);
if (result != NULL) {
cJSON_Delete(result);
}
if (event != NULL) {
cJSON_Delete(event);
}
return code;
}
static int32_t streamNotifyGetDestTableName(const SExecTaskInfo* pTaskInfo, uint64_t gid, char** pTableName) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
const SStorageAPI* pAPI = NULL;
void* tbname = NULL;
int32_t winCode = TSDB_CODE_SUCCESS;
char parTbName[TSDB_TABLE_NAME_LEN];
const SStreamTaskInfo* pStreamInfo = NULL;
QUERY_CHECK_NULL(pTaskInfo, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pTableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
*pTableName = NULL;
pAPI = &pTaskInfo->storageAPI;
code = pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, gid, &tbname, false, &winCode);
QUERY_CHECK_CODE(code, lino, _end);
if (winCode != TSDB_CODE_SUCCESS) {
parTbName[0] = '\0';
} else {
tstrncpy(parTbName, tbname, sizeof(parTbName));
}
pAPI->stateStore.streamStateFreeVal(tbname);
pStreamInfo = &pTaskInfo->streamInfo;
code = buildSinkDestTableName(parTbName, pStreamInfo->stbFullName, gid, pStreamInfo->newSubTableRule, pTableName);
QUERY_CHECK_CODE(code, lino, _end);
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t streamNotifyFillTableName(const char* tableName, const SStreamNotifyEvent* pEvent,
const SStreamNotifyEvent* pResult, char** pVal) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
static const char* prefix = "{\"tableName\":\"";
uint64_t prefixLen = 0;
uint64_t nameLen = 0;
uint64_t eventLen = 0;
uint64_t resultLen = 0;
uint64_t valLen = 0;
char* val = NULL;
char* p = NULL;
QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pEvent, code, lino , _end, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(pVal, code, lino , _end, TSDB_CODE_INVALID_PARA);
*pVal = NULL;
prefixLen = strlen(prefix);
nameLen = strlen(tableName);
eventLen = strlen(pEvent->content);
if (pResult != NULL) {
resultLen = strlen(pResult->content);
valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + resultLen;
} else {
valLen = VARSTR_HEADER_SIZE + prefixLen + nameLen + eventLen + 1;
}
val = taosMemoryMalloc(valLen);
QUERY_CHECK_NULL(val, code, lino, _end, terrno);
varDataSetLen(val, valLen - VARSTR_HEADER_SIZE);
p = varDataVal(val);
TAOS_STRNCPY(p, prefix, prefixLen);
p += prefixLen;
TAOS_STRNCPY(p, tableName, nameLen);
p += nameLen;
*(p++) = '\"';
TAOS_STRNCPY(p, pEvent->content, eventLen);
*p = ',';
if (pResult != NULL) {
p += eventLen - 1;
TAOS_STRNCPY(p, pResult->content, resultLen);
*p = ',';
}
*pVal = val;
val = NULL;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (val != NULL) {
taosMemoryFreeClear(val);
}
return code;
}
int32_t buildNotifyEventBlock(const SExecTaskInfo* pTaskInfo, SStreamNotifyEventSupp* sup) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SColumnInfoData* pEventStrCol = NULL;
int32_t nWindowEvents = 0;
int32_t nWindowResults = 0;
char* val = NULL;
if (pTaskInfo == NULL || sup == NULL) {
goto _end;
}
QUERY_CHECK_NULL(sup->pEventBlock, code, lino, _end, TSDB_CODE_INVALID_PARA);
blockDataCleanup(sup->pEventBlock);
nWindowEvents = taosArrayGetSize(sup->pWindowEvents);
nWindowResults = taosHashGetSize(sup->pResultHashMap);
qDebug("start to build stream notify event block, nWindowEvents: %d, nWindowResults: %d", nWindowEvents,
nWindowResults);
if (nWindowEvents == 0) {
goto _end;
}
code = blockDataEnsureCapacity(sup->pEventBlock, nWindowEvents);
QUERY_CHECK_CODE(code, lino, _end);
pEventStrCol = taosArrayGet(sup->pEventBlock->pDataBlock, NOTIFY_EVENT_STR_COLUMN_INDEX);
QUERY_CHECK_NULL(pEventStrCol, code, lino, _end, terrno);
for (int32_t i = 0; i < nWindowEvents; ++i) {
SStreamNotifyEvent* pResult = NULL;
SStreamNotifyEvent* pEvent = taosArrayGet(sup->pWindowEvents, i);
char* tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
if (tableName == NULL) {
code = streamNotifyGetDestTableName(pTaskInfo, pEvent->gid, &tableName);
QUERY_CHECK_CODE(code, lino, _end);
code = taosHashPut(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid), tableName, strlen(tableName) + 1);
taosMemoryFreeClear(tableName);
QUERY_CHECK_CODE(code, lino, _end);
tableName = taosHashGet(sup->pTableNameHashMap, &pEvent->gid, sizeof(pEvent->gid));
QUERY_CHECK_NULL(tableName, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
}
if (pEvent->isEnd) {
pResult = taosHashGet(sup->pResultHashMap, &pEvent->gid, sizeof(pEvent->gid) + sizeof(pEvent->skey));
QUERY_CHECK_NULL(pResult, code, lino, _end, TSDB_CODE_INTERNAL_ERROR);
}
code = streamNotifyFillTableName(tableName, pEvent, pResult, &val);
QUERY_CHECK_CODE(code, lino, _end);
code = colDataSetVal(pEventStrCol, i, val, false);
QUERY_CHECK_CODE(code, lino, _end);
taosMemoryFreeClear(val);
sup->pEventBlock->info.rows++;
}
if (taosHashGetMemSize(sup->pTableNameHashMap) >= NOTIFY_EVENT_NAME_CACHE_LIMIT_MB * 1024 * 1024) {
taosHashClear(sup->pTableNameHashMap);
}
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
if (val != NULL) {
taosMemoryFreeClear(val);
}
if (sup != NULL) {
taosArrayClearEx(sup->pWindowEvents, destroyStreamWindowEvent);
taosHashClear(sup->pResultHashMap);
}
return code;
}

View File

@ -55,6 +55,7 @@ void destroyStreamIntervalSliceOperatorInfo(void* param) {
pInfo->pOperator = NULL;
}
destroyStreamBasicInfo(&pInfo->basic);
clearGroupResInfo(&pInfo->groupResInfo);
taosArrayDestroyP(pInfo->pUpdated, destroyFlusedPos);
pInfo->pUpdated = NULL;

View File

@ -150,6 +150,7 @@ void destroyStreamTimeSliceOperatorInfo(void* param) {
&pInfo->groupResInfo);
pInfo->pOperator = NULL;
}
destroyStreamBasicInfo(&pInfo->basic);
colDataDestroy(&pInfo->twAggSup.timeWindowData);
destroyStreamAggSupporter(&pInfo->streamAggSup);
resetPrevAndNextWindow(pInfo->pFillSup);

View File

@ -99,6 +99,8 @@ const char* nodesNodeName(ENodeType type) {
return "CountWindow";
case QUERY_NODE_ANOMALY_WINDOW:
return "AnomalyWindow";
case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
return "StreamNotifyOptions";
case QUERY_NODE_SET_OPERATOR:
return "SetOperator";
case QUERY_NODE_SELECT_STMT:
@ -5812,6 +5814,45 @@ static int32_t jsonToStreamOptions(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkStreamNotifyOptionsAddrUrls = "AddrUrls";
static const char* jkStreamNotifyOptionsEventType = "EventType";
static const char* jkStreamNotifyOptionsErrorHandle = "ErrorHandle";
static const char* jkStreamNotifyOptionsNotifyHistory = "NotifyHistory";
static int32_t streamNotifyOptionsToJson(const void* pObj, SJson* pJson) {
const SStreamNotifyOptions* pNotifyOption = (const SStreamNotifyOptions*)pObj;
int32_t code = nodeListToJson(pJson, jkStreamNotifyOptionsAddrUrls, pNotifyOption->pAddrUrls);
if (code == TSDB_CODE_SUCCESS) {
code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsEventType, pNotifyOption->eventTypes);
}
if (code == TSDB_CODE_SUCCESS) {
code = tjsonAddIntegerToObject(pJson, jkStreamNotifyOptionsErrorHandle, pNotifyOption->errorHandle);
}
if (code == TSDB_CODE_SUCCESS) {
code = tjsonAddBoolToObject(pJson, jkStreamNotifyOptionsNotifyHistory, pNotifyOption->notifyHistory);
}
return code;
}
static int32_t jsonToStreamNotifyOptions(const SJson* pJson, void* pObj) {
SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pObj;
int32_t code = jsonToNodeList(pJson, jkStreamNotifyOptionsAddrUrls, &pNotifyOption->pAddrUrls);
int32_t val = 0;
if (code == TSDB_CODE_SUCCESS) {
code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsEventType, &val);
pNotifyOption->eventTypes = val;
}
if (code == TSDB_CODE_SUCCESS) {
code = tjsonGetIntValue(pJson, jkStreamNotifyOptionsErrorHandle, &val);
pNotifyOption->errorHandle = val;
}
if (code == TSDB_CODE_SUCCESS) {
code = tjsonGetBoolValue(pJson, jkStreamNotifyOptionsNotifyHistory, &pNotifyOption->notifyHistory);
}
return code;
}
static const char* jkWhenThenWhen = "When";
static const char* jkWhenThenThen = "Then";
@ -7207,6 +7248,7 @@ static const char* jkCreateStreamStmtOptions = "Options";
static const char* jkCreateStreamStmtQuery = "Query";
static const char* jkCreateStreamStmtTags = "Tags";
static const char* jkCreateStreamStmtSubtable = "Subtable";
static const char* jkCreateStreamStmtNotifyOptions = "NotifyOptions";
static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) {
const SCreateStreamStmt* pNode = (const SCreateStreamStmt*)pObj;
@ -7233,6 +7275,9 @@ static int32_t createStreamStmtToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkCreateStreamStmtSubtable, nodeToJson, pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkCreateStreamStmtNotifyOptions, nodeToJson, pNode->pNotifyOptions);
}
return code;
}
@ -7262,6 +7307,9 @@ static int32_t jsonToCreateStreamStmt(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkCreateStreamStmtSubtable, &pNode->pSubtable);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeObject(pJson, jkCreateStreamStmtNotifyOptions, (SNode**)&pNode->pNotifyOptions);
}
return code;
}
@ -8029,6 +8077,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
return countWindowNodeToJson(pObj, pJson);
case QUERY_NODE_ANOMALY_WINDOW:
return anomalyWindowNodeToJson(pObj, pJson);
case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
return streamNotifyOptionsToJson(pObj, pJson);
case QUERY_NODE_SET_OPERATOR:
return setOperatorToJson(pObj, pJson);
case QUERY_NODE_SELECT_STMT:
@ -8402,6 +8452,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
return jsonToCountWindowNode(pJson, pObj);
case QUERY_NODE_ANOMALY_WINDOW:
return jsonToAnomalyWindowNode(pJson, pObj);
case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
return jsonToStreamNotifyOptions(pJson, pObj);
case QUERY_NODE_SET_OPERATOR:
return jsonToSetOperator(pJson, pObj);
case QUERY_NODE_SELECT_STMT:

View File

@ -467,6 +467,9 @@ int32_t nodesMakeNode(ENodeType type, SNode** ppNodeOut) {
case QUERY_NODE_WINDOW_OFFSET:
code = makeNode(type, sizeof(SWindowOffsetNode), &pNode);
break;
case QUERY_NODE_STREAM_NOTIFY_OPTIONS:
code = makeNode(type, sizeof(SStreamNotifyOptions), &pNode);
break;
case QUERY_NODE_SET_OPERATOR:
code = makeNode(type, sizeof(SSetOperator), &pNode);
break;
@ -1267,6 +1270,11 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pAround->pTimepoint);
break;
}
case QUERY_NODE_STREAM_NOTIFY_OPTIONS: {
SStreamNotifyOptions* pNotifyOptions = (SStreamNotifyOptions*)pNode;
nodesDestroyList(pNotifyOptions->pAddrUrls);
break;
}
case QUERY_NODE_SET_OPERATOR: {
SSetOperator* pStmt = (SSetOperator*)pNode;
nodesDestroyList(pStmt->pProjectionList);
@ -1479,6 +1487,7 @@ void nodesDestroyNode(SNode* pNode) {
nodesDestroyNode(pStmt->pQuery);
nodesDestroyList(pStmt->pTags);
nodesDestroyNode(pStmt->pSubtable);
nodesDestroyNode((SNode*)pStmt->pNotifyOptions);
tFreeSCMCreateStreamReq(pStmt->pReq);
taosMemoryFreeClear(pStmt->pReq);
break;

View File

@ -296,8 +296,12 @@ SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, con
SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode);
SNode* createStreamNotifyOptions(SAstCreateContext *pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes);
SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag,
SToken* pToken);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols,
SNode* pNotifyOptions);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createPauseStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
SNode* createResumeStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, bool ignoreUntreated, SToken* pStreamName);

View File

@ -785,7 +785,7 @@ full_view_name(A) ::= db_name(B) NK_DOT view_name(C).
/************************************************ create/drop stream **************************************************/
cmd ::= CREATE STREAM not_exists_opt(E) stream_name(A) stream_options(B) INTO
full_table_name(C) col_list_opt(H) tag_def_or_ref_opt(F) subtable_opt(G)
AS query_or_subquery(D). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H); }
AS query_or_subquery(D) notify_opt(I). { pCxt->pRootNode = createCreateStreamStmt(pCxt, E, &A, C, B, F, G, D, H, I); }
cmd ::= DROP STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createDropStreamStmt(pCxt, A, &B); }
cmd ::= PAUSE STREAM exists_opt(A) stream_name(B). { pCxt->pRootNode = createPauseStreamStmt(pCxt, A, &B); }
cmd ::= RESUME STREAM exists_opt(A) ignore_opt(C) stream_name(B). { pCxt->pRootNode = createResumeStreamStmt(pCxt, A, C, &B); }
@ -832,6 +832,26 @@ subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP.
ignore_opt(A) ::= . { A = false; }
ignore_opt(A) ::= IGNORE UNTREATED. { A = true; }
notify_opt(A) ::= . { A = NULL; }
notify_opt(A) ::= notify_def(B). { A = B; }
notify_def(A) ::= NOTIFY NK_LP url_def_list(B) NK_RP ON NK_LP event_def_list(C) NK_RP. { A = createStreamNotifyOptions(pCxt, B, C); }
notify_def(A) ::= notify_def(B) ON_FAILURE DROP(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); }
notify_def(A) ::= notify_def(B) ON_FAILURE PAUSE(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_ERROR_HANDLE_SET, &C); }
notify_def(A) ::= notify_def(B) NOTIFY_HISTORY NK_INTEGER(C). { A = setStreamNotifyOptions(pCxt, B, SNOTIFY_OPT_NOTIFY_HISTORY_SET, &C); }
%type url_def_list { SNodeList* }
%destructor url_def_list { nodesDestroyList($$); }
url_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); }
url_def_list(A) ::= url_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); }
%type event_def_list { SNodeList* }
%destructor event_def_list { nodesDestroyList($$); }
event_def_list(A) ::= NK_STRING(B). { A = createNodeList(pCxt, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &B)); }
event_def_list(A) ::= event_def_list(B) NK_COMMA NK_STRING(C). { A = addNodeToList(pCxt, B, createValueNode(pCxt, TSDB_DATA_TYPE_BINARY, &C)); }
/************************************************ kill connection/query ***********************************************/
cmd ::= KILL CONNECTION NK_INTEGER(A). { pCxt->pRootNode = createKillStmt(pCxt, QUERY_NODE_KILL_CONNECTION_STMT, &A); }
cmd ::= KILL QUERY NK_STRING(A). { pCxt->pRootNode = createKillQueryStmt(pCxt, &A); }

View File

@ -1526,8 +1526,8 @@ SNode* createCaseWhenNode(SAstCreateContext* pCxt, SNode* pCase, SNodeList* pWhe
pCaseWhen->pCase = pCase;
pCaseWhen->pWhenThenList = pWhenThenList;
pCaseWhen->pElse = pElse;
pCaseWhen->tz = pCxt->pQueryCxt->timezone;
pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt;
pCaseWhen->tz = pCxt->pQueryCxt->timezone;
pCaseWhen->charsetCxt = pCxt->pQueryCxt->charsetCxt;
return (SNode*)pCaseWhen;
_err:
nodesDestroyNode(pCase);
@ -3657,8 +3657,115 @@ SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptions
return pOptions;
}
static bool validateNotifyUrl(const char* url) {
const char* prefix[] = {"http://", "https://", "ws://", "wss://"};
const char* host = NULL;
if (!url || *url == '\0') return false;
for (int32_t i = 0; i < ARRAY_SIZE(prefix); ++i) {
if (strncasecmp(url, prefix[i], strlen(prefix[i])) == 0) {
host = url + strlen(prefix[i]);
break;
}
}
return (host != NULL) && (*host != '\0') && (*host != '/');
}
SNode* createStreamNotifyOptions(SAstCreateContext* pCxt, SNodeList* pAddrUrls, SNodeList* pEventTypes) {
SNode* pNode = NULL;
EStreamNotifyEventType eventTypes = 0;
const char* eWindowOpenStr = "WINDOW_OPEN";
const char* eWindowCloseStr = "WINDOW_CLOSE";
CHECK_PARSER_STATUS(pCxt);
if (LIST_LENGTH(pAddrUrls) == 0) {
pCxt->errCode =
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "notification address cannot be empty");
goto _err;
}
FOREACH(pNode, pAddrUrls) {
char *url = ((SValueNode*)pNode)->literal;
if (strlen(url) >= TSDB_STREAM_NOTIFY_URL_LEN) {
pCxt->errCode =
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
"notification address \"%s\" exceed maximum length %d", url, TSDB_STREAM_NOTIFY_URL_LEN);
goto _err;
}
if (!validateNotifyUrl(url)) {
pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
"invalid notification address \"%s\"", url);
goto _err;
}
}
if (LIST_LENGTH(pEventTypes) == 0) {
pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
"event types must be specified for notification");
goto _err;
}
FOREACH(pNode, pEventTypes) {
char *eventStr = ((SValueNode *)pNode)->literal;
if (strncasecmp(eventStr, eWindowOpenStr, strlen(eWindowOpenStr) + 1) == 0) {
BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_OPEN);
} else if (strncasecmp(eventStr, eWindowCloseStr, strlen(eWindowCloseStr) + 1) == 0) {
BIT_FLAG_SET_MASK(eventTypes, SNOTIFY_EVENT_WINDOW_CLOSE);
} else {
pCxt->errCode = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
"invalid event type '%s' for notification", eventStr);
goto _err;
}
}
SStreamNotifyOptions* pNotifyOptions = NULL;
pCxt->errCode = nodesMakeNode(QUERY_NODE_STREAM_NOTIFY_OPTIONS, (SNode**)&pNotifyOptions);
CHECK_MAKE_NODE(pNotifyOptions);
pNotifyOptions->pAddrUrls = pAddrUrls;
pNotifyOptions->eventTypes = eventTypes;
pNotifyOptions->errorHandle = SNOTIFY_ERROR_HANDLE_PAUSE;
pNotifyOptions->notifyHistory = false;
nodesDestroyList(pEventTypes);
return (SNode*)pNotifyOptions;
_err:
nodesDestroyList(pAddrUrls);
nodesDestroyList(pEventTypes);
return NULL;
}
SNode* setStreamNotifyOptions(SAstCreateContext* pCxt, SNode* pNode, EStreamNotifyOptionSetFlag setFlag,
SToken* pToken) {
CHECK_PARSER_STATUS(pCxt);
SStreamNotifyOptions* pNotifyOption = (SStreamNotifyOptions*)pNode;
if (BIT_FLAG_TEST_MASK(pNotifyOption->setFlag, setFlag)) {
pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR,
"stream notify options each item can only be set once");
goto _err;
}
switch (setFlag) {
case SNOTIFY_OPT_ERROR_HANDLE_SET:
pNotifyOption->errorHandle = (pToken->type == TK_DROP) ? SNOTIFY_ERROR_HANDLE_DROP : SNOTIFY_ERROR_HANDLE_PAUSE;
break;
case SNOTIFY_OPT_NOTIFY_HISTORY_SET:
pNotifyOption->notifyHistory = taosStr2Int8(pToken->z, NULL, 10);
break;
default:
break;
}
BIT_FLAG_SET_MASK(pNotifyOption->setFlag, setFlag);
return pNode;
_err:
nodesDestroyNode(pNode);
return NULL;
}
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols,
SNode* pNotifyOptions) {
CHECK_PARSER_STATUS(pCxt);
CHECK_NAME(checkStreamName(pCxt, pStreamName));
SCreateStreamStmt* pStmt = NULL;
@ -3674,6 +3781,7 @@ SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken
pStmt->pTags = pTags;
pStmt->pSubtable = pSubtable;
pStmt->pCols = pCols;
pStmt->pNotifyOptions = (SStreamNotifyOptions*)pNotifyOptions;
return (SNode*)pStmt;
_err:
nodesDestroyNode(pRealTable);
@ -3682,6 +3790,7 @@ _err:
nodesDestroyList(pTags);
nodesDestroyNode(pSubtable);
nodesDestroyList(pCols);
nodesDestroyNode(pNotifyOptions);
return NULL;
}

View File

@ -355,6 +355,9 @@ static SKeyword keywordTable[] = {
{"FORCE_WINDOW_CLOSE", TK_FORCE_WINDOW_CLOSE},
{"DISK_INFO", TK_DISK_INFO},
{"AUTO", TK_AUTO},
{"NOTIFY", TK_NOTIFY},
{"ON_FAILURE", TK_ON_FAILURE},
{"NOTIFY_HISTORY", TK_NOTIFY_HISTORY},
};
// clang-format on

View File

@ -12192,6 +12192,45 @@ static int32_t translateStreamOptions(STranslateContext* pCxt, SCreateStreamStmt
return TSDB_CODE_SUCCESS;
}
static int32_t buildStreamNotifyOptions(STranslateContext* pCxt, SStreamNotifyOptions* pNotifyOptions,
SCMCreateStreamReq* pReq) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pNode = NULL;
if (pNotifyOptions == NULL || pNotifyOptions->pAddrUrls->length == 0) {
return code;
}
pReq->pNotifyAddrUrls = taosArrayInit(pNotifyOptions->pAddrUrls->length, POINTER_BYTES);
if (pReq->pNotifyAddrUrls != NULL) {
FOREACH(pNode, pNotifyOptions->pAddrUrls) {
char *url = taosStrndup(((SValueNode*)pNode)->literal, TSDB_STREAM_NOTIFY_URL_LEN);
if (url == NULL) {
code = terrno;
break;
}
if (taosArrayPush(pReq->pNotifyAddrUrls, &url) == NULL) {
code = terrno;
taosMemoryFreeClear(url);
break;
}
}
} else {
code = terrno;
}
if (code == TSDB_CODE_SUCCESS) {
pReq->notifyEventTypes = pNotifyOptions->eventTypes;
pReq->notifyErrorHandle = pNotifyOptions->errorHandle;
pReq->notifyHistory = pNotifyOptions->notifyHistory;
} else {
taosArrayDestroyP(pReq->pNotifyAddrUrls, NULL);
pReq->pNotifyAddrUrls = NULL;
}
return code;
}
static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {
pReq->igExists = pStmt->ignoreExists;
@ -12238,6 +12277,10 @@ static int32_t buildCreateStreamReq(STranslateContext* pCxt, SCreateStreamStmt*
}
}
if (TSDB_CODE_SUCCESS == code) {
code = buildStreamNotifyOptions(pCxt, pStmt->pNotifyOptions, pReq);
}
return code;
}

View File

@ -735,7 +735,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
!alreadyAddGroupId(pDataBlock->info.parTbName, groupId) && groupId != 0) {
if (pTask->ver == SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
code = buildCtbNameAddGroupId(NULL, pDataBlock->info.parTbName, groupId, sizeof(pDataBlock->info.parTbName));
} else if (pTask->ver > SSTREAM_TASK_SUBTABLE_CHANGED_VER) {
} else if (pTask->ver >= SSTREAM_TASK_APPEND_STABLE_NAME_VER) {
code = buildCtbNameAddGroupId(pTask->outputInfo.shuffleDispatcher.stbFullName, pDataBlock->info.parTbName,
groupId, sizeof(pDataBlock->info.parTbName));
}

View File

@ -198,6 +198,7 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) {
SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
tDecoderClear(&decoder);
continue;
}
@ -1031,6 +1032,7 @@ int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
SCheckpointInfo info;
tDecoderInit(&decoder, (uint8_t*)pVal, vLen);
if (tDecodeStreamTaskChkInfo(&decoder, &info) < 0) {
tDecoderClear(&decoder);
continue;
}
tDecoderClear(&decoder);

View File

@ -326,6 +326,11 @@ void tFreeStreamTask(void* pParam) {
streamTaskDestroyActiveChkptInfo(pTask->chkInfo.pActiveInfo);
pTask->chkInfo.pActiveInfo = NULL;
taosArrayDestroyP(pTask->notifyInfo.pNotifyAddrUrls, NULL);
taosMemoryFreeClear(pTask->notifyInfo.streamName);
taosMemoryFreeClear(pTask->notifyInfo.stbFullName);
tDeleteSchemaWrapper(pTask->notifyInfo.pSchemaWrapper);
taosMemoryFree(pTask);
stDebug("s-task:0x%x free task completed", taskId);
}
@ -1318,6 +1323,78 @@ void streamTaskFreeRefId(int64_t* pRefId) {
metaRefMgtRemove(pRefId);
}
static int32_t tEncodeStreamNotifyInfo(SEncoder* pEncoder, const SNotifyInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
QUERY_CHECK_NULL(pEncoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
int32_t addrSize = taosArrayGetSize(info->pNotifyAddrUrls);
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, addrSize));
for (int32_t i = 0; i < addrSize; ++i) {
const char* url = taosArrayGetP(info->pNotifyAddrUrls, i);
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, url));
}
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyEventTypes));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, info->notifyErrorHandle));
if (addrSize > 0) {
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->streamName));
TAOS_CHECK_EXIT(tEncodeCStr(pEncoder, info->stbFullName));
TAOS_CHECK_EXIT(tEncodeSSchemaWrapper(pEncoder, info->pSchemaWrapper));
}
_exit:
if (code != TSDB_CODE_SUCCESS) {
stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
static int32_t tDecodeStreamNotifyInfo(SDecoder* pDecoder, SNotifyInfo* info) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
QUERY_CHECK_NULL(pDecoder, code, lino, _exit, TSDB_CODE_INVALID_PARA);
QUERY_CHECK_NULL(info, code, lino, _exit, TSDB_CODE_INVALID_PARA);
int32_t addrSize = 0;
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &addrSize));
info->pNotifyAddrUrls = taosArrayInit(addrSize, POINTER_BYTES);
QUERY_CHECK_NULL(info->pNotifyAddrUrls, code, lino, _exit, terrno);
for (int32_t i = 0; i < addrSize; ++i) {
char *url = NULL;
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &url));
url = taosStrndup(url, TSDB_STREAM_NOTIFY_URL_LEN);
QUERY_CHECK_NULL(url, code, lino, _exit, terrno);
if (taosArrayPush(info->pNotifyAddrUrls, &url) == NULL) {
taosMemoryFree(url);
TAOS_CHECK_EXIT(terrno);
}
}
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyEventTypes));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &info->notifyErrorHandle));
if (addrSize > 0) {
char* name = NULL;
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
info->streamName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
QUERY_CHECK_NULL(info->streamName, code, lino, _exit, terrno);
TAOS_CHECK_EXIT(tDecodeCStr(pDecoder, &name));
info->stbFullName = taosStrndup(name, TSDB_STREAM_FNAME_LEN + 1);
QUERY_CHECK_NULL(info->stbFullName, code, lino, _exit, terrno);
info->pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
if (info->pSchemaWrapper == NULL) {
TAOS_CHECK_EXIT(terrno);
}
TAOS_CHECK_EXIT(tDecodeSSchemaWrapper(pDecoder, info->pSchemaWrapper));
}
_exit:
if (code != TSDB_CODE_SUCCESS) {
stError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t code = 0;
@ -1388,6 +1465,10 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->subtableWithoutMd5));
TAOS_CHECK_EXIT(tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1));
if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
TAOS_CHECK_EXIT(tEncodeStreamNotifyInfo(pEncoder, &pTask->notifyInfo));
}
tEndEncode(pEncoder);
_exit:
return code;
@ -1486,8 +1567,12 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
}
TAOS_CHECK_EXIT(tDecodeCStrTo(pDecoder, pTask->reserve));
if (pTask->ver >= SSTREAM_TASK_ADD_NOTIFY_VER) {
TAOS_CHECK_EXIT(tDecodeStreamNotifyInfo(pDecoder, &pTask->notifyInfo));
}
tEndDecode(pDecoder);
_exit:
return code;
}
}

View File

@ -1490,3 +1490,32 @@ bool taosAssertRelease(bool condition) {
return true;
}
#endif
char* u64toaFastLut(uint64_t val, char* buf) {
static const char* lut =
"0001020304050607080910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455"
"5657585960616263646566676869707172737475767778798081828384858687888990919293949596979899";
char temp[24];
char* p = temp;
while (val >= 100) {
strncpy(p, lut + (val % 100) * 2, 2);
val /= 100;
p += 2;
}
if (val >= 10) {
strncpy(p, lut + val * 2, 2);
p += 2;
} else if (val > 0 || p == temp) {
*(p++) = val + '0';
}
while (p != temp) {
*buf++ = *--p;
}
*buf = '\0';
return buf;
}