other: merge main.

This commit is contained in:
Haojun Liao 2023-11-10 17:05:36 +08:00
commit 438c303b0d
33 changed files with 1811 additions and 252 deletions

View File

@ -316,7 +316,7 @@ def pre_test_build_win() {
python -m pip uninstall taospy -y
python -m pip install taospy==2.7.12
python -m pip uninstall taos-ws-py -y
python -m pip install taos-ws-py==0.2.9
python -m pip install taos-ws-py==0.3.1
xcopy /e/y/i/f %WIN_INTERNAL_ROOT%\\debug\\build\\lib\\taos.dll C:\\Windows\\System32
'''
return 1
@ -424,7 +424,7 @@ pipeline {
echo "${WKDIR}/restore.sh -p ${BRANCH_NAME} -n ${BUILD_ID} -c {container name}"
}
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
timeout(time: 130, unit: 'MINUTES'){
timeout(time: 150, unit: 'MINUTES'){
pre_test()
script {
sh '''

Binary file not shown.

View File

@ -36,6 +36,7 @@ REST connection supports all platforms that can run Java.
| taos-jdbcdriver version | major changes | TDengine version |
| :---------------------: | :------------------------------------------------------------------------------------------------------------------------------------------------: | :--------------: |
| 3.2.7 | Support VARBINARY and GEOMETRY types, and add time zone support for native connections. Support websocket auto reconnection | 3.2.0.0 or later |
| 3.2.5 | Subscription add committed() and assignment() method | 3.1.0.3 or later |
| 3.2.4 | Subscription add the enable.auto.commit parameter and the unsubscribe() method in the WebSocket connection | - |
| 3.2.3 | Fixed resultSet data parsing failure in some cases | - |
@ -178,7 +179,7 @@ Add following dependency in the `pom.xml` file of your Maven project:
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.2</version>
<version>3.2.7</version>
</dependency>
```

View File

@ -36,6 +36,7 @@ REST 连接支持所有能运行 Java 的平台。
| taos-jdbcdriver 版本 | 主要变化 | TDengine 版本 |
| :------------------: | :----------------------------------------------------------------------------------------------------------------------------------------------------: | :----------------: |
| 3.2.7 | 支持VARBINARY和GEOMETRY类型增加native连接的时区设置支持。增加websocket自动重连功能。 | 3.2.0.0 及更高版本 |
| 3.2.5 | 数据订阅增加 committed()、assignment() 方法 | 3.1.0.3 及更高版本 |
| 3.2.4 | 数据订阅在 WebSocket 连接下增加 enable.auto.commit 参数,以及 unsubscribe() 方法。 | - |
| 3.2.3 | 修复 ResultSet 在一些情况数据解析失败 | - |
@ -177,7 +178,7 @@ Maven 项目中,在 pom.xml 中添加以下依赖:
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.2</version>
<version>3.2.7</version>
</dependency>
```

View File

@ -105,7 +105,7 @@ spec:
# TZ for timezone settings, we recommend to always set it.
- name: TZ
value: "Asia/Shanghai"
# Environment varibles with prefix TAOS_ will be parsed and converted into corresponding parameter in taos.cfg. For example, serverPort in taos.cfg should be configured by TAOS_SERVER_PORT when using K8S to deploy
# Environment variables with prefix TAOS_ will be parsed and converted into corresponding parameter in taos.cfg. For example, serverPort in taos.cfg should be configured by TAOS_SERVER_PORT when using K8S to deploy
- name: TAOS_SERVER_PORT
value: "6030"
# Must set if you want a cluster.

View File

@ -53,7 +53,7 @@ database_option: {
- 1表示一阶段压缩。
- 2表示两阶段压缩。
- DURATION数据文件存储数据的时间跨度。可以使用加单位的表示形式如 DURATION 100h、DURATION 10d 等,支持 m分钟、h小时和 d三个单位。不加时间单位时默认单位为天如 DURATION 50 表示 50 天。
- WAL_FSYNC_PERIOD当 WAL 参数设置为 2 时,落盘的周期。默认为 3000单位毫秒。最小为 0表示每次写入立即落盘最大为 180000即三分钟。
- WAL_FSYNC_PERIOD当 WAL_LEVEL 参数设置为 2 时,用于设置落盘的周期。默认为 3000单位毫秒。最小为 0表示每次写入立即落盘最大为 180000即三分钟。
- MAXROWS文件块中记录的最大条数默认为 4096 条。
- MINROWS文件块中记录的最小条数默认为 100 条。
- KEEP表示数据文件保存的天数缺省值为 3650取值范围 [1, 365000]且必须大于或等于3倍的 DURATION 参数值。数据库会自动删除保存时间超过 KEEP 值的数据。KEEP 可以使用加单位的表示形式,如 KEEP 100h、KEEP 10d 等,支持 m分钟、h小时和 d三个单位。也可以不写单位如 KEEP 50此时默认单位为天。企业版支持[多级存储](https://docs.taosdata.com/tdinternal/arch/#%E5%A4%9A%E7%BA%A7%E5%AD%98%E5%82%A8)功能, 因此, 可以设置多个保存时间(多个以英文逗号分隔,最多 3 个,满足 keep 0 <= keep 1 <= keep 2如 KEEP 100h,100d,3650d; 社区版不支持多级存储功能(即使配置了多个保存时间, 也不会生效, KEEP 会取最大的保存时间)。

View File

@ -67,7 +67,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.0.0</version>
<version>3.2.7</version>
<!-- <scope>system</scope>-->
<!-- <systemPath>${project.basedir}/src/main/resources/lib/taos-jdbcdriver-2.0.15-dist.jar</systemPath>-->
</dependency>

View File

@ -34,6 +34,7 @@ benchmarkName="taosBenchmark"
dumpName="taosdump"
demoName="taosdemo"
xname="taosx"
keeperName="taoskeeper"
clientName2="taos"
serverName2="${clientName2}d"
@ -42,6 +43,7 @@ productName2="TDengine"
emailName2="taosdata.com"
xname2="${clientName2}x"
adapterName2="${clientName2}adapter"
keeperName2="${clientName2}keeper"
explorerName="${clientName2}-explorer"
benchmarkName2="${clientName2}Benchmark"
@ -214,6 +216,7 @@ function install_bin() {
${csudo}rm -f ${bin_link_dir}/${demoName2} || :
${csudo}rm -f ${bin_link_dir}/${benchmarkName2} || :
${csudo}rm -f ${bin_link_dir}/${dumpName2} || :
${csudo}rm -f ${bin_link_dir}/${keeperName2} || :
${csudo}rm -f ${bin_link_dir}/set_core || :
${csudo}rm -f ${bin_link_dir}/TDinsight.sh || :
@ -227,6 +230,7 @@ function install_bin() {
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${demoName2} || :
[ -x ${install_main_dir}/bin/${benchmarkName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${benchmarkName2} ${bin_link_dir}/${benchmarkName2} || :
[ -x ${install_main_dir}/bin/${dumpName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${dumpName2} ${bin_link_dir}/${dumpName2} || :
[ -x ${install_main_dir}/bin/${keeperName2} ] && ${csudo}ln -sf ${install_main_dir}/bin/${keeperName2} ${bin_link_dir}/${keeperName2} || :
[ -x ${install_main_dir}/bin/TDinsight.sh ] && ${csudo}ln -sf ${install_main_dir}/bin/TDinsight.sh ${bin_link_dir}/TDinsight.sh || :
if [ "$clientName2" == "${clientName}" ]; then
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
@ -518,6 +522,23 @@ function install_adapter_config() {
}
function install_keeper_config() {
if [ -f ${script_dir}/cfg/${keeperName2}.toml ]; then
${csudo}sed -i -r "s/127.0.0.1/${serverFqdn}/g" ${script_dir}/cfg/${keeperName2}.toml
fi
if [ -f "${configDir}/keeper.toml" ]; then
echo "The file keeper.toml will be renamed to ${keeperName2}.toml"
${csudo}cp ${script_dir}/cfg/${keeperName2}.toml ${configDir}/${keeperName2}.toml.new
${csudo}mv ${configDir}/keeper.toml ${configDir}/${keeperName2}.toml
elif [ -f "${configDir}/${keeperName2}.toml" ]; then
# "taoskeeper.toml exists,new config is taoskeeper.toml.new"
${csudo}cp ${script_dir}/cfg/${keeperName2}.toml ${configDir}/${keeperName2}.toml.new
else
${csudo}cp ${script_dir}/cfg/${keeperName2}.toml ${configDir}/${keeperName2}.toml
fi
command -v systemctl >/dev/null 2>&1 && ${csudo}systemctl daemon-reload >/dev/null 2>&1 || true
}
function install_config() {
if [ ! -f "${cfg_install_dir}/${configFile2}" ]; then
@ -914,6 +935,7 @@ function updateProduct() {
install_adapter_service
install_adapter_config
install_keeper_service
install_keeper_config
openresty_work=false
@ -1014,6 +1036,7 @@ function installProduct() {
install_adapter_service
install_adapter_config
install_keeper_service
install_keeper_config
openresty_work=false

View File

@ -92,14 +92,10 @@ else
${build_dir}/bin/tdengine-datasource.zip.md5"
fi
[ -f ${build_dir}/bin/taosx ] && taosx_bin="${build_dir}/bin/taosx"
explorer_bin_files=$(find ${build_dir}/bin/ -name '*-explorer')
bin_files="${build_dir}/bin/${serverName} \
${build_dir}/bin/${clientName} \
${taostools_bin_files} \
${taosx_bin} \
${explorer_bin_files} \
${build_dir}/bin/${clientName}adapter \
${build_dir}/bin/udfd \
${script_dir}/remove.sh \
@ -375,9 +371,6 @@ if [ "$verMode" == "cluster" ]; then
cp ${top_dir}/../enterprise/packaging/install_taosx.sh ${install_dir}/taosx
cp ${top_dir}/../enterprise/src/plugins/taosx/packaging/uninstall.sh ${install_dir}/taosx
sed -i 's/target=\"\"/target=\"taosx\"/g' ${install_dir}/taosx/uninstall.sh
else
echo "taox package not found"
exit 1
fi
fi
fi

View File

@ -277,60 +277,11 @@ function remove_data_and_config() {
if [ X"$log_dir" == X"" ]; then
log_dir="/var/log/taos"
fi
${csudo}rm -rf ${config_dir}/*
${csudo}rm -rf ${data_dir}/*
${csudo}rm -rf ${log_dir}/*
[ -d "${config_dir}" ] && ${csudo}rm -rf ${config_dir}/*
[ -d "${data_dir}" ] && ${csudo}rm -rf ${data_dir}/*
[ -d "${log_dir}" ] && ${csudo}rm -rf ${log_dir}/*
}
function uninstall_taosx() {
if [ -f ${installDir}/uninstall.sh ]; then
cd ${installDir}
bash uninstall.sh
fi
}
if [ "$verMode" == "cluster" ]; then
uninstall_taosx
fi
# Stop service and disable booting start.
clean_service
# Remove binary file and links
clean_bin
# Remove links of local bin
clean_local_bin
# Remove header file.
clean_header
# Remove lib file
clean_lib
# Remove link log directory
clean_log
# Remove link configuration file
clean_config
# Remove data link directory
${csudo}rm -rf ${data_link_dir} || :
${csudo}rm -rf ${install_main_dir}
if [[ -e /etc/os-release ]]; then
osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release)
else
osinfo=""
fi
if echo $osinfo | grep -qwi "ubuntu"; then
# echo "this is ubuntu system"
${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || :
elif echo $osinfo | grep -qwi "debian"; then
# echo "this is debian system"
${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || :
elif echo $osinfo | grep -qwi "centos"; then
# echo "this is centos system"
${csudo}rpm -e --noscripts tdengine >/dev/null 2>&1 || :
fi
if [ "$osType" = "Darwin" ]; then
${csudo}rm -rf /Applications/TDengine.app
fi
_kill_service_of() {
_service=$1
pid=$(ps -ef | grep "$_service" | grep -v "grep" | awk '{print $2}')
@ -391,10 +342,60 @@ remove_taoskeeper() {
# remove taoskeeper bin
_clean_service_of taoskeeper
[ -e "${bin_link_dir}/taoskeeper" ] && ${csudo}rm -rf ${bin_link_dir}/taoskeeper
[ -e "${installDir}/taoskeeper" ] && ${csudo}rm -rf ${installDir}/taoskeeper
[ -e "${cfg_link_dir}/metrics.toml" ] || ${csudo}rm -rf ${cfg_link_dir}/metrics.toml
echo "taosKeeper is removed successfully!"
}
function uninstall_taosx() {
if [ -f ${installDir}/uninstall.sh ]; then
cd ${installDir}
bash uninstall.sh
fi
}
if [ "$verMode" == "cluster" ]; then
uninstall_taosx
fi
remove_taoskeeper
# Stop service and disable booting start.
clean_service
# Remove binary file and links
clean_bin
# Remove links of local bin
clean_local_bin
# Remove header file.
clean_header
# Remove lib file
clean_lib
# Remove link log directory
clean_log
# Remove link configuration file
clean_config
# Remove data link directory
${csudo}rm -rf ${data_link_dir} || :
${csudo}rm -rf ${install_main_dir}
if [[ -e /etc/os-release ]]; then
osinfo=$(awk -F= '/^NAME/{print $2}' /etc/os-release)
else
osinfo=""
fi
if echo $osinfo | grep -qwi "ubuntu"; then
# echo "this is ubuntu system"
${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || :
elif echo $osinfo | grep -qwi "debian"; then
# echo "this is debian system"
${csudo}dpkg --force-all -P tdengine >/dev/null 2>&1 || :
elif echo $osinfo | grep -qwi "centos"; then
# echo "this is centos system"
${csudo}rpm -e --noscripts tdengine >/dev/null 2>&1 || :
fi
if [ "$osType" = "Darwin" ]; then
${csudo}rm -rf /Applications/TDengine.app
fi
echo
echo "Do you want to remove all the data, log and configuration files? [y/n]"

View File

@ -1442,7 +1442,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
bool set = false;
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
if (topicNumGet <= 0 && epoch <= tmq->epoch) {
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet);
return false;

View File

@ -472,8 +472,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
if (tsNumOfTaskQueueThreads >= 10) {
tsNumOfTaskQueueThreads = 10;
if (tsNumOfTaskQueueThreads >= 50) {
tsNumOfTaskQueueThreads = 50;
}
if (cfgAddInt32(pCfg, "numOfTaskQueueThreads", tsNumOfTaskQueueThreads, 4, 1024, CFG_SCOPE_CLIENT) != 0) return -1;

View File

@ -1025,7 +1025,7 @@ static int32_t mndProcessCreateDnodeReq(SRpcMsg *pReq) {
char obj[200] = {0};
sprintf(obj, "%s:%d", createReq.fqdn, createReq.port);
auditRecord(pReq, pMnode->clusterId, "createDnode", obj, "", createReq.sql, createReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "createDnode", "", obj, createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1174,7 +1174,7 @@ static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) {
char obj1[30] = {0};
sprintf(obj1, "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropDnode", obj1, "", dropReq.sql, dropReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "dropDnode", "", obj1, dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1375,7 +1375,7 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
char obj[50] = {0};
sprintf(obj, "%d", cfgReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "alterDnode", obj, "", cfgReq.sql, cfgReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "alterDnode", "", obj, cfgReq.sql, cfgReq.sqlLen);
tFreeSMCfgDnodeReq(&cfgReq);

View File

@ -656,7 +656,7 @@ static int32_t mndProcessCreateMnodeReq(SRpcMsg *pReq) {
char obj[40] = {0};
sprintf(obj, "%d", createReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "createMnode", obj, "", createReq.sql, createReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "createMnode", "", obj, createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -798,7 +798,7 @@ static int32_t mndProcessDropMnodeReq(SRpcMsg *pReq) {
char obj[40] = {0};
sprintf(obj, "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropMnode", obj, "", dropReq.sql, dropReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "dropMnode", "", obj, dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -310,13 +310,10 @@ _CONNECT:
code = 0;
char obj[100] = {0};
sprintf(obj, "%s:%d", ip, pConn->port);
char detail[1000] = {0};
sprintf(detail, "app:%s", connReq.app);
sprintf(detail, "%s:%d, app:%s", ip, pConn->port, connReq.app);
auditRecord(pReq, pMnode->clusterId, "login", connReq.user, obj, detail, strlen(detail));
auditRecord(pReq, pMnode->clusterId, "login", "", "", detail, strlen(detail));
_OVER:

View File

@ -310,7 +310,7 @@ static int32_t mndProcessCreateQnodeReq(SRpcMsg *pReq) {
char obj[33] = {0};
sprintf(obj, "%d", createReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "createQnode", obj, "", createReq.sql, createReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "createQnode", "", obj, createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("qnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
@ -424,7 +424,7 @@ static int32_t mndProcessDropQnodeReq(SRpcMsg *pReq) {
char obj[33] = {0};
sprintf(obj, "%d", dropReq.dnodeId);
auditRecord(pReq, pMnode->clusterId, "dropQnode", obj, "", dropReq.sql, dropReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "dropQnode", "", obj, dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -728,7 +728,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
pVgEp->vgId = pVgroup->vgId;
taosArrayPush(pSub->unassignedVgs, &pVgEp);
mDebug("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
mInfo("init subscription %s for topic:%s assign vgId:%d", pSub->key, pTopic->name, pVgEp->vgId);
sdbRelease(pSdb, pVgroup);
}

View File

@ -37,23 +37,23 @@
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second
} SNodeEntry;
typedef struct SStreamExecInfo {
SArray *pNodeList;
int64_t ts; // snapshot ts
int64_t activeCheckpoint; // active check point id
SHashObj *pTaskMap;
SArray *pTaskList;
SHashObj * pTaskMap;
SArray * pTaskList;
TdThreadMutex lock;
} SStreamExecInfo;
typedef struct SVgroupChangeInfo {
SHashObj *pDBMap;
SArray *pUpdateNodeList; // SArray<SNodeUpdateInfo>
SArray * pUpdateNodeList; // SArray<SNodeUpdateInfo>
} SVgroupChangeInfo;
static int32_t mndNodeCheckSentinel = 0;
@ -78,7 +78,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static SArray *extractNodeListFromStream(SMnode *pMnode);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady);
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@ -91,7 +91,7 @@ static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExe
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
static int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList);
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -193,9 +193,9 @@ STREAM_ENCODE_OVER:
SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRow *pRow = NULL;
SSdbRow * pRow = NULL;
SStreamObj *pStream = NULL;
void *buf = NULL;
void * buf = NULL;
int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) {
@ -272,7 +272,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream
}
SStreamObj *mndAcquireStream(SMnode *pMnode, char *streamName) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = sdbAcquire(pSdb, SDB_STREAM, streamName);
if (pStream == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
@ -288,7 +288,7 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream) {
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
int8_t status = atomic_load_8(&pStream->status);
if (status == STREAM_STATUS__NORMAL) {
strcpy(dst, "normal");
strcpy(dst, "ready");
} else if (status == STREAM_STATUS__STOP) {
strcpy(dst, "stop");
} else if (status == STREAM_STATUS__FAILED) {
@ -296,7 +296,7 @@ static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
} else if (status == STREAM_STATUS__RECOVER) {
strcpy(dst, "recover");
} else if (status == STREAM_STATUS__PAUSE) {
strcpy(dst, "pause");
strcpy(dst, "paused");
}
}
@ -325,7 +325,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
return TSDB_CODE_SUCCESS;
}
SNode *pAst = NULL;
SNode * pAst = NULL;
int32_t code = nodesStringToNode(ast, &pAst);
SQueryPlan *pPlan = NULL;
@ -350,7 +350,7 @@ static int32_t mndStreamGetPlanString(const char *ast, int8_t triggerType, int64
}
static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, SCMCreateStreamReq *pCreate) {
SNode *pAst = NULL;
SNode * pAst = NULL;
SQueryPlan *pPlan = NULL;
mInfo("stream:%s to create", pCreate->name);
@ -589,7 +589,7 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr
static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) {
SStbObj *pStb = NULL;
SDbObj *pDb = NULL;
SDbObj * pDb = NULL;
SMCreateStbReq createReq = {0};
tstrncpy(createReq.name, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN);
@ -715,10 +715,10 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
}
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
int32_t code = -1;
SStreamObj *pStream = NULL;
SDbObj *pDb = NULL;
SStreamObj * pStream = NULL;
SDbObj * pDb = NULL;
SCMCreateStreamReq createStreamReq = {0};
SStreamObj streamObj = {0};
@ -761,7 +761,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -857,13 +857,13 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
code = TSDB_CODE_ACTION_IN_PROGRESS;
SName name = {0};
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
//TODO
tNameFromString(&name, createStreamReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
// reuse this function for stream
// TODO
if (createStreamReq.sql != NULL) {
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, "",
createStreamReq.sql, strlen(createStreamReq.sql));
auditRecord(pReq, pMnode->clusterId, "createStream", name.dbname, name.tname, createStreamReq.sql,
strlen(createStreamReq.sql));
}
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -877,15 +877,30 @@ _OVER:
return code;
}
int64_t mndStreamGenChkpId(SMnode *pMnode) {
SStreamObj *pStream = NULL;
void * pIter = NULL;
SSdb * pSdb = pMnode->pSdb;
int64_t maxChkpId = 0;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) break;
maxChkpId = TMAX(maxChkpId, pStream->checkpointId);
sdbRelease(pSdb, pStream);
}
return maxChkpId + 1;
}
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
pMsg->checkpointId = taosGetTimestampMs();
pMsg->checkpointId = mndStreamGenChkpId(pMnode);
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size};
@ -919,7 +934,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamCheckpointSourceReq(&encoder, &req);
@ -1042,7 +1057,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
int32_t totLevel = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < totLevel; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
SArray * pLevel = taosArrayGetP(pStream->tasks, i);
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
@ -1059,7 +1074,7 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
return -1;
}
void *buf;
void * buf;
int32_t tlen;
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
pTask->id.taskId) < 0) {
@ -1070,7 +1085,8 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
STransAction action = {0};
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset, TSDB_CODE_SYN_PROPOSE_NOT_READY);
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
TSDB_CODE_SYN_PROPOSE_NOT_READY);
mndReleaseVgroup(pMnode, pVgObj);
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
@ -1110,9 +1126,9 @@ static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream
}
static const char *mndGetStreamDB(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -1135,9 +1151,9 @@ static int32_t initStreamNodeList(SMnode* pMnode) {
}
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStreamObj *pStream = NULL;
int32_t code = 0;
@ -1162,11 +1178,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
}
bool allReady = true;
bool allReady = true;
SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady);
if (!allReady) {
mWarn("not all vnodes are ready, ignore the checkpoint")
taosArrayDestroy(pNodeSnapshot);
mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot);
return 0;
}
@ -1187,15 +1202,15 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId *p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
STaskId * p = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (pEntry == NULL) {
continue;
}
if (pEntry->status != TASK_STATUS__READY) {
mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued",
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status));
pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamTaskGetStatusStr(pEntry->status));
ready = false;
break;
}
@ -1255,7 +1270,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMDropStreamReq dropReq = {0};
@ -1331,10 +1346,10 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
removeStreamTasksInBuf(pStream, &execInfo);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
// reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", dropReq.sql, dropReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1384,7 +1399,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
}
int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SDbObj *pDb = mndAcquireDb(pMnode, dbName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
@ -1392,7 +1407,7 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
int32_t numOfStreams = 0;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
SStreamObj *pStream = NULL;
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -1411,8 +1426,8 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
}
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1488,8 +1503,8 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
}
static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SMnode * pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb;
int32_t numOfRows = 0;
SStreamObj *pStream = NULL;
@ -1578,13 +1593,13 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
// status
char status[20 + VARSTR_HEADER_SIZE] = {0};
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pe == NULL) {
continue;
}
const char* pStatus = streamTaskGetStatusStr(pe->status);
const char *pStatus = streamTaskGetStatusStr(pe->status);
STR_TO_VARSTR(status, pStatus);
// status
@ -1596,24 +1611,24 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
colDataSetVal(pColInfo, numOfRows, (const char *)&pe->stage, false);
// input queue
char vbuf[30] = {0};
char buf[25] = {0};
const char* queueInfoStr = "%4.2fMiB (%5.2f%)";
char vbuf[30] = {0};
char buf[25] = {0};
const char *queueInfoStr = "%4.2fMiB (%5.2f%)";
sprintf(buf, queueInfoStr, pe->inputQUsed, pe->inputRate);
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
// output queue
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// sprintf(buf, queueInfoStr, pe->outputQUsed, pe->outputRate);
// STR_TO_VARSTR(vbuf, buf);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
const char* sinkStr = "%.2fMiB";
const char *sinkStr = "%.2fMiB";
sprintf(buf, sinkStr, pe->sinkDataSize);
} else if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
// offset info
@ -1624,7 +1639,7 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
STR_TO_VARSTR(vbuf, buf);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataSetVal(pColInfo, numOfRows, (const char*)vbuf, false);
colDataSetVal(pColInfo, numOfRows, (const char *)vbuf, false);
numOfRows++;
}
@ -1668,7 +1683,7 @@ static int32_t mndPauseStreamTask(STrans *pTrans, SStreamTask *pTask) {
}
int32_t mndPauseAllStreamTasks(STrans *pTrans, SStreamObj *pStream) {
SArray* tasks = pStream->tasks;
SArray *tasks = pStream->tasks;
int32_t size = taosArrayGetSize(tasks);
for (int32_t i = 0; i < size; i++) {
@ -1705,7 +1720,7 @@ static int32_t mndPersistStreamLog(STrans *pTrans, const SStreamObj *pStream, in
}
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMPauseStreamReq pauseReq = {0};
@ -1821,7 +1836,7 @@ int32_t mndResumeAllStreamTasks(STrans *pTrans, SStreamObj *pStream, int8_t igUn
}
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamObj *pStream = NULL;
SMResumeStreamReq pauseReq = {0};
@ -1906,7 +1921,7 @@ static void initNodeUpdateMsg(SStreamTaskNodeUpdateMsg *pMsg, const SVgroupChang
}
static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupChangeInfo *pInfo, int32_t nodeId,
SStreamTaskId* pId, int32_t transId) {
SStreamTaskId *pId, int32_t transId) {
SStreamTaskNodeUpdateMsg req = {0};
initNodeUpdateMsg(&req, pInfo, pId, transId);
@ -1929,7 +1944,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha
return -1;
}
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
void * abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeStreamTaskUpdateMsg(&encoder, &req);
@ -1996,7 +2011,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
for (int32_t k = 0; k < numOfTasks; ++k) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
void *pBuf = NULL;
void * pBuf = NULL;
int32_t len = 0;
streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList);
doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id);
@ -2017,7 +2032,7 @@ static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *p
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
const SEp* p = GET_ACTIVE_EP(pCurrent);
const SEp *p = GET_ACTIVE_EP(pCurrent);
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
return false;
@ -2071,9 +2086,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
return info;
}
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool* allReady) {
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady) {
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SVgObj *pVgroup = NULL;
*allReady = true;
@ -2121,8 +2136,8 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
STrans *pTrans = NULL;
void * pIter = NULL;
STrans * pTrans = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2183,9 +2198,9 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
}
static SArray *extractNodeListFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
while (1) {
@ -2232,9 +2247,9 @@ static SArray *extractNodeListFromStream(SMnode *pMnode) {
}
static void doExtractTasksFromStream(SMnode *pMnode) {
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2269,11 +2284,11 @@ static int32_t doRemoveTasks(SStreamExecInfo *pExecNode, STaskId *pRemovedId) {
return TSDB_CODE_SUCCESS;
}
static bool taskNodeExists(SArray* pList, int32_t nodeId) {
static bool taskNodeExists(SArray *pList, int32_t nodeId) {
size_t num = taosArrayGetSize(pList);
for(int32_t i = 0; i < num; ++i) {
SNodeEntry* pEntry = taosArrayGet(pList, i);
for (int32_t i = 0; i < num; ++i) {
SNodeEntry *pEntry = taosArrayGet(pList, i);
if (pEntry->nodeId == nodeId) {
return true;
}
@ -2283,12 +2298,12 @@ static bool taskNodeExists(SArray* pList, int32_t nodeId) {
}
int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
SArray* pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
SArray *pRemovedTasks = taosArrayInit(4, sizeof(STaskId));
int32_t numOfTask = taosArrayGetSize(execInfo.pTaskList);
for(int32_t i = 0; i < numOfTask; ++i) {
STaskId* pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry* pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
for (int32_t i = 0; i < numOfTask; ++i) {
STaskId * pId = taosArrayGet(execInfo.pTaskList, i);
STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, pId, sizeof(*pId));
bool existed = taskNodeExists(pNodeSnapshot, pEntry->nodeId);
if (!existed) {
@ -2296,21 +2311,21 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
}
}
for(int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId* pId = taosArrayGet(pRemovedTasks, i);
for (int32_t i = 0; i < taosArrayGetSize(pRemovedTasks); ++i) {
STaskId *pId = taosArrayGet(pRemovedTasks, i);
doRemoveTasks(&execInfo, pId);
}
mDebug("remove invalid stream tasks:%d, remain:%d", (int32_t)taosArrayGetSize(pRemovedTasks),
(int32_t) taosArrayGetSize(execInfo.pTaskList));
(int32_t)taosArrayGetSize(execInfo.pTaskList));
int32_t size = taosArrayGetSize(pNodeSnapshot);
SArray* pValidNodeEntryList = taosArrayInit(4, sizeof(SNodeEntry));
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
SNodeEntry* p = taosArrayGet(execInfo.pNodeList, i);
for(int32_t j = 0; j < size; ++j) {
SNodeEntry* pEntry = taosArrayGet(pNodeSnapshot, j);
for (int32_t j = 0; j < size; ++j) {
SNodeEntry *pEntry = taosArrayGet(pNodeSnapshot, j);
if (pEntry->nodeId == p->nodeId) {
taosArrayPush(pValidNodeEntryList, p);
break;
@ -2321,7 +2336,7 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
taosArrayDestroy(execInfo.pNodeList);
execInfo.pNodeList = pValidNodeEntryList;
mDebug("remain %d valid node entries", (int32_t) taosArrayGetSize(pValidNodeEntryList));
mDebug("remain %d valid node entries", (int32_t)taosArrayGetSize(pValidNodeEntryList));
taosArrayDestroy(pRemovedTasks);
return 0;
}
@ -2366,7 +2381,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
doKillActiveCheckpointTrans(pMnode);
code = mndProcessVgroupChange(pMnode, &changeInfo);
@ -2401,7 +2415,7 @@ typedef struct SMStreamNodeCheckMsg {
static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
if (sdbGetSize(pSdb, SDB_STREAM) <= 0) {
return 0;
}
@ -2425,7 +2439,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p == NULL) {
STaskStatusEntry entry = {0};
streamTaskStatusInit(&entry, pTask);
@ -2439,7 +2453,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
}
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2449,12 +2463,12 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
void * p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
for(int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId* pId = taosArrayGet(pExecNode->pTaskList, k);
for (int32_t k = 0; k < taosArrayGetSize(pExecNode->pTaskList); ++k) {
STaskId *pId = taosArrayGet(pExecNode->pTaskList, k);
if (pId->taskId == id.taskId && pId->streamId == id.streamId) {
taosArrayRemove(pExecNode->pTaskList, k);
mInfo("s-task:0x%x removed from buffer, remain:%d", (int32_t)id.taskId,
@ -2462,7 +2476,6 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecInfo * pExecNode) {
break;
}
}
}
}
}
@ -2492,7 +2505,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name) {
return pTrans;
}
int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
STrans *pTrans = doCreateTrans(pMnode, pStream, "stream-task-reset");
if (pTrans == NULL) {
return terrno;
@ -2509,7 +2522,7 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
SStreamTask *pTask = taosArrayGetP(pLevel, k);
// todo extract method, with pause stream task
SVResetStreamTaskReq* pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
SVResetStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResetStreamTaskReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVResetStreamTaskReq),
@ -2555,9 +2568,9 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
int32_t transId = 0;
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
@ -2588,13 +2601,13 @@ int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
return TSDB_CODE_SUCCESS;
}
int32_t mndResetFromCheckpoint(SMnode* pMnode) {
int32_t mndResetFromCheckpoint(SMnode *pMnode) {
doKillActiveCheckpointTrans(pMnode);
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb *pSdb = pMnode->pSdb;
SSdb * pSdb = pMnode->pSdb;
SStreamObj *pStream = NULL;
void *pIter = NULL;
void * pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
if (pIter == NULL) {
@ -2613,7 +2626,7 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
return 0;
}
int32_t setNodeEpsetExpiredFlag(const SArray* pNodeList) {
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
int32_t num = taosArrayGetSize(pNodeList);
mInfo("set node expired for %d nodes", num);
@ -2641,7 +2654,6 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
for(int32_t j = 0; j < numOfNodes; ++j) {
SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, j);
if (pNodeEntry->nodeId == pTaskEntry->nodeId) {
mInfo("vgId:%d stage updated from %d to %d, nodeUpdate trigger by s-task:0x%" PRIx64, pTaskEntry->nodeId,
pTaskEntry->stage, stage, pTaskEntry->id.taskId);
@ -2653,7 +2665,7 @@ static void updateStageInfo(STaskStatusEntry* pTaskEntry, int32_t stage) {
}
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SMnode * pMnode = pReq->info.node;
SStreamHbMsg req = {0};
bool checkpointFailed = false;
@ -2714,15 +2726,15 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
pTaskEntry->status = p->status;
if (p->status != TASK_STATUS__READY) {
mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
mDebug("received s-task:0x%" PRIx64 " not in ready status:%s", p->id.taskId, streamTaskGetStatusStr(p->status));
}
}
// current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) {
bool allReady = true;
SArray* p = mndTakeVgroupSnapshot(pMnode, &allReady);
bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p);
if (allReady) {

View File

@ -633,10 +633,10 @@ static int32_t mndProcessCreateTopicReq(SRpcMsg *pReq) {
tNameFromString(&dbname, createTopicReq.subDbName, T_NAME_ACCT | T_NAME_DB);
SName topicName = {0};
tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB);
tNameFromString(&topicName, createTopicReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
//reuse this function for topic
auditRecord(pReq, pMnode->clusterId, "createTopic", topicName.dbname, dbname.dbname,
auditRecord(pReq, pMnode->clusterId, "createTopic", dbname.dbname, topicName.dbname,
createTopicReq.sql, strlen(createTopicReq.sql));
_OVER:
@ -839,10 +839,10 @@ end:
}
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
//reuse this function for topic
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, "", dropReq.sql, dropReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "dropTopic", name.dbname, name.tname, dropReq.sql, dropReq.sqlLen);
tFreeSMDropTopicReq(&dropReq);

View File

@ -1275,7 +1275,7 @@ static int32_t mndProcessCreateUserReq(SRpcMsg *pReq) {
code = mndCreateUser(pMnode, pOperUser->acct, &createReq, pReq);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
auditRecord(pReq, pMnode->clusterId, "createUser", createReq.user, "", createReq.sql, createReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "createUser", "", createReq.user, createReq.sql, createReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
@ -1820,12 +1820,12 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
sprintf(detail, "alterType:%s, enable:%d, superUser:%d, sysInfo:%d, tabName:%s, password:xxx",
mndUserAuditTypeStr(alterReq.alterType), alterReq.enable, alterReq.superUser, alterReq.sysInfo,
alterReq.tabName);
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", detail, strlen(detail));
auditRecord(pReq, pMnode->clusterId, "alterUser", "", alterReq.user, detail, strlen(detail));
}
else if(alterReq.alterType == TSDB_ALTER_USER_SUPERUSER ||
alterReq.alterType == TSDB_ALTER_USER_ENABLE ||
alterReq.alterType == TSDB_ALTER_USER_SYSINFO){
auditRecord(pReq, pMnode->clusterId, "alterUser", alterReq.user, "", alterReq.sql, alterReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "alterUser", "", alterReq.user, alterReq.sql, alterReq.sqlLen);
}
else if(alterReq.alterType == TSDB_ALTER_USER_ADD_READ_DB||
alterReq.alterType == TSDB_ALTER_USER_ADD_WRITE_DB||
@ -1836,29 +1836,29 @@ static int32_t mndProcessAlterUserReq(SRpcMsg *pReq) {
if (strcmp(alterReq.objname, "1.*") != 0){
SName name = {0};
tNameFromString(&name, alterReq.objname, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, name.dbname,
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", name.dbname, alterReq.user,
alterReq.sql, alterReq.sqlLen);
}else{
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, "*",
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", "", alterReq.user,
alterReq.sql, alterReq.sqlLen);
}
}
else if(alterReq.alterType == TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC){
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.user, alterReq.objname,
auditRecord(pReq, pMnode->clusterId, "GrantPrivileges", alterReq.objname, alterReq.user,
alterReq.sql, alterReq.sqlLen);
}
else if(alterReq.alterType == TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC){
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, alterReq.objname,
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.objname, alterReq.user,
alterReq.sql, alterReq.sqlLen);
}
else{
if (strcmp(alterReq.objname, "1.*") != 0){
SName name = {0};
tNameFromString(&name, alterReq.objname, T_NAME_ACCT | T_NAME_DB);
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, name.dbname,
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", name.dbname, alterReq.user,
alterReq.sql, alterReq.sqlLen);
}else{
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", alterReq.user, "*",
auditRecord(pReq, pMnode->clusterId, "RevokePrivileges", "", alterReq.user,
alterReq.sql, alterReq.sqlLen);
}
}
@ -1933,7 +1933,7 @@ static int32_t mndProcessDropUserReq(SRpcMsg *pReq) {
code = mndDropUser(pMnode, pReq, pUser);
if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS;
auditRecord(pReq, pMnode->clusterId, "dropUser", dropReq.user, "", dropReq.sql, dropReq.sqlLen);
auditRecord(pReq, pMnode->clusterId, "dropUser", "", dropReq.user, dropReq.sql, dropReq.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -2177,7 +2177,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) {
char obj[33] = {0};
sprintf(obj, "%d", req.vgId);
auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", obj, "", req.sql, req.sqlLen);
auditRecord(pReq, pMnode->clusterId, "RedistributeVgroup", "", obj, req.sql, req.sqlLen);
_OVER:
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -20,6 +20,7 @@
#include "vnode.h"
#include "vnodeInt.h"
#include "audit.h"
#include "tstrbuild.h"
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
@ -886,6 +887,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
char tbName[TSDB_TABLE_FNAME_LEN];
STbUidStore *pStore = NULL;
SArray *tbUids = NULL;
SArray *tbNames = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
@ -902,7 +904,8 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
if (rsp.pArray == NULL || tbUids == NULL) {
tbNames = taosArrayInit(req.nReqs, sizeof(char*));
if (rsp.pArray == NULL || tbUids == NULL || tbNames == NULL) {
rcode = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
@ -923,6 +926,17 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
goto _exit;
}
if(tsEnableAuditCreateTable){
char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
if (str == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
rcode = -1;
goto _exit;
}
strcpy(str, pCreateReq->name);
taosArrayPush(tbNames, &str);
}
// validate hash
sprintf(tbName, "%s.%s", pVnode->config.dbname, pCreateReq->name);
if (vnodeValidateTableHash(pVnode, tbName) < 0) {
@ -946,15 +960,6 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
}
taosArrayPush(rsp.pArray, &cRsp);
if(tsEnableAuditCreateTable){
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
auditRecord(NULL, clusterId, "createTable", name.dbname, "", pCreateReq->name, strlen(pCreateReq->name));
}
}
vDebug("vgId:%d, add %d new created tables into query table list", TD_VID(pVnode), (int32_t)taosArrayGetSize(tbUids));
@ -976,17 +981,42 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
if(tsEnableAuditCreateTable){
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
SStringBuilder sb = {0};
for(int32_t i = 0; i < tbNames->size; i++){
char** key = (char**)taosArrayGet(tbNames, i);
taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
if(i < tbNames->size - 1){
taosStringBuilderAppendChar(&sb, ',');
}
taosMemoryFreeClear(*key);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
auditRecord(NULL, clusterId, "createTable", name.dbname, "", keyJoined, len);
taosStringBuilderDestroy(&sb);
}
_exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFree(pCreateReq->sql);
taosMemoryFree(pCreateReq->comment);
taosArrayDestroy(pCreateReq->ctb.tagName);
taosArrayDestroy(pCreateReq->ctb.tagName);
}
taosArrayDestroyEx(rsp.pArray, tFreeSVCreateTbRsp);
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncoderClear(&encoder);
taosArrayDestroy(tbNames);
return rcode;
}
@ -1120,6 +1150,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
int32_t ret;
SArray *tbUids = NULL;
STbUidStore *pStore = NULL;
SArray *tbNames = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL;
@ -1138,7 +1169,8 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
// process req
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
tbNames = taosArrayInit(req.nReqs, sizeof(char*));
if (tbUids == NULL || rsp.pArray == NULL || tbNames == NULL) goto _exit;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
@ -1159,11 +1191,41 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
}
taosArrayPush(rsp.pArray, &dropTbRsp);
if(tsEnableAuditCreateTable){
char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
strcpy(str, pDropTbReq->name);
taosArrayPush(tbNames, &str);
}
}
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
tdUpdateTbUidList(pVnode->pSma, pStore, false);
if(tsEnableAuditCreateTable){
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
SStringBuilder sb = {0};
for(int32_t iReq = 0; iReq < req.nReqs; iReq++){
char** key = (char**)taosArrayGet(tbNames, iReq);
taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
if(iReq < req.nReqs - 1){
taosStringBuilderAppendChar(&sb, ',');
}
taosMemoryFreeClear(*key);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
auditRecord(NULL, clusterId, "dropTable", name.dbname, "", keyJoined, len);
taosStringBuilderDestroy(&sb);
}
_exit:
taosArrayDestroy(tbUids);
tdUidStoreFree(pStore);
@ -1174,6 +1236,7 @@ _exit:
tEncodeSVDropTbBatchRsp(&encoder, &rsp);
tEncoderClear(&encoder);
taosArrayDestroy(rsp.pArray);
taosArrayDestroy(tbNames);
return 0;
}

View File

@ -580,6 +580,7 @@ typedef struct SStreamSessionAggOperatorInfo {
bool reCkBlock;
SSDataBlock* pCheckpointRes;
bool clearState;
bool recvGetAll;
} SStreamSessionAggOperatorInfo;
typedef struct SStreamStateAggOperatorInfo {
@ -603,6 +604,7 @@ typedef struct SStreamStateAggOperatorInfo {
SArray* historyWins;
bool reCkBlock;
SSDataBlock* pCheckpointRes;
bool recvGetAll;
} SStreamStateAggOperatorInfo;
typedef struct SStreamPartitionOperatorInfo {

View File

@ -2535,6 +2535,15 @@ void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) {
taosMemoryFree(buf);
}
static void resetUnCloseSessionWinInfo(SSHashObj* winMap) {
void* pIte = NULL;
int32_t iter = 0;
while ((pIte = tSimpleHashIterate(winMap, pIte, &iter)) != NULL) {
SResultWindowInfo* pResInfo = pIte;
pResInfo->pStatePos->beUsed = true;
}
}
static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
@ -2549,6 +2558,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (opRes) {
return opRes;
}
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL;
}
@ -2586,6 +2601,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pWins);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pAggSup->pResultRows, pInfo->pStUpdated);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
@ -2841,6 +2857,8 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->clearState = false;
pInfo->recvGetAll = false;
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
// for stream
void* buff = NULL;
@ -3457,6 +3475,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return resBlock;
}
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL;
}
@ -3485,6 +3508,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
taosArrayDestroy(pWins);
continue;
} else if (pBlock->info.type == STREAM_GET_ALL) {
pInfo->recvGetAll = true;
getAllSessionWindow(pInfo->streamAggSup.pResultRows, pInfo->pSeUpdated);
continue;
} else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) {
@ -3716,6 +3740,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
}
pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT);
pInfo->recvGetAll = false;
// for stream
void* buff = NULL;

View File

@ -51,6 +51,7 @@ struct SStreamSnapHandle {
int8_t filetype;
SArray* pFileList;
int32_t currFileIdx;
int8_t delFlag;
};
struct SStreamSnapBlockHdr {
int8_t type;
@ -147,6 +148,7 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
taosMemoryFree(tdir);
return code;
}
pHandle->delFlag = 1;
chkpId = 0;
}
@ -192,8 +194,8 @@ int32_t streamSnapHandleInit(SStreamSnapHandle* pHandle, char* path, int64_t chk
taosArrayPush(pFile->pSst, &sst);
}
}
{
char* buf = taosMemoryCalloc(1, 512);
if (qDebugFlag & DEBUG_TRACE) {
char* buf = taosMemoryCalloc(1, 128 + taosArrayGetSize(pFile->pSst) * 16);
sprintf(buf, "[current: %s,", pFile->pCurrent);
sprintf(buf + strlen(buf), "MANIFEST: %s,", pFile->pMainfest);
sprintf(buf + strlen(buf), "options: %s,", pFile->pOptions);
@ -273,7 +275,7 @@ void streamSnapHandleDestroy(SStreamSnapHandle* handle) {
if (handle->checkpointId == 0) {
// del tmp dir
if (pFile && taosIsDir(pFile->path)) {
taosRemoveDir(pFile->path);
if (handle->delFlag) taosRemoveDir(pFile->path);
}
} else {
streamBackendDelInUseChkp(handle->handle, handle->checkpointId);
@ -343,10 +345,10 @@ int32_t streamSnapRead(SStreamSnapReader* pReader, uint8_t** ppData, int64_t* si
stDebug("%s start to read file %s, current offset:%" PRId64 ", size:%" PRId64 ", file no.%d", STREAM_STATE_TRANSFER,
item->name, (int64_t)pHandle->offset, item->size, pHandle->currFileIdx);
uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize);
if(buf == NULL){
if (buf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
int64_t nread = taosPReadFile(pHandle->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pHandle->offset);
if (nread == -1) {
taosMemoryFree(buf);
code = TAOS_SYSTEM_ERROR(terrno);
@ -479,8 +481,8 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa
}
int32_t streamSnapWriterClose(SStreamSnapWriter* pWriter, int8_t rollback) {
SStreamSnapHandle* handle = &pWriter->handle;
if (qDebugFlag & DEBUG_DEBUG) {
char* buf = (char*)taosMemoryMalloc(1024);
if (qDebugFlag & DEBUG_TRACE) {
char* buf = (char*)taosMemoryMalloc(128 + taosArrayGetSize(handle->pFileList) * 16);
int n = sprintf(buf, "[");
for (int i = 0; i < taosArrayGetSize(handle->pFileList); i++) {
SBackendFileItem* item = taosArrayGet(handle->pFileList, i);

View File

@ -795,7 +795,12 @@ static void *taosAsyncOutputLog(void *param) {
updateCron = 0;
}
if (pLogBuf->stop || pSlowBuf->stop) break;
if (pLogBuf->stop || pSlowBuf->stop) {
pLogBuf->lastDuration = LOG_MAX_WAIT_MSEC;
taosWriteLog(pLogBuf);
taosWriteLog(pSlowBuf);
break;
}
}
return NULL;
@ -822,7 +827,7 @@ bool taosAssertDebug(bool condition, const char *file, int32_t line, const char
taosPrintTrace(flags, level, dflag, -1);
if (tsAssert) {
// taosCloseLog();
taosCloseLog();
taosMsleep(300);
#ifdef NDEBUG

View File

@ -69,7 +69,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j 10|| exit 1"
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0;make -j 10|| exit 1"
# -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
@ -99,7 +99,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j 10|| exit 1 "
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0;make -j 10|| exit 1 "
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan

View File

@ -84,7 +84,7 @@ pip3 install --default-timeout=120 taospy==2.7.12
#define taos-ws-py 0.2.8
pip3 list|grep taos-ws-py
pip3 uninstall taos-ws-py -y
pip3 install --default-timeout=120 taos-ws-py==0.2.8
pip3 install --default-timeout=600 taos-ws-py==0.3.1
$TIMEOUT_CMD $cmd
RET=$?

File diff suppressed because it is too large Load Diff

View File

@ -22,6 +22,8 @@ sql create stream streams2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s);
sql create stream stream_t2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sql insert into t1 values(1648791216000,2,2,3,1.1);
sql insert into t1 values(1648791220000,3,2,3,2.1);
@ -312,6 +314,8 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
sql insert into t1 values(1648791233002,3,2,3,2.1);
@ -445,6 +449,8 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
sql insert into t1 values(1648791233002,3,3,3,2.1);
@ -584,6 +590,8 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sleep 1000
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
sql insert into t1 values(1648791233002,3,3,3,2.1);
@ -708,6 +716,8 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791243000,2,1,1,1.0);

View File

@ -108,16 +108,17 @@ ELSE ()
)
EXECUTE_PROCESS(
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}/taosadapter
COMMAND git rev-parse --short HEAD
COMMAND git rev-parse HEAD
RESULT_VARIABLE commit_sha1
OUTPUT_VARIABLE taosadapter_commit_sha1
)
IF ("${taosadapter_commit_sha1}" STREQUAL "")
SET(taosadapter_commit_sha1 "unknown")
ELSE ()
STRING(SUBSTRING "${taosadapter_commit_sha1}" 0 7 taosadapter_commit_sha1)
# STRING(SUBSTRING "${taosadapter_commit_sha1}" 0 7 taosadapter_commit_sha1)
STRING(STRIP "${taosadapter_commit_sha1}" taosadapter_commit_sha1)
ENDIF ()
SET(taos_version ${TD_VER_NUMBER})
MESSAGE("${Green} taosAdapter will use ${taos_version} and commit ${taosadapter_commit_sha1} as version ${ColourReset}")
EXECUTE_PROCESS(
COMMAND cd ..
@ -139,9 +140,8 @@ ELSE ()
BUILD_COMMAND
COMMAND set CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client
COMMAND set CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib
# COMMAND go build -a -o taosadapter.exe -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND go build -a -o taosadapter.exe -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND go build -a -o taosadapter.exe -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
COMMAND go build -a -o taosadapter-debug.exe -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
INSTALL_COMMAND
COMMAND cmake -E echo "Comparessing taosadapter.exe"
@ -167,9 +167,8 @@ ELSE ()
PATCH_COMMAND
COMMAND git clean -f -d
BUILD_COMMAND
# COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
INSTALL_COMMAND
COMMAND cmake -E echo "Copy taosadapter"
COMMAND cmake -E copy taosadapter ${CMAKE_BINARY_DIR}/build/bin
@ -193,9 +192,8 @@ ELSE ()
PATCH_COMMAND
COMMAND git clean -f -d
BUILD_COMMAND
# COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X github.com/taosdata/taosadapter/v3/version.Version=${taos_version} -X github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -ldflags "-s -w -X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
COMMAND CGO_CFLAGS=-I${CMAKE_CURRENT_SOURCE_DIR}/../include/client CGO_LDFLAGS=-L${CMAKE_BINARY_DIR}/build/lib go build -a -o taosadapter-debug -ldflags "-X 'github.com/taosdata/taosadapter/v3/version.Version=${taos_version}' -X 'github.com/taosdata/taosadapter/v3/version.CommitID=${taosadapter_commit_sha1}' -X 'github.com/taosdata/taosadapter/v3/version.BuildInfo=${TD_VER_DATE}'"
INSTALL_COMMAND
COMMAND cmake -E echo "Comparessing taosadapter.exe"
COMMAND upx taosadapter || :

View File

@ -409,11 +409,7 @@ static int32_t shellCheckArgs() {
int32_t shellParseArgs(int32_t argc, char *argv[]) {
shellInitArgs(argc, argv);
shell.info.clientVersion =
#ifdef WEBSOCKET
"Welcome to the %s Command Line Interface (WebSocket), Client Version:%s\r\n"
#else
"Welcome to the %s Command Line Interface, Client Version:%s\r\n"
#endif
"Copyright (c) 2023 by %s, all rights reserved.\r\n\r\n";
#ifdef CUS_NAME
strcpy(shell.info.cusName, CUS_NAME);