Merge branch '3.0' into fix/TD-19223-D
This commit is contained in:
commit
58badb06c7
|
@ -212,6 +212,7 @@ tmq_list_t* build_topic_list() {
|
|||
tmq_list_t* topicList = tmq_list_new();
|
||||
int32_t code = tmq_list_append(topicList, "topicname");
|
||||
if (code) {
|
||||
tmq_list_destroy(topicList);
|
||||
return NULL;
|
||||
}
|
||||
return topicList;
|
||||
|
|
|
@ -1902,7 +1902,7 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
|
|||
if (pRebInfo == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
tstrncpy(pRebInfo->key, key, sizeof(pRebInfo->key));
|
||||
tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||
if (pRebInfo->lostConsumers == NULL) {
|
||||
goto _err;
|
||||
|
|
|
@ -239,6 +239,7 @@ typedef enum ELogicConditionType {
|
|||
#define TSDB_MAX_TAGS 128
|
||||
#define TSDB_MAX_TAG_CONDITIONS 1024
|
||||
|
||||
#define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)
|
||||
#define TSDB_MAX_JSON_TAG_LEN 16384
|
||||
#define TSDB_MAX_JSON_KEY_LEN 256
|
||||
|
||||
|
@ -496,6 +497,8 @@ enum {
|
|||
|
||||
#define MAX_NUM_STR_SIZE 40
|
||||
|
||||
#define MAX_META_MSG_IN_BATCH 1048576
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -168,7 +168,7 @@ function install_bin() {
|
|||
|
||||
${csudo}cp -r ${binary_dir}/build/bin/${clientName} ${install_main_dir}/bin || :
|
||||
[ -f ${binary_dir}/build/bin/taosBenchmark ] && ${csudo}cp -r ${binary_dir}/build/bin/taosBenchmark ${install_main_dir}/bin || :
|
||||
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo || :
|
||||
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo > /dev/null 2>&1 || :
|
||||
[ -f ${binary_dir}/build/bin/taosdump ] && ${csudo}cp -r ${binary_dir}/build/bin/taosdump ${install_main_dir}/bin || :
|
||||
[ -f ${binary_dir}/build/bin/taosadapter ] && ${csudo}cp -r ${binary_dir}/build/bin/taosadapter ${install_main_dir}/bin || :
|
||||
[ -f ${binary_dir}/build/bin/udfd ] && ${csudo}cp -r ${binary_dir}/build/bin/udfd ${install_main_dir}/bin || :
|
||||
|
@ -182,21 +182,21 @@ function install_bin() {
|
|||
|
||||
${csudo}chmod 0555 ${install_main_dir}/bin/*
|
||||
#Make link
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
|
||||
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
|
||||
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter || :
|
||||
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd || :
|
||||
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
|
||||
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo}ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
|
||||
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx || :
|
||||
[ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo}ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor || :
|
||||
[ -x ${install_main_dir}/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo}ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo}ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} > /dev/null 2>&1 || :
|
||||
else
|
||||
|
||||
${csudo}cp -r ${binary_dir}/build/bin/${clientName} ${install_main_dir}/bin || :
|
||||
[ -f ${binary_dir}/build/bin/taosBenchmark ] && ${csudo}cp -r ${binary_dir}/build/bin/taosBenchmark ${install_main_dir}/bin || :
|
||||
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo || :
|
||||
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo > /dev/null 2>&1 || :
|
||||
[ -f ${binary_dir}/build/bin/taosdump ] && ${csudo}cp -r ${binary_dir}/build/bin/taosdump ${install_main_dir}/bin || :
|
||||
[ -f ${binary_dir}/build/bin/taosadapter ] && ${csudo}cp -r ${binary_dir}/build/bin/taosadapter ${install_main_dir}/bin || :
|
||||
[ -f ${binary_dir}/build/bin/udfd ] && ${csudo}cp -r ${binary_dir}/build/bin/udfd ${install_main_dir}/bin || :
|
||||
|
@ -206,14 +206,14 @@ function install_bin() {
|
|||
${csudo}cp -r ${script_dir}/remove.sh ${install_main_dir}/bin || :
|
||||
${csudo}chmod 0555 ${install_main_dir}/bin/*
|
||||
#Make link
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
|
||||
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
|
||||
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter || :
|
||||
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd || :
|
||||
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
|
||||
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo || :
|
||||
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
|
||||
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump > /dev/null 2>&1 || :
|
||||
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx > /dev/null 2>&1 || :
|
||||
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} > /dev/null 2>&1 || :
|
||||
fi
|
||||
}
|
||||
|
||||
|
@ -238,7 +238,7 @@ function install_jemalloc() {
|
|||
if [ -f "${binary_dir}/build/lib/libjemalloc.so.2" ]; then
|
||||
${csudo}/usr/bin/install -c -d /usr/local/lib
|
||||
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib
|
||||
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
|
||||
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so > /dev/null 2>&1
|
||||
${csudo}/usr/bin/install -c -d /usr/local/lib
|
||||
[ -f ${binary_dir}/build/lib/libjemalloc.a ] &&
|
||||
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib
|
||||
|
@ -274,8 +274,8 @@ function install_avro() {
|
|||
if [ -f "${binary_dir}/build/$1/libavro.so.23.0.0" ] && [ -d /usr/local/$1 ]; then
|
||||
${csudo}/usr/bin/install -c -d /usr/local/$1
|
||||
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/$1/libavro.so.23.0.0 /usr/local/$1
|
||||
${csudo}ln -sf libavro.so.23.0.0 /usr/local/$1/libavro.so.23
|
||||
${csudo}ln -sf libavro.so.23 /usr/local/$1/libavro.so
|
||||
${csudo}ln -sf libavro.so.23.0.0 /usr/local/$1/libavro.so.23 > /dev/null 2>&1
|
||||
${csudo}ln -sf libavro.so.23 /usr/local/$1/libavro.so > /dev/null 2>&1
|
||||
${csudo}/usr/bin/install -c -d /usr/local/$1
|
||||
[ -f ${binary_dir}/build/$1/libavro.a ] &&
|
||||
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/$1/libavro.a /usr/local/$1
|
||||
|
@ -304,11 +304,11 @@ function install_lib() {
|
|||
${install_main_dir}/driver &&
|
||||
${csudo}chmod 777 ${install_main_dir}/driver/libtaos.so.${verNumber}
|
||||
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
|
||||
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1 > /dev/null 2>&1
|
||||
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so > /dev/null 2>&1
|
||||
if [ -d "${lib64_link_dir}" ]; then
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1
|
||||
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 > /dev/null 2>&1
|
||||
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so > /dev/null 2>&1
|
||||
fi
|
||||
|
||||
if [ -f ${binary_dir}/build/lib/libtaosws.so ]; then
|
||||
|
@ -316,23 +316,23 @@ function install_lib() {
|
|||
${install_main_dir}/driver &&
|
||||
${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.so ||:
|
||||
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || :
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so > /dev/null 2>&1 || :
|
||||
fi
|
||||
else
|
||||
${csudo}cp -Rf ${binary_dir}/build/lib/libtaos.${verNumber}.dylib \
|
||||
${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/*
|
||||
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaos.${verNumber}.dylib \
|
||||
${lib_link_dir}/libtaos.1.dylib || :
|
||||
${lib_link_dir}/libtaos.1.dylib > /dev/null 2>&1 || :
|
||||
|
||||
${csudo}ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib || :
|
||||
${csudo}ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib > /dev/null 2>&1 || :
|
||||
|
||||
if [ -f ${binary_dir}/build/lib/libtaosws.dylib ]; then
|
||||
${csudo}cp ${binary_dir}/build/lib/libtaosws.dylib \
|
||||
${install_main_dir}/driver &&
|
||||
${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.dylib ||:
|
||||
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.dylib ${lib_link_dir}/libtaosws.dylib || :
|
||||
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.dylib ${lib_link_dir}/libtaosws.dylib > /dev/null 2>&1 || :
|
||||
fi
|
||||
fi
|
||||
|
||||
|
@ -346,6 +346,7 @@ function install_lib() {
|
|||
}
|
||||
|
||||
function install_header() {
|
||||
${csudo}mkdir -p ${inc_link_dir}
|
||||
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
|
||||
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h ||:
|
||||
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
|
||||
|
@ -353,13 +354,13 @@ function install_header() {
|
|||
|
||||
if [ -f ${binary_dir}/build/include/taosws.h ]; then
|
||||
${csudo}cp -f ${binary_dir}/build/include/taosws.h ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/taosws.h ||:
|
||||
${csudo}ln -sf ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h ||:
|
||||
${csudo}ln -sf ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h > /dev/null 2>&1 ||:
|
||||
fi
|
||||
|
||||
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
|
||||
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h > /dev/null 2>&1
|
||||
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h > /dev/null 2>&1
|
||||
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h > /dev/null 2>&1
|
||||
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h > /dev/null 2>&1
|
||||
|
||||
${csudo}chmod 644 ${install_main_dir}/include/*
|
||||
}
|
||||
|
@ -374,7 +375,7 @@ function install_config() {
|
|||
${csudo}cp -f ${script_dir}/../cfg/${configFile} \
|
||||
${cfg_install_dir}/${configFile}.${verNumber}
|
||||
${csudo}ln -s ${cfg_install_dir}/${configFile} \
|
||||
${install_main_dir}/cfg/${configFile}
|
||||
${install_main_dir}/cfg/${configFile} > /dev/null 2>&1
|
||||
else
|
||||
${csudo}cp -f ${script_dir}/../cfg/${configFile} \
|
||||
${cfg_install_dir}/${configFile}.${verNumber}
|
||||
|
@ -395,7 +396,7 @@ function install_taosadapter_config() {
|
|||
${cfg_install_dir}/taosadapter.toml.${verNumber} || :
|
||||
[ -f ${cfg_install_dir}/taosadapter.toml ] &&
|
||||
${csudo}ln -s ${cfg_install_dir}/taosadapter.toml \
|
||||
${install_main_dir}/cfg/taosadapter.toml || :
|
||||
${install_main_dir}/cfg/taosadapter.toml > /dev/null 2>&1 || :
|
||||
else
|
||||
if [ -f "${binary_dir}/test/cfg/taosadapter.toml" ]; then
|
||||
${csudo}cp -f ${binary_dir}/test/cfg/taosadapter.toml \
|
||||
|
@ -408,12 +409,12 @@ function install_taosadapter_config() {
|
|||
function install_log() {
|
||||
${csudo}rm -rf ${log_dir} || :
|
||||
${csudo}mkdir -p ${log_dir} && ${csudo}chmod 777 ${log_dir}
|
||||
${csudo}ln -s ${log_dir} ${install_main_dir}/log
|
||||
${csudo}ln -s ${log_dir} ${install_main_dir}/log > /dev/null 2>&1
|
||||
}
|
||||
|
||||
function install_data() {
|
||||
${csudo}mkdir -p ${data_dir} && ${csudo}chmod 777 ${data_dir}
|
||||
${csudo}ln -s ${data_dir} ${install_main_dir}/data
|
||||
${csudo}ln -s ${data_dir} ${install_main_dir}/data > /dev/null 2>&1
|
||||
}
|
||||
|
||||
function install_connector() {
|
||||
|
@ -533,7 +534,7 @@ function install_taosadapter_service() {
|
|||
function install_service_on_launchctl() {
|
||||
${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
|
||||
${csudo}cp ${script_dir}/com.taosdata.taosd.plist /Library/LaunchDaemons/com.taosdata.taosd.plist
|
||||
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist || :
|
||||
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
|
||||
}
|
||||
|
||||
function install_service() {
|
||||
|
|
|
@ -440,6 +440,7 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
|
|||
}
|
||||
|
||||
if (userNum <= 0) {
|
||||
taosMemoryFree(users);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -476,6 +477,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
|||
}
|
||||
|
||||
if (dbNum <= 0) {
|
||||
taosMemoryFree(dbs);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -514,6 +516,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
|
|||
}
|
||||
|
||||
if (stbNum <= 0) {
|
||||
taosMemoryFree(stbs);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -233,12 +233,12 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
|
|||
|
||||
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
|
||||
if (strcmp(key, "group.id") == 0) {
|
||||
strcpy(conf->groupId, value);
|
||||
tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
if (strcmp(key, "client.id") == 0) {
|
||||
strcpy(conf->clientId, value);
|
||||
tstrncpy(conf->clientId, value, 256);
|
||||
return TMQ_CONF_OK;
|
||||
}
|
||||
|
||||
|
@ -452,7 +452,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
|||
int32_t code;
|
||||
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||
|
@ -464,15 +463,22 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
|
|||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeSTqOffset(&encoder, pOffset);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
// build param
|
||||
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
|
||||
if (pParam == NULL) {
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
}
|
||||
pParam->params = pParamSet;
|
||||
pParam->pOffset = pOffset;
|
||||
|
||||
// build send info
|
||||
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (pMsgSendInfo == NULL) {
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(pParam);
|
||||
return -1;
|
||||
}
|
||||
pMsgSendInfo->msgInfo = (SDataBuf){
|
||||
|
@ -547,6 +553,8 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
|
|||
|
||||
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
|
||||
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
goto FAIL;
|
||||
}
|
||||
goto HANDLE_RSP;
|
||||
|
@ -565,6 +573,7 @@ HANDLE_RSP:
|
|||
tsem_wait(&pParamSet->rspSem);
|
||||
code = pParamSet->rspErr;
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
return code;
|
||||
} else {
|
||||
code = 0;
|
||||
|
@ -587,7 +596,14 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
|
|||
|
||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||
if (pParamSet == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (async) {
|
||||
if (automatic) {
|
||||
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
|
||||
} else {
|
||||
userCb(tmq, code, userParam);
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -642,16 +658,6 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
|
|||
code = pParamSet->rspErr;
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
} else {
|
||||
code = 0;
|
||||
}
|
||||
|
||||
if (code != 0 && async) {
|
||||
if (automatic) {
|
||||
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
|
||||
} else {
|
||||
userCb(tmq, code, userParam);
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
|
@ -722,6 +728,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
|
|||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
taosMemoryFree(pReq);
|
||||
goto OVER;
|
||||
}
|
||||
sendInfo->msgInfo = (SDataBuf){
|
||||
.pData = pReq,
|
||||
|
@ -871,7 +878,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
|
||||
if (pTmq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tscError("consumer setup failed since %s", terrstr());
|
||||
tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1060,9 +1067,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
code = 0;
|
||||
FAIL:
|
||||
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
|
||||
if (code != 0 && buf) {
|
||||
taosMemoryFree(buf);
|
||||
}
|
||||
taosMemoryFree(buf);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1100,7 +1106,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
|
||||
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
|
||||
if (pRspWrapper == NULL) {
|
||||
taosMemoryFree(pMsg->pData);
|
||||
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
|
||||
goto CREATE_MSG_FAIL;
|
||||
}
|
||||
|
|
|
@ -602,7 +602,7 @@ _exit:
|
|||
}
|
||||
|
||||
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
|
||||
int32_t n;
|
||||
int32_t n = 0;
|
||||
|
||||
TSROW_LEN(pRow, n);
|
||||
if (p) {
|
||||
|
@ -613,7 +613,7 @@ int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
|
|||
}
|
||||
|
||||
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow) {
|
||||
int32_t n;
|
||||
int32_t n = 0;
|
||||
|
||||
*ppRow = (STSRow2 *)p;
|
||||
TSROW_LEN(*ppRow, n);
|
||||
|
|
|
@ -795,7 +795,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
|
|||
// not enough time range
|
||||
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
|
||||
end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
|
||||
while (end < t && ((start + pInterval->sliding) <= INT64_MAX)) { // move forward to the correct time window
|
||||
while (end < t) { // move forward to the correct time window
|
||||
start += pInterval->sliding;
|
||||
|
||||
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
|
||||
|
|
|
@ -366,6 +366,7 @@ static int32_t toBinary(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
|
|||
}
|
||||
|
||||
if (pBuf != NULL) {
|
||||
taosMemoryFree(pVariant->pz);
|
||||
*pDest = pBuf;
|
||||
}
|
||||
|
||||
|
@ -688,7 +689,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
|
|||
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
if (convertToInteger(pVariant, &result, type, true, false, converted) < 0) {
|
||||
SET_EXT_INFO(converted, (int64_t)result, INT64_MIN + 1, INT64_MAX, extInfo);
|
||||
SET_EXT_INFO(converted, result, INT64_MIN + 1, INT64_MAX, extInfo);
|
||||
return -1;
|
||||
}
|
||||
*((int64_t *)payload) = (int64_t)result;
|
||||
|
@ -697,7 +698,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
|
|||
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
if (convertToInteger(pVariant, &result, type, false, false, converted) < 0) {
|
||||
SET_EXT_INFO(converted, (uint64_t)result, 0, UINT64_MAX - 1, extInfo);
|
||||
SET_EXT_INFO(converted, result, 0, UINT64_MAX - 1, extInfo);
|
||||
return -1;
|
||||
}
|
||||
*((uint64_t *)payload) = (uint64_t)result;
|
||||
|
|
|
@ -22,6 +22,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define MAX_META_MSG_IN_BATCH 1048576
|
||||
|
||||
int32_t mndInitQuery(SMnode *pMnode);
|
||||
void mndCleanupQuery(SMnode *pMnode);
|
||||
|
||||
|
|
|
@ -369,7 +369,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
|
|||
taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
|
||||
}
|
||||
|
||||
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
|
||||
SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
|
||||
SAppObj *pApp = NULL;
|
||||
bool hasNext = taosCacheIterNext(pIter);
|
||||
if (hasNext) {
|
||||
|
|
|
@ -77,6 +77,12 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
|||
void *pRsp = NULL;
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
|
||||
if (msgNum >= MAX_META_MSG_IN_BATCH) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
mError("too many msgs %d in mnode batch meta req", msgNum);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
if (NULL == batchRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -106,6 +112,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
|
|||
if (fp == NULL) {
|
||||
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
|
||||
taosArrayDestroy(batchRsp);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -675,7 +675,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
|
|||
SSchemaWrapper *pSchemaWrapper = &schema;
|
||||
|
||||
tDecoderInit(&dc, pData, nData);
|
||||
tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
|
||||
(void)tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
|
||||
tDecoderClear(&dc);
|
||||
tdbFree(pData);
|
||||
|
||||
|
|
|
@ -944,6 +944,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
|||
metaUpdateTagIdx(pMeta, &ctbEntry);
|
||||
}
|
||||
|
||||
ASSERT(ctbEntry.ctbEntry.pTags);
|
||||
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
|
||||
tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
|
||||
((STag *)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn);
|
||||
|
@ -952,7 +953,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
|
|||
|
||||
tDecoderClear(&dc1);
|
||||
tDecoderClear(&dc2);
|
||||
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
|
||||
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
|
||||
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
|
||||
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
|
||||
tdbTbcClose(pTbDbc);
|
||||
|
@ -1202,8 +1203,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
|
|||
SMetaEntry stbEntry = {0};
|
||||
STagIdxKey *pTagIdxKey = NULL;
|
||||
int32_t nTagIdxKey;
|
||||
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
|
||||
const void *pTagData = NULL; //
|
||||
const SSchema *pTagColumn;
|
||||
const void *pTagData = NULL;
|
||||
int32_t nTagData = 0;
|
||||
SDecoder dc = {0};
|
||||
int32_t ret = 0;
|
||||
|
|
|
@ -293,7 +293,6 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;
|
||||
|
||||
int32_t blockSz = taosArrayGetSize(pBlocks);
|
||||
bool createTb = true;
|
||||
|
||||
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
|
||||
if (!tagArray) {
|
||||
|
@ -303,6 +302,7 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
|
||||
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
|
||||
for (int32_t i = 0; i < blockSz; i++) {
|
||||
bool createTb = true;
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
|
||||
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
|
||||
SBatchDeleteReq deleteReq = {0};
|
||||
|
@ -337,83 +337,119 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
tqDebug("failed to put delete req into write-queue since %s", terrstr());
|
||||
}
|
||||
} else {
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
|
||||
// set const
|
||||
createTbReq.flags = 0;
|
||||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
||||
// set super table name
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
||||
|
||||
// set tag content
|
||||
taosArrayClear(tagArray);
|
||||
STagVal tagVal = {
|
||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.groupId,
|
||||
};
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
|
||||
|
||||
STag* pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
if (pTag == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(tagArray);
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
return;
|
||||
}
|
||||
createTbReq.ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
// set tag name
|
||||
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||
strcpy(tagNameStr, "group_id");
|
||||
taosArrayPush(tagName, tagNameStr);
|
||||
createTbReq.ctb.tagName = tagName;
|
||||
|
||||
// set table name
|
||||
char* ctbName = NULL;
|
||||
// set child table name
|
||||
if (pDataBlock->info.parTbName[0]) {
|
||||
createTbReq.name = strdup(pDataBlock->info.parTbName);
|
||||
ctbName = strdup(pDataBlock->info.parTbName);
|
||||
} else {
|
||||
createTbReq.name = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
|
||||
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
|
||||
}
|
||||
|
||||
int32_t schemaLen;
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||
if (code < 0) {
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
return;
|
||||
}
|
||||
int32_t schemaLen = 0;
|
||||
void* schemaStr = NULL;
|
||||
|
||||
// save schema str
|
||||
void* schemaStr = taosMemoryMalloc(schemaLen);
|
||||
if (schemaStr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
return;
|
||||
}
|
||||
int64_t uid = 0;
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
|
||||
metaReaderClear(&mr);
|
||||
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, schemaStr, schemaLen);
|
||||
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
|
||||
if (code < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
SVCreateTbReq createTbReq = {0};
|
||||
|
||||
// set const
|
||||
createTbReq.flags = 0;
|
||||
createTbReq.type = TSDB_CHILD_TABLE;
|
||||
createTbReq.ctb.suid = suid;
|
||||
|
||||
// set super table name
|
||||
SName name = {0};
|
||||
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
|
||||
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
|
||||
createTbReq.name = ctbName;
|
||||
ctbName = NULL;
|
||||
|
||||
// set tag content
|
||||
taosArrayClear(tagArray);
|
||||
STagVal tagVal = {
|
||||
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
|
||||
.type = TSDB_DATA_TYPE_UBIGINT,
|
||||
.i64 = (int64_t)pDataBlock->info.groupId,
|
||||
};
|
||||
taosArrayPush(tagArray, &tagVal);
|
||||
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
|
||||
|
||||
STag* pTag = NULL;
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
if (pTag == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(tagArray);
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
return;
|
||||
}
|
||||
createTbReq.ctb.pTag = (uint8_t*)pTag;
|
||||
|
||||
// set tag name
|
||||
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
|
||||
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
|
||||
strcpy(tagNameStr, "group_id");
|
||||
taosArrayPush(tagName, tagNameStr);
|
||||
createTbReq.ctb.tagName = tagName;
|
||||
|
||||
int32_t code;
|
||||
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
|
||||
if (code < 0) {
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
return;
|
||||
}
|
||||
|
||||
// set schema str
|
||||
schemaStr = taosMemoryMalloc(schemaLen);
|
||||
if (schemaStr == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
return;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, schemaStr, schemaLen);
|
||||
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
|
||||
if (code < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
taosArrayDestroy(tagArray);
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(schemaStr);
|
||||
return;
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
taosMemoryFree(schemaStr);
|
||||
return;
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
} else {
|
||||
if (mr.me.type != TSDB_CHILD_TABLE) {
|
||||
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
|
||||
mr.me.type);
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(ctbName);
|
||||
continue;
|
||||
}
|
||||
if (mr.me.ctbEntry.suid != suid) {
|
||||
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %ld, actual suid %ld",
|
||||
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry);
|
||||
metaReaderClear(&mr);
|
||||
taosMemoryFree(ctbName);
|
||||
continue;
|
||||
}
|
||||
|
||||
createTb = false;
|
||||
uid = mr.me.uid;
|
||||
metaReaderClear(&mr);
|
||||
|
||||
tqDebug("vgId:%d, stream write, table %s, uid %ld already exist, skip create", TD_VID(pVnode), ctbName, uid);
|
||||
|
||||
taosMemoryFreeClear(ctbName);
|
||||
}
|
||||
tEncoderClear(&encoder);
|
||||
tdDestroySVCreateTbReq(&createTbReq);
|
||||
|
||||
int32_t cap = sizeof(SSubmitReq);
|
||||
|
||||
|
@ -445,9 +481,11 @@ void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
|
|||
memcpy(blkSchema, schemaStr, schemaLen);
|
||||
blkHead->schemaLen = htonl(schemaLen);
|
||||
rowData = POINTER_SHIFT(blkSchema, schemaLen);
|
||||
} else {
|
||||
blkHead->uid = htobe64(uid);
|
||||
}
|
||||
|
||||
taosMemoryFree(schemaStr);
|
||||
taosMemoryFreeClear(schemaStr);
|
||||
|
||||
for (int32_t j = 0; j < rows; j++) {
|
||||
SRowBuilder rb = {0};
|
||||
|
|
|
@ -1506,7 +1506,12 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
|||
if (pReader->pMemSchema == NULL) {
|
||||
int32_t code =
|
||||
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
|
||||
return pReader->pMemSchema;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
} else {
|
||||
return pReader->pMemSchema;
|
||||
}
|
||||
}
|
||||
|
||||
if (pReader->pMemSchema->version == sversion) {
|
||||
|
@ -1518,9 +1523,9 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return NULL;
|
||||
} else {
|
||||
return pReader->pMemSchema;
|
||||
}
|
||||
|
||||
return pReader->pMemSchema;
|
||||
}
|
||||
|
||||
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
|
||||
|
@ -1816,7 +1821,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (minKey == key) {
|
||||
init = true;
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
|
||||
}
|
||||
|
||||
|
@ -1826,8 +1835,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
tRowMerge(&merge, &fRow1);
|
||||
} else {
|
||||
init = true;
|
||||
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
|
||||
}
|
||||
|
||||
|
@ -1837,7 +1850,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
} else {
|
||||
init = true;
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||
tRowMergerInit(&merge, piRow, pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, piRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
@ -1847,7 +1863,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
tRowMerge(&merge, pRow);
|
||||
} else {
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
tRowMergerInit(&merge, pRow, pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
@ -1855,7 +1874,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (minKey == k.ts) {
|
||||
init = true;
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
|
||||
tRowMergerInit(&merge, pRow, pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
||||
|
@ -1865,7 +1888,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
} else {
|
||||
init = true;
|
||||
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
|
||||
tRowMergerInit(&merge, piRow, pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, piRow, pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
|
||||
}
|
||||
|
@ -1876,7 +1902,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
tRowMerge(&merge, &fRow1);
|
||||
} else {
|
||||
init = true;
|
||||
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
|
||||
}
|
||||
|
@ -1884,7 +1913,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
|||
if (minKey == key) {
|
||||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
if (!init) {
|
||||
tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
} else {
|
||||
tRowMerge(&merge, &fRow);
|
||||
}
|
||||
|
@ -3046,19 +3078,30 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
|
|||
SRowMerger merge = {0};
|
||||
|
||||
// get the correct schema for data in memory
|
||||
terrno = 0;
|
||||
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(¤t), pReader, uid);
|
||||
if (pTSchema == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (pReader->pSchema == NULL) {
|
||||
pReader->pSchema = pTSchema;
|
||||
}
|
||||
|
||||
tRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema);
|
||||
int32_t code = tRowMergerInit2(&merge, pReader->pSchema, ¤t, pTSchema);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
|
||||
if(pTSchema1 == NULL) {
|
||||
return terrno;
|
||||
}
|
||||
|
||||
tRowMergerAdd(&merge, pNextRow, pTSchema1);
|
||||
|
||||
doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
|
||||
int32_t code = tRowMergerGetRow(&merge, pTSRow);
|
||||
code = tRowMergerGetRow(&merge, pTSRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -719,8 +719,6 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
|
|||
}
|
||||
|
||||
pMerger->version = key.version;
|
||||
|
||||
_exit:
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -272,6 +272,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
SRpcMsg rspMsg = {0};
|
||||
void *pRsp = NULL;
|
||||
|
||||
if (msgNum >= MAX_META_MSG_IN_BATCH) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
qError("too many msgs %d in vnode batch meta req", msgNum);
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
|
||||
if (NULL == batchRsp) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
|
|
@ -20,6 +20,8 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define CTG_MAX_REQ_IN_BATCH 1048576
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -173,15 +173,19 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
|
|||
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncReq));
|
||||
code = ctgUpdateTbMetaEnqueue(pCtg, output, syncReq);
|
||||
output = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(output->tbMeta);
|
||||
taosMemoryFreeClear(output);
|
||||
|
||||
if (output) {
|
||||
taosMemoryFreeClear(output->tbMeta);
|
||||
taosMemoryFreeClear(output);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -290,7 +294,9 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
|
|||
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
|
||||
}
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp));
|
||||
code = ctgUpdateTbMetaEnqueue(pCtg, output, syncOp);
|
||||
output = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
|||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
|
||||
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
|
||||
ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
|||
taosArrayPush(pJob->pTasks, &task);
|
||||
|
||||
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
|
||||
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
|
||||
ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1056,7 +1056,6 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
@ -1223,7 +1222,6 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||
|
@ -1309,7 +1307,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -1346,7 +1343,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
_return:
|
||||
|
@ -1382,7 +1378,6 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
|
|||
default:
|
||||
ctgError("invalid reqType %d", reqType);
|
||||
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||
break;
|
||||
}
|
||||
|
||||
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
|
||||
|
@ -1672,7 +1667,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
|
|||
int32_t baseResIdx = 0;
|
||||
for (int32_t i = 0; i < dbNum; ++i) {
|
||||
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
|
||||
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, taosArrayGetSize(pReq->pTables));
|
||||
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables));
|
||||
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
|
||||
baseResIdx += taosArrayGetSize(pReq->pTables);
|
||||
}
|
||||
|
@ -2010,10 +2005,6 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
|
|||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
|
|
@ -597,6 +597,8 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
|
|||
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
|
||||
if (NULL == node) {
|
||||
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
|
||||
taosMemoryFree(operation->data);
|
||||
taosMemoryFree(operation);
|
||||
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -648,6 +650,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
|
|||
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -668,7 +671,6 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -681,6 +683,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
|
|||
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -700,7 +703,6 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -714,6 +716,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -731,7 +734,6 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -744,6 +746,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -760,7 +763,6 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(op->data);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -773,6 +775,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
|||
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
|
||||
taosMemoryFree(op);
|
||||
ctgFreeVgInfo(dbInfo);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
@ -796,8 +799,6 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
|||
_return:
|
||||
|
||||
ctgFreeVgInfo(dbInfo);
|
||||
taosMemoryFreeClear(op->data);
|
||||
taosMemoryFreeClear(op);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -810,6 +811,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy
|
|||
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -834,8 +836,6 @@ _return:
|
|||
taosMemoryFree(output);
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -847,6 +847,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
|
|||
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -863,8 +864,6 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -877,6 +876,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
|
|||
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -892,7 +892,6 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
|
|||
_return:
|
||||
|
||||
tFreeSGetUserAuthRsp(pAuth);
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
@ -906,6 +905,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncO
|
|||
SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -923,7 +923,6 @@ _return:
|
|||
|
||||
taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
|
||||
taosMemoryFreeClear(*pIndex);
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
@ -937,6 +936,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
|||
SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -952,8 +952,6 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -968,6 +966,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
|
|||
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
|
||||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
|
||||
taosMemoryFree(op);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -981,8 +980,6 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
|
|||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1483,7 +1480,9 @@ int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool sync
|
|||
int32_t code = 0;
|
||||
|
||||
CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
|
||||
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq));
|
||||
code = ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq);
|
||||
pOutput = NULL;
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "tname.h"
|
||||
#include "tref.h"
|
||||
#include "trpc.h"
|
||||
#include "ctgRemote.h"
|
||||
|
||||
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
@ -579,6 +580,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
|
|||
int32_t num = taosArrayGetSize(pBatch->pMsgs);
|
||||
SBatchReq* pBatchReq = (SBatchReq*)(*msg);
|
||||
|
||||
if (num >= CTG_MAX_REQ_IN_BATCH) {
|
||||
qError("too many msgs %d in one batch request", num);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
pBatchReq->header.vgId = htonl(vgId);
|
||||
pBatchReq->msgNum = htonl(num);
|
||||
offset += sizeof(SBatchReq);
|
||||
|
|
|
@ -89,8 +89,8 @@ extern "C" {
|
|||
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
|
||||
#define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s"
|
||||
#define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s"
|
||||
#define EXPLAIN_OFFSET_FORMAT "offset=%d"
|
||||
#define EXPLAIN_SOFFSET_FORMAT "soffset=%d"
|
||||
#define EXPLAIN_OFFSET_FORMAT "offset=%" PRId64
|
||||
#define EXPLAIN_SOFFSET_FORMAT "soffset=%" PRId64
|
||||
#define EXPLAIN_PARTITIONS_FORMAT "partitions=%d"
|
||||
|
||||
#define COMMAND_RESET_LOG "resetLog"
|
||||
|
|
|
@ -485,6 +485,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
|
|||
SSDataBlock* pBlock = buildCreateTbResultDataBlock();
|
||||
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg);
|
||||
if (code) {
|
||||
blockDataDestroy(pBlock);
|
||||
return code;
|
||||
}
|
||||
return buildRetrieveTableRsp(pBlock, SHOW_CREATE_TB_RESULT_COLS, pRsp);
|
||||
|
|
|
@ -1598,6 +1598,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
|||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
|
||||
if (NULL == rsp) {
|
||||
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
|
||||
blockDataDestroy(pBlock);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
|
|
@ -267,6 +267,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
|
|||
taosThreadMutexInit(&deleter->mutex, NULL);
|
||||
if (NULL == deleter->pDataBlocks) {
|
||||
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
destroyDataSinker((SDataSinkHandle*)deleter);
|
||||
taosMemoryFree(deleter);
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
*pHandle = deleter;
|
||||
|
|
|
@ -323,6 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
|
|||
int32_t code =
|
||||
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
|
||||
if (code) {
|
||||
destroyDataSinker((SDataSinkHandle*)pInserterNode);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -294,7 +294,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
|
|||
if (!exists) {
|
||||
#endif
|
||||
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
|
||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
|
||||
}
|
||||
|
||||
/*}*/
|
||||
|
|
|
@ -942,8 +942,7 @@ static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
|
|||
}
|
||||
|
||||
static bool isSessionWindow(SStreamScanInfo* pInfo) {
|
||||
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||
}
|
||||
|
||||
static bool isStateWindow(SStreamScanInfo* pInfo) {
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "tsimplehash.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
#include "tdef.h"
|
||||
|
||||
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
|
||||
#define HASH_MAX_CAPACITY (1024 * 1024 * 16L)
|
||||
|
@ -110,14 +111,14 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
|
|||
}
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
|
||||
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, POINTER_BYTES * newCapacity);
|
||||
if (!pNewEntryList) {
|
||||
uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
|
||||
return;
|
||||
}
|
||||
|
||||
size_t inc = newCapacity - pHashObj->capacity;
|
||||
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *));
|
||||
memset((char *)pNewEntryList + pHashObj->capacity * POINTER_BYTES, 0, inc * sizeof(void *));
|
||||
|
||||
pHashObj->hashList = pNewEntryList;
|
||||
pHashObj->capacity = newCapacity;
|
||||
|
|
|
@ -424,6 +424,12 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if ((pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) > TSDB_MAX_COL_TAG_NUM) {
|
||||
*pDst = NULL;
|
||||
qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
|
||||
*pDst = taosMemoryMalloc(metaSize);
|
||||
if (NULL == *pDst) {
|
||||
|
|
|
@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
|
|||
SBuildTableInput *pInput = input;
|
||||
STableCfgReq cfgReq = {0};
|
||||
cfgReq.header.vgId = pInput->vgId;
|
||||
strcpy(cfgReq.dbFName, pInput->dbFName);
|
||||
strcpy(cfgReq.tbName, pInput->tbName);
|
||||
strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName));
|
||||
strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName));
|
||||
|
||||
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
|
@ -615,6 +615,8 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
|||
STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp));
|
||||
if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) {
|
||||
qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize);
|
||||
tFreeSTableCfgRsp(out);
|
||||
taosMemoryFree(out);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
|
|
|
@ -258,7 +258,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
|||
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
|
||||
|
||||
if (len < 0) {
|
||||
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
|
||||
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
|
||||
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
@ -292,7 +292,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
|
|||
}
|
||||
|
||||
// Got data from sink
|
||||
QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
|
||||
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64, len);
|
||||
|
||||
*dataLen += len;
|
||||
|
||||
|
@ -408,7 +408,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
|
|||
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
|
||||
|
||||
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
|
||||
break;
|
||||
}
|
||||
|
||||
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));
|
||||
|
|
|
@ -1214,7 +1214,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
|
|||
|
||||
int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit *u, uint32_t *uidx) {
|
||||
SFilterFieldId left, right, *pright = &right;
|
||||
int32_t type = FILTER_UNIT_DATA_TYPE(u);
|
||||
uint8_t type = FILTER_UNIT_DATA_TYPE(u);
|
||||
uint16_t flag = 0;
|
||||
|
||||
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0, false);
|
||||
|
@ -1644,7 +1644,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
|||
|
||||
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
|
||||
SColumnNode *refNode = (SColumnNode *)left->desc;
|
||||
if (unit->compare.optr >= 0 && unit->compare.optr <= OP_TYPE_JSON_CONTAINS) {
|
||||
if (unit->compare.optr <= OP_TYPE_JSON_CONTAINS) {
|
||||
len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->dataBlockId, refNode->slotId,
|
||||
operatorTypeStr(unit->compare.optr));
|
||||
}
|
||||
|
@ -1664,7 +1664,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
|||
|
||||
if (unit->compare.optr2) {
|
||||
strcat(str, " && ");
|
||||
if (unit->compare.optr2 >= 0 && unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS) {
|
||||
if (unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS) {
|
||||
sprintf(str + strlen(str), "[%d][%d] %s [", refNode->dataBlockId, refNode->slotId,
|
||||
operatorTypeStr(unit->compare.optr2));
|
||||
}
|
||||
|
@ -1709,9 +1709,9 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
|||
ctx->isrange);
|
||||
if (ctx->isrange) {
|
||||
SFilterRangeNode *r = ctx->rs;
|
||||
int32_t tlen = 0;
|
||||
while (r) {
|
||||
char str[256] = {0};
|
||||
int32_t tlen = 0;
|
||||
char str[256] = {0};
|
||||
if (FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_NULL)) {
|
||||
strcat(str, "(NULL)");
|
||||
} else {
|
||||
|
@ -2614,7 +2614,7 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t gResNum
|
|||
int32_t usize = (int32_t)taosArrayGetSize((SArray *)colInfo->info);
|
||||
|
||||
for (int32_t n = 0; n < usize; ++n) {
|
||||
SFilterUnit *u = taosArrayGetP((SArray *)colInfo->info, n);
|
||||
SFilterUnit *u = (SFilterUnit *)taosArrayGetP((SArray *)colInfo->info, n);
|
||||
|
||||
filterAddUnitFromUnit(info, &oinfo, u, &uidx);
|
||||
filterAddUnitToGroup(&ng, uidx);
|
||||
|
|
|
@ -122,7 +122,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
|
|||
break;
|
||||
case JOB_TASK_STATUS_DROP:
|
||||
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
|
||||
break;
|
||||
|
||||
default:
|
||||
SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));
|
||||
|
|
|
@ -169,7 +169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
|
||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||
if (NULL == pJob->execRes.res) {
|
||||
pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
|
||||
pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
|
||||
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
|
||||
}
|
||||
|
||||
|
@ -386,7 +386,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
|||
// NEVER REACH HERE
|
||||
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
break;
|
||||
}
|
||||
case TDMT_SCH_LINK_BROKEN:
|
||||
SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
|
||||
|
@ -981,7 +980,7 @@ _return:
|
|||
}
|
||||
|
||||
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
|
||||
uint32_t msgSize = 0;
|
||||
int32_t msgSize = 0;
|
||||
void *msg = NULL;
|
||||
int32_t code = 0;
|
||||
bool isCandidateAddr = false;
|
||||
|
@ -1133,12 +1132,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
|||
default:
|
||||
SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
break;
|
||||
}
|
||||
|
||||
#if 1
|
||||
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
|
||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
|
||||
msg = NULL;
|
||||
SCH_ERR_JRET(code);
|
||||
|
||||
|
|
|
@ -904,7 +904,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
} else if (tsQueryPlannerTrace) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
qSubPlanToString(plan, &msg, &msgLen);
|
||||
SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen));
|
||||
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
|
||||
taosMemoryFree(msg);
|
||||
}
|
||||
|
@ -926,6 +926,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
SArray *explainRes = NULL;
|
||||
int32_t code = 0;
|
||||
SQWMsg qwMsg = {0};
|
||||
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
|
||||
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
|
||||
|
@ -938,14 +939,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
|||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||
}
|
||||
|
||||
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
pTask->execId, &qwMsg, explainRes));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_RET(schHandleExplainRes(explainRes));
|
||||
explainRes = NULL;
|
||||
}
|
||||
|
||||
SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||
|
||||
_return:
|
||||
|
||||
taosArrayDestroy(explainRes);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
int32_t schLaunchTaskImpl(void *param) {
|
||||
|
@ -1097,22 +1105,28 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
|
|||
|
||||
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
|
||||
void *pRsp = NULL;
|
||||
int32_t code = 0;
|
||||
SArray *explainRes = NULL;
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||
}
|
||||
|
||||
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||
pTask->execId, &pRsp, explainRes));
|
||||
|
||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||
SCH_ERR_RET(schHandleExplainRes(explainRes));
|
||||
explainRes = NULL;
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
_return:
|
||||
|
||||
taosArrayDestroy(explainRes);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
// Note: no more error processing, handled in function internal
|
||||
|
|
|
@ -242,7 +242,7 @@ uint64_t schGenUUID(void) {
|
|||
|
||||
if (hashId == 0) {
|
||||
char uid[64] = {0};
|
||||
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
|
||||
int32_t code = taosGetSystemUUID(uid, tListLen(uid) - 1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
|
||||
} else {
|
||||
|
|
|
@ -1006,6 +1006,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
|
|||
nLeft -= kLen;
|
||||
// pack partial val to local if any space left
|
||||
if (nLocal > nHeader + kLen + sizeof(SPgno)) {
|
||||
ASSERT(pVal != NULL && vLen != 0);
|
||||
memcpy(pCell + nHeader + kLen, pVal, nLocal - nHeader - kLen - sizeof(SPgno));
|
||||
nLeft -= nLocal - nHeader - kLen - sizeof(SPgno);
|
||||
}
|
||||
|
|
|
@ -105,6 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) {
|
|||
for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) {
|
||||
if (tdbPageCreate(pCache->szPage, &aPage[iPage], tdbDefaultMalloc, NULL) < 0) {
|
||||
// TODO: handle error
|
||||
tdbOsFree(pCache->aPage);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -267,7 +268,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
|
|||
// 4. Try a create new page
|
||||
if (!pPage) {
|
||||
ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
|
||||
if (ret < 0) {
|
||||
if (ret < 0 && pPage != NULL) {
|
||||
// TODO
|
||||
ASSERT(0);
|
||||
return NULL;
|
||||
|
@ -300,8 +301,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
|
|||
pPage->pPager = pPageH->pPager;
|
||||
|
||||
memcpy(pPage->pData, pPageH->pData, pPage->pageSize);
|
||||
tdbDebug("pcache/pPageH: %p %d %p %p %d", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage,
|
||||
TDB_PAGE_PGNO(pPageH));
|
||||
// tdbDebug("pcache/pPageH: %p %d %p %p %d", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage,
|
||||
// TDB_PAGE_PGNO(pPageH));
|
||||
tdbPageInit(pPage, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize);
|
||||
pPage->kLen = pPageH->kLen;
|
||||
pPage->vLen = pPageH->vLen;
|
||||
|
|
|
@ -84,7 +84,8 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
|
|||
pPager->pCache = pCache;
|
||||
|
||||
pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
|
||||
if (pPager->fd < 0) {
|
||||
if (TDB_FD_INVALID(pPager->fd)) {
|
||||
// if (pPager->fd < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -226,7 +227,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
|
|||
|
||||
// Open the journal
|
||||
pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
|
||||
if (pPager->jfd < 0) {
|
||||
if (TDB_FD_INVALID(pPager->jfd)) {
|
||||
tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
@ -365,7 +366,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
|
|||
|
||||
// 4, remove the journal file
|
||||
tdbOsClose(pPager->jfd);
|
||||
tdbOsRemove(pPager->jFileName);
|
||||
(void)tdbOsRemove(pPager->jFileName);
|
||||
pPager->inTran = 0;
|
||||
|
||||
return 0;
|
||||
|
@ -540,7 +541,8 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
|
|||
|
||||
ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, pPage->pageSize);
|
||||
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName,
|
||||
pPage->pageSize);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
@ -568,7 +570,8 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
|
|||
|
||||
ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName, pPage->pageSize);
|
||||
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName,
|
||||
pPage->pageSize);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
@ -603,11 +606,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
|
||||
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -615,13 +620,16 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
|
||||
tdbError("failed to lseek fd due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
|
||||
if (ret < 0) {
|
||||
tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName, pPager->pageSize);
|
||||
tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
|
||||
pPager->pageSize);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
@ -629,6 +637,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
|
|||
if (tdbOsFSync(pPager->fd) < 0) {
|
||||
tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
tdbOsFree(pageBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -106,11 +106,13 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
|
|||
// pTb->pBt
|
||||
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, &(pTb->pBt));
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ret = tdbPagerRestore(pPager, pTb->pBt);
|
||||
if (ret < 0) {
|
||||
tdbOsFree(pTb);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -37,6 +37,8 @@ extern "C" {
|
|||
/* file */
|
||||
typedef TdFilePtr tdb_fd_t;
|
||||
|
||||
#define TDB_FD_INVALID(fd) (fd == NULL)
|
||||
|
||||
#define TDB_O_CREAT TD_FILE_CREATE
|
||||
#define TDB_O_WRITE TD_FILE_WRITE
|
||||
#define TDB_O_READ TD_FILE_READ
|
||||
|
@ -141,4 +143,4 @@ typedef pthread_mutex_t tdb_mutex_t;
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TDB_OS_H_*/
|
||||
#endif /*_TDB_OS_H_*/
|
||||
|
|
|
@ -128,10 +128,10 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
|
|||
static const int HEADSIZE = sizeof(STransMsgHead);
|
||||
|
||||
SConnBuffer* p = connBuf;
|
||||
if (p->left != 0) {
|
||||
if (p->left != 0 || p->total <= 0) {
|
||||
return -1;
|
||||
}
|
||||
int total = connBuf->total;
|
||||
int total = p->total;
|
||||
if (total >= HEADSIZE && !p->invalid) {
|
||||
*buf = taosMemoryCalloc(1, total);
|
||||
memcpy(*buf, p->buf, total);
|
||||
|
|
|
@ -41,7 +41,7 @@ class TDTestCase:
|
|||
ifcheckdata = 0
|
||||
ifManualCommit = 1
|
||||
groupId = 'group.id:cgrp1'
|
||||
autoCommit = 'enable.auto.commit:false'
|
||||
autoCommit = 'enable.auto.commit:true'
|
||||
autoCommitInterval = 'auto.commit.interval.ms:1000'
|
||||
autoOffset = 'auto.offset.reset:earliest'
|
||||
|
||||
|
@ -87,8 +87,9 @@ class TDTestCase:
|
|||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
|
||||
|
||||
tdLog.info("After waiting for a period of time, drop one stable")
|
||||
time.sleep(3)
|
||||
tdLog.info("After waiting for a commit notify, drop one stable")
|
||||
#time.sleep(3)
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
|
||||
|
||||
tdLog.info("wait result from consumer, then check it")
|
||||
|
|
|
@ -42,7 +42,7 @@ class TDTestCase:
|
|||
'rowsPerTbl': 1000,
|
||||
'batchNum': 500,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'pollDelay': 20,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
@ -87,7 +87,7 @@ class TDTestCase:
|
|||
'rowsPerTbl': 1000,
|
||||
'batchNum': 500,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'pollDelay': 20,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
|
|
@ -37,7 +37,7 @@ class TDTestCase:
|
|||
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbNum': 10,
|
||||
'ctbNum': 100,
|
||||
'rowsPerTbl': 4000,
|
||||
'batchNum': 15,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
|
@ -124,7 +124,8 @@ class TDTestCase:
|
|||
tdLog.info("async insert data")
|
||||
pThread = tmqCom.asyncInsertData(paraDict)
|
||||
|
||||
time.sleep(5)
|
||||
tmqCom.getStartConsumeNotifyFromTmqsim();
|
||||
#time.sleep(5)
|
||||
tdLog.info("check show consumers")
|
||||
tdSql.query("show consumers")
|
||||
# tdLog.info(tdSql.queryResult)
|
||||
|
|
|
@ -22,7 +22,7 @@
|
|||
#define TAOS_CONSOLE_PROMPT_HEADER "taos> "
|
||||
#define TAOS_CONSOLE_PROMPT_CONTINUE " -> "
|
||||
|
||||
#define SHELL_HOST "The auth string to use when connecting to the server."
|
||||
#define SHELL_HOST "TDengine server FQDN to connect. The default host is localhost."
|
||||
#define SHELL_PORT "The TCP/IP port number to use for the connection."
|
||||
#define SHELL_USER "The user name to use when connecting to the server."
|
||||
#define SHELL_PASSWORD "The password to use when connecting to the server."
|
||||
|
|
Loading…
Reference in New Issue