Merge remote-tracking branch 'origin/3.0' into enh/TD-19463_2

This commit is contained in:
Shengliang Guan 2022-10-18 09:37:33 +08:00
commit a609f0efb5
68 changed files with 694 additions and 321 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 2849aa4
GIT_TAG 4d02980
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -212,6 +212,7 @@ tmq_list_t* build_topic_list() {
tmq_list_t* topicList = tmq_list_new();
int32_t code = tmq_list_append(topicList, "topicname");
if (code) {
tmq_list_destroy(topicList);
return NULL;
}
return topicList;

View File

@ -1902,7 +1902,7 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
if (pRebInfo == NULL) {
return NULL;
}
strcpy(pRebInfo->key, key);
tstrncpy(pRebInfo->key, key, TSDB_SUBSCRIBE_KEY_LEN);
pRebInfo->lostConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebInfo->lostConsumers == NULL) {
goto _err;

View File

@ -239,6 +239,7 @@ typedef enum ELogicConditionType {
#define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024
#define TSDB_MAX_COL_TAG_NUM (TSDB_MAX_COLUMNS + TSDB_MAX_TAGS)
#define TSDB_MAX_JSON_TAG_LEN 16384
#define TSDB_MAX_JSON_KEY_LEN 256
@ -496,6 +497,8 @@ enum {
#define MAX_NUM_STR_SIZE 40
#define MAX_META_MSG_IN_BATCH 1048576
#ifdef __cplusplus
}
#endif

View File

@ -168,7 +168,7 @@ function install_bin() {
${csudo}cp -r ${binary_dir}/build/bin/${clientName} ${install_main_dir}/bin || :
[ -f ${binary_dir}/build/bin/taosBenchmark ] && ${csudo}cp -r ${binary_dir}/build/bin/taosBenchmark ${install_main_dir}/bin || :
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo || :
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo > /dev/null 2>&1 || :
[ -f ${binary_dir}/build/bin/taosdump ] && ${csudo}cp -r ${binary_dir}/build/bin/taosdump ${install_main_dir}/bin || :
[ -f ${binary_dir}/build/bin/taosadapter ] && ${csudo}cp -r ${binary_dir}/build/bin/taosadapter ${install_main_dir}/bin || :
[ -f ${binary_dir}/build/bin/udfd ] && ${csudo}cp -r ${binary_dir}/build/bin/udfd ${install_main_dir}/bin || :
@ -182,21 +182,21 @@ function install_bin() {
${csudo}chmod 0555 ${install_main_dir}/bin/*
#Make link
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter || :
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo}ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo || :
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx || :
[ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo}ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor || :
[ -x ${install_main_dir}/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/taosdemo ] && ${csudo}ln -s ${install_main_dir}/bin/taosdemo ${bin_link_dir}/taosdemo > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/perfMonitor ] && ${csudo}ln -s ${install_main_dir}/bin/perfMonitor ${bin_link_dir}/perfMonitor > /dev/null 2>&1 || :
[ -x ${install_main_dir}/set_core.sh ] && ${csudo}ln -s ${install_main_dir}/bin/set_core.sh ${bin_link_dir}/set_core > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} > /dev/null 2>&1 || :
else
${csudo}cp -r ${binary_dir}/build/bin/${clientName} ${install_main_dir}/bin || :
[ -f ${binary_dir}/build/bin/taosBenchmark ] && ${csudo}cp -r ${binary_dir}/build/bin/taosBenchmark ${install_main_dir}/bin || :
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo || :
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo > /dev/null 2>&1 || :
[ -f ${binary_dir}/build/bin/taosdump ] && ${csudo}cp -r ${binary_dir}/build/bin/taosdump ${install_main_dir}/bin || :
[ -f ${binary_dir}/build/bin/taosadapter ] && ${csudo}cp -r ${binary_dir}/build/bin/taosadapter ${install_main_dir}/bin || :
[ -f ${binary_dir}/build/bin/udfd ] && ${csudo}cp -r ${binary_dir}/build/bin/udfd ${install_main_dir}/bin || :
@ -206,14 +206,14 @@ function install_bin() {
${csudo}cp -r ${script_dir}/remove.sh ${install_main_dir}/bin || :
${csudo}chmod 0555 ${install_main_dir}/bin/*
#Make link
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} || :
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} || :
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter || :
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump || :
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo || :
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} || :
[ -x ${install_main_dir}/bin/${clientName} ] && ${csudo}ln -s ${install_main_dir}/bin/${clientName} ${bin_link_dir}/${clientName} > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/${serverName} ] && ${csudo}ln -s ${install_main_dir}/bin/${serverName} ${bin_link_dir}/${serverName} > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/taosadapter ] && ${csudo}ln -s ${install_main_dir}/bin/taosadapter ${bin_link_dir}/taosadapter > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/udfd ] && ${csudo}ln -s ${install_main_dir}/bin/udfd ${bin_link_dir}/udfd > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/taosdump ] && ${csudo}ln -s ${install_main_dir}/bin/taosdump ${bin_link_dir}/taosdump > /dev/null 2>&1 || :
[ -f ${install_main_dir}/bin/taosBenchmark ] && ${csudo}ln -sf ${install_main_dir}/bin/taosBenchmark ${install_main_dir}/bin/taosdemo > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/taosx ] && ${csudo}ln -s ${install_main_dir}/bin/taosx ${bin_link_dir}/taosx > /dev/null 2>&1 || :
[ -x ${install_main_dir}/bin/remove.sh ] && ${csudo}ln -s ${install_main_dir}/bin/remove.sh ${bin_link_dir}/${uninstallScript} > /dev/null 2>&1 || :
fi
}
@ -238,7 +238,7 @@ function install_jemalloc() {
if [ -f "${binary_dir}/build/lib/libjemalloc.so.2" ]; then
${csudo}/usr/bin/install -c -d /usr/local/lib
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.so.2 /usr/local/lib
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so
${csudo}ln -sf libjemalloc.so.2 /usr/local/lib/libjemalloc.so > /dev/null 2>&1
${csudo}/usr/bin/install -c -d /usr/local/lib
[ -f ${binary_dir}/build/lib/libjemalloc.a ] &&
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/lib/libjemalloc.a /usr/local/lib
@ -274,8 +274,8 @@ function install_avro() {
if [ -f "${binary_dir}/build/$1/libavro.so.23.0.0" ] && [ -d /usr/local/$1 ]; then
${csudo}/usr/bin/install -c -d /usr/local/$1
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/$1/libavro.so.23.0.0 /usr/local/$1
${csudo}ln -sf libavro.so.23.0.0 /usr/local/$1/libavro.so.23
${csudo}ln -sf libavro.so.23 /usr/local/$1/libavro.so
${csudo}ln -sf libavro.so.23.0.0 /usr/local/$1/libavro.so.23 > /dev/null 2>&1
${csudo}ln -sf libavro.so.23 /usr/local/$1/libavro.so > /dev/null 2>&1
${csudo}/usr/bin/install -c -d /usr/local/$1
[ -f ${binary_dir}/build/$1/libavro.a ] &&
${csudo}/usr/bin/install -c -m 755 ${binary_dir}/build/$1/libavro.a /usr/local/$1
@ -304,11 +304,11 @@ function install_lib() {
${install_main_dir}/driver &&
${csudo}chmod 777 ${install_main_dir}/driver/libtaos.so.${verNumber}
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib_link_dir}/libtaos.so.1 > /dev/null 2>&1
${csudo}ln -sf ${lib_link_dir}/libtaos.so.1 ${lib_link_dir}/libtaos.so > /dev/null 2>&1
if [ -d "${lib64_link_dir}" ]; then
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so
${csudo}ln -sf ${install_main_dir}/driver/libtaos.* ${lib64_link_dir}/libtaos.so.1 > /dev/null 2>&1
${csudo}ln -sf ${lib64_link_dir}/libtaos.so.1 ${lib64_link_dir}/libtaos.so > /dev/null 2>&1
fi
if [ -f ${binary_dir}/build/lib/libtaosws.so ]; then
@ -316,23 +316,23 @@ function install_lib() {
${install_main_dir}/driver &&
${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.so ||:
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so || :
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.so ${lib_link_dir}/libtaosws.so > /dev/null 2>&1 || :
fi
else
${csudo}cp -Rf ${binary_dir}/build/lib/libtaos.${verNumber}.dylib \
${install_main_dir}/driver && ${csudo}chmod 777 ${install_main_dir}/driver/*
${csudo}ln -sf ${install_main_dir}/driver/libtaos.${verNumber}.dylib \
${lib_link_dir}/libtaos.1.dylib || :
${lib_link_dir}/libtaos.1.dylib > /dev/null 2>&1 || :
${csudo}ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib || :
${csudo}ln -sf ${lib_link_dir}/libtaos.1.dylib ${lib_link_dir}/libtaos.dylib > /dev/null 2>&1 || :
if [ -f ${binary_dir}/build/lib/libtaosws.dylib ]; then
${csudo}cp ${binary_dir}/build/lib/libtaosws.dylib \
${install_main_dir}/driver &&
${csudo}chmod 777 ${install_main_dir}/driver/libtaosws.dylib ||:
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.dylib ${lib_link_dir}/libtaosws.dylib || :
${csudo}ln -sf ${install_main_dir}/driver/libtaosws.dylib ${lib_link_dir}/libtaosws.dylib > /dev/null 2>&1 || :
fi
fi
@ -346,6 +346,7 @@ function install_lib() {
}
function install_header() {
${csudo}mkdir -p ${inc_link_dir}
${csudo}rm -f ${inc_link_dir}/taos.h ${inc_link_dir}/taosdef.h ${inc_link_dir}/taoserror.h ${inc_link_dir}/taosudf.h || :
[ -f ${inc_link_dir}/taosws.h ] && ${csudo}rm -f ${inc_link_dir}/taosws.h ||:
${csudo}cp -f ${source_dir}/include/client/taos.h ${source_dir}/include/common/taosdef.h ${source_dir}/include/util/taoserror.h ${source_dir}/include/libs/function/taosudf.h \
@ -353,13 +354,13 @@ function install_header() {
if [ -f ${binary_dir}/build/include/taosws.h ]; then
${csudo}cp -f ${binary_dir}/build/include/taosws.h ${install_main_dir}/include && ${csudo}chmod 644 ${install_main_dir}/include/taosws.h ||:
${csudo}ln -sf ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h ||:
${csudo}ln -sf ${install_main_dir}/include/taosws.h ${inc_link_dir}/taosws.h > /dev/null 2>&1 ||:
fi
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h
${csudo}ln -s ${install_main_dir}/include/taos.h ${inc_link_dir}/taos.h > /dev/null 2>&1
${csudo}ln -s ${install_main_dir}/include/taosdef.h ${inc_link_dir}/taosdef.h > /dev/null 2>&1
${csudo}ln -s ${install_main_dir}/include/taoserror.h ${inc_link_dir}/taoserror.h > /dev/null 2>&1
${csudo}ln -s ${install_main_dir}/include/taosudf.h ${inc_link_dir}/taosudf.h > /dev/null 2>&1
${csudo}chmod 644 ${install_main_dir}/include/*
}
@ -374,7 +375,7 @@ function install_config() {
${csudo}cp -f ${script_dir}/../cfg/${configFile} \
${cfg_install_dir}/${configFile}.${verNumber}
${csudo}ln -s ${cfg_install_dir}/${configFile} \
${install_main_dir}/cfg/${configFile}
${install_main_dir}/cfg/${configFile} > /dev/null 2>&1
else
${csudo}cp -f ${script_dir}/../cfg/${configFile} \
${cfg_install_dir}/${configFile}.${verNumber}
@ -395,7 +396,7 @@ function install_taosadapter_config() {
${cfg_install_dir}/taosadapter.toml.${verNumber} || :
[ -f ${cfg_install_dir}/taosadapter.toml ] &&
${csudo}ln -s ${cfg_install_dir}/taosadapter.toml \
${install_main_dir}/cfg/taosadapter.toml || :
${install_main_dir}/cfg/taosadapter.toml > /dev/null 2>&1 || :
else
if [ -f "${binary_dir}/test/cfg/taosadapter.toml" ]; then
${csudo}cp -f ${binary_dir}/test/cfg/taosadapter.toml \
@ -408,12 +409,12 @@ function install_taosadapter_config() {
function install_log() {
${csudo}rm -rf ${log_dir} || :
${csudo}mkdir -p ${log_dir} && ${csudo}chmod 777 ${log_dir}
${csudo}ln -s ${log_dir} ${install_main_dir}/log
${csudo}ln -s ${log_dir} ${install_main_dir}/log > /dev/null 2>&1
}
function install_data() {
${csudo}mkdir -p ${data_dir} && ${csudo}chmod 777 ${data_dir}
${csudo}ln -s ${data_dir} ${install_main_dir}/data
${csudo}ln -s ${data_dir} ${install_main_dir}/data > /dev/null 2>&1
}
function install_connector() {
@ -533,7 +534,7 @@ function install_taosadapter_service() {
function install_service_on_launchctl() {
${csudouser}launchctl unload -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
${csudo}cp ${script_dir}/com.taosdata.taosd.plist /Library/LaunchDaemons/com.taosdata.taosd.plist
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist || :
${csudouser}launchctl load -w /Library/LaunchDaemons/com.taosdata.taosd.plist > /dev/null 2>&1 || :
}
function install_service() {

View File

@ -173,7 +173,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes;
pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes;
pTscObj->connId = pRsp->query->connId;
tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
tscTrace("conn %u hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes,
pTscObj->pAppInfo->totalDnodes);
if (pRsp->query->killRid) {
@ -440,6 +440,7 @@ int32_t hbGetExpiredUserInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
}
if (userNum <= 0) {
taosMemoryFree(users);
return TSDB_CODE_SUCCESS;
}
@ -476,6 +477,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
}
if (dbNum <= 0) {
taosMemoryFree(dbs);
return TSDB_CODE_SUCCESS;
}
@ -514,6 +516,7 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
}
if (stbNum <= 0) {
taosMemoryFree(stbs);
return TSDB_CODE_SUCCESS;
}

View File

@ -186,7 +186,7 @@ int32_t buildRequest(uint64_t connId, const char* sql, int sqlLen, void* param,
STscObj* pTscObj = (*pRequest)->pTscObj;
if (taosHashPut(pTscObj->pRequests, &(*pRequest)->self, sizeof((*pRequest)->self), &(*pRequest)->self,
sizeof((*pRequest)->self))) {
tscError("%d failed to add to request container, reqId:0x%" PRIx64 ", conn:%d, %s", (*pRequest)->self,
tscError("%" PRIx64 " failed to add to request container, reqId:0x%" PRIu64 ", conn:%" PRIx64 ", %s", (*pRequest)->self,
(*pRequest)->requestId, pTscObj->id, sql);
taosMemoryFree(param);
@ -371,7 +371,7 @@ int32_t updateQnodeList(SAppInstInfo* pInfo, SArray* pNodeList) {
pInfo->pQnodeList = taosArrayDup(pNodeList);
taosArraySort(pInfo->pQnodeList, compareQueryNodeLoad);
tscDebug("QnodeList updated in cluster 0x%" PRIx64 ", num:%d", pInfo->clusterId,
taosArrayGetSize(pInfo->pQnodeList));
(int)taosArrayGetSize(pInfo->pQnodeList));
}
taosThreadMutexUnlock(&pInfo->qnodeMutex);

View File

@ -410,6 +410,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVAlterTbReq vAlterTbReq = {0};
char* string = NULL;
cJSON* json = NULL;
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -419,7 +420,7 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
goto _exit;
}
cJSON* json = cJSON_CreateObject();
json = cJSON_CreateObject();
if (json == NULL) {
goto _exit;
}
@ -524,6 +525,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVDropStbReq req = {0};
char* string = NULL;
cJSON* json = NULL;
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -533,7 +535,7 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
goto _exit;
}
cJSON* json = cJSON_CreateObject();
json = cJSON_CreateObject();
if (json == NULL) {
goto _exit;
}
@ -556,6 +558,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVDropTbBatchReq req = {0};
char* string = NULL;
cJSON* json = NULL;
// decode
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -565,7 +568,7 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
goto _exit;
}
cJSON* json = cJSON_CreateObject();
json = cJSON_CreateObject();
if (json == NULL) {
goto _exit;
}
@ -684,7 +687,7 @@ end:
static int32_t taosDropStb(TAOS* taos, void* meta, int32_t metaLen) {
SVDropStbReq req = {0};
SDecoder coder;
SDecoder coder = {0};
SMDropStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
SRequestObj* pRequest = NULL;
@ -1212,6 +1215,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t code = TSDB_CODE_SUCCESS;
STableMeta* pTableMeta = NULL;
SQuery* pQuery = NULL;
SSubmitReq* subReq = NULL;
SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT);
if (!pRequest) {
@ -1228,8 +1232,8 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
}
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
strcpy(pName.dbname, pRequest->pDb);
strcpy(pName.tname, tbname);
tstrncpy(pName.dbname, pRequest->pDb, sizeof(pName.dbname));
tstrncpy(pName.tname, tbname, sizeof(pName.tname));
struct SCatalog* pCatalog = NULL;
code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
@ -1278,7 +1282,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize;
int32_t totalLen = sizeof(SSubmitReq) + submitLen;
SSubmitReq* subReq = taosMemoryCalloc(1, totalLen);
subReq = taosMemoryCalloc(1, totalLen);
SSubmitBlk* blk = POINTER_SHIFT(subReq, sizeof(SSubmitReq));
void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk));
STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen);
@ -1352,6 +1356,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
if (NULL == pQuery) {
uError("create SQuery error");
code = TSDB_CODE_OUT_OF_MEMORY;
taosMemoryFree(subReq);
goto end;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
@ -1390,6 +1395,7 @@ int taos_write_raw_block(TAOS* taos, int rows, char* pData, const char* tbname)
end:
taosMemoryFreeClear(pTableMeta);
qDestroyQuery(pQuery);
taosMemoryFree(subReq);
return code;
}

View File

@ -299,6 +299,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
for (; i < taosArrayGetSize(cols); i++) {
SSmlKv *kv = (SSmlKv *)taosArrayGetP(cols, i);
if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
taosHashCleanup(hashTmp);
return -1;
}
}
@ -430,7 +431,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
STableMeta *pTableMeta = NULL;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
SRequestConnInfo conn = {0};
conn.pTrans = info->taos->pAppInfo->pTransporter;
@ -874,7 +875,8 @@ static int32_t smlParseTS(SSmlHandle *info, const char *data, int32_t len, SArra
kv->i = ts;
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
if (cols) taosArrayPush(cols, &kv);
taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
}
@ -1009,6 +1011,7 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTableName, SHashObj *dumplicateKey,
SSmlMsgBuf *msg) {
if(!cols) return TSDB_CODE_OUT_OF_MEMORY;
const char *sql = data;
size_t childTableNameLen = strlen(tsSmlChildTableName);
while (*sql != '\0') {
@ -1082,7 +1085,7 @@ static int32_t smlParseTelnetTags(const char *data, SArray *cols, char *childTab
kv->length = valueLen;
kv->type = TSDB_DATA_TYPE_NCHAR;
if (cols) taosArrayPush(cols, &kv);
taosArrayPush(cols, &kv);
}
return TSDB_CODE_SUCCESS;
@ -1370,8 +1373,14 @@ static int32_t smlKvTimeHashCompare(const void *key1, const void *key2) {
SHashObj *s2 = *(SHashObj **)key2;
SSmlKv *kv1 = *(SSmlKv **)taosHashGet(s1, TS, TS_LEN);
SSmlKv *kv2 = *(SSmlKv **)taosHashGet(s2, TS, TS_LEN);
ASSERT(kv1->type == TSDB_DATA_TYPE_TIMESTAMP);
ASSERT(kv2->type == TSDB_DATA_TYPE_TIMESTAMP);
if(!kv1 || kv1->type != TSDB_DATA_TYPE_TIMESTAMP){
uError("smlKvTimeHashCompare kv1");
return -1;
}
if(!kv2 || kv2->type != TSDB_DATA_TYPE_TIMESTAMP){
uError("smlKvTimeHashCompare kv2");
return -1;
}
if (kv1->i < kv2->i) {
return -1;
} else if (kv1->i > kv2->i) {
@ -1735,7 +1744,7 @@ static int32_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root, SArray *cols) {
kv->i = tsVal;
kv->type = TSDB_DATA_TYPE_TIMESTAMP;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
if (cols) taosArrayPush(cols, &kv);
taosArrayPush(cols, &kv);
return TSDB_CODE_SUCCESS;
}
@ -1932,6 +1941,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
}
static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if(!cols) return TSDB_CODE_OUT_OF_MEMORY;
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
@ -1941,7 +1951,7 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
if (!kv) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (cols) taosArrayPush(cols, &kv);
taosArrayPush(cols, &kv);
kv->key = VALUE;
kv->keyLen = VALUE_LEN;
@ -1955,7 +1965,9 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey,
SSmlMsgBuf *msg) {
int32_t ret = TSDB_CODE_SUCCESS;
if (!pKVs){
return TSDB_CODE_OUT_OF_MEMORY;
}
cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (tags == NULL || tags->type != cJSON_Object) {
return TSDB_CODE_TSC_INVALID_JSON;
@ -1985,14 +1997,14 @@ static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableN
return TSDB_CODE_TSC_INVALID_JSON;
}
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
tstrncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
continue;
}
// add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if (!kv) return TSDB_CODE_OUT_OF_MEMORY;
if (pKVs) taosArrayPush(pKVs, &kv);
taosArrayPush(pKVs, &kv);
// key
kv->keyLen = keyLen;
@ -2103,6 +2115,8 @@ static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql) {
if (!oneTable) {
tinfo = smlBuildTableInfo();
if (!tinfo) {
smlDestroyCols(cols);
if (info->dataFormat) taosArrayDestroy(cols);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES);
@ -2295,7 +2309,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
SSmlTableInfo *tableData = *oneTable;
SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}};
strcpy(pName.dbname, info->pRequest->pDb);
tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname));
memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName));
SRequestConnInfo conn = {0};

View File

@ -201,6 +201,9 @@ int32_t stmtCacheBlock(STscStmt* pStmt) {
}
STableDataBlocks** pSrc = taosHashGet(pStmt->exec.pBlockHash, pStmt->bInfo.tbFName, strlen(pStmt->bInfo.tbFName));
if(!pSrc){
return TSDB_CODE_OUT_OF_MEMORY;
}
STableDataBlocks* pDst = NULL;
STMT_ERR_RET(qCloneStmtDataBlock(&pDst, *pSrc));

View File

@ -233,12 +233,12 @@ void tmq_conf_destroy(tmq_conf_t* conf) {
tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
if (strcmp(key, "group.id") == 0) {
strcpy(conf->groupId, value);
tstrncpy(conf->groupId, value, TSDB_CGROUP_LEN);
return TMQ_CONF_OK;
}
if (strcmp(key, "client.id") == 0) {
strcpy(conf->clientId, value);
tstrncpy(conf->clientId, value, 256);
return TMQ_CONF_OK;
}
@ -452,7 +452,6 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
int32_t code;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
ASSERT(0);
return -1;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
@ -464,15 +463,22 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
SEncoder encoder;
tEncoderInit(&encoder, abuf, len);
tEncodeSTqOffset(&encoder, pOffset);
tEncoderClear(&encoder);
// build param
SMqCommitCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam));
if (pParam == NULL) {
taosMemoryFree(buf);
return -1;
}
pParam->params = pParamSet;
pParam->pOffset = pOffset;
// build send info
SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (pMsgSendInfo == NULL) {
taosMemoryFree(buf);
taosMemoryFree(pParam);
return -1;
}
pMsgSendInfo->msgInfo = (SDataBuf){
@ -547,6 +553,8 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
goto FAIL;
}
goto HANDLE_RSP;
@ -565,6 +573,7 @@ HANDLE_RSP:
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
return code;
} else {
code = 0;
@ -587,7 +596,14 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = TSDB_CODE_OUT_OF_MEMORY;
if (async) {
if (automatic) {
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
} else {
userCb(tmq, code, userParam);
}
}
return -1;
}
@ -642,16 +658,6 @@ int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
} else {
code = 0;
}
if (code != 0 && async) {
if (automatic) {
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
} else {
userCb(tmq, code, userParam);
}
}
#if 0
@ -709,6 +715,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
taosMemoryFree(param);
return;
}
int64_t consumerId = tmq->consumerId;
@ -721,6 +728,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pReq);
goto OVER;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
@ -870,8 +878,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId);
return NULL;
}
@ -939,10 +946,9 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
return NULL;
}
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
if (pTmq->hbBgEnable) {
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
}
@ -1061,9 +1067,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code = 0;
FAIL:
if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree);
if (code != 0 && buf) {
taosMemoryFree(buf);
}
taosMemoryFree(buf);
return code;
}
@ -1101,7 +1106,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
tscWarn("msg discard from vgId:%d, epoch %d since out of memory", vgId, epoch);
goto CREATE_MSG_FAIL;
}

View File

@ -602,7 +602,7 @@ _exit:
}
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
int32_t n;
int32_t n = 0;
TSROW_LEN(pRow, n);
if (p) {
@ -613,7 +613,7 @@ int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow) {
}
int32_t tGetTSRow(uint8_t *p, STSRow2 **ppRow) {
int32_t n;
int32_t n = 0;
*ppRow = (STSRow2 *)p;
TSROW_LEN(*ppRow, n);
@ -916,6 +916,10 @@ char *tTagValToData(const STagVal *value, bool isJson) {
}
bool tTagGet(const STag *pTag, STagVal *pTagVal) {
if(!pTag || !pTagVal){
return false;
}
int16_t lidx = 0;
int16_t ridx = pTag->nTag - 1;
int16_t midx;

View File

@ -795,7 +795,7 @@ int64_t taosTimeTruncate(int64_t t, const SInterval* pInterval, int32_t precisio
// not enough time range
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {
end = taosTimeAdd(start, pInterval->interval, pInterval->intervalUnit, precision) - 1;
while (end < t && ((start + pInterval->sliding) <= INT64_MAX)) { // move forward to the correct time window
while (end < t) { // move forward to the correct time window
start += pInterval->sliding;
if (start < 0 || INT64_MAX - start > pInterval->interval - 1) {

View File

@ -366,6 +366,7 @@ static int32_t toBinary(SVariant *pVariant, char **pDest, int32_t *pDestSize) {
}
if (pBuf != NULL) {
taosMemoryFree(pVariant->pz);
*pDest = pBuf;
}
@ -688,7 +689,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
case TSDB_DATA_TYPE_BIGINT: {
if (convertToInteger(pVariant, &result, type, true, false, converted) < 0) {
SET_EXT_INFO(converted, (int64_t)result, INT64_MIN + 1, INT64_MAX, extInfo);
SET_EXT_INFO(converted, result, INT64_MIN + 1, INT64_MAX, extInfo);
return -1;
}
*((int64_t *)payload) = (int64_t)result;
@ -697,7 +698,7 @@ int32_t tVariantDumpEx(SVariant *pVariant, char *payload, int16_t type, bool inc
case TSDB_DATA_TYPE_UBIGINT: {
if (convertToInteger(pVariant, &result, type, false, false, converted) < 0) {
SET_EXT_INFO(converted, (uint64_t)result, 0, UINT64_MAX - 1, extInfo);
SET_EXT_INFO(converted, result, 0, UINT64_MAX - 1, extInfo);
return -1;
}
*((uint64_t *)payload) = (uint64_t)result;

View File

@ -22,6 +22,8 @@
extern "C" {
#endif
#define MAX_META_MSG_IN_BATCH 1048576
int32_t mndInitQuery(SMnode *pMnode);
void mndCleanupQuery(SMnode *pMnode);

View File

@ -369,7 +369,7 @@ static void mndReleaseApp(SMnode *pMnode, SAppObj *pApp) {
taosCacheRelease(pMgmt->appCache, (void **)&pApp, false);
}
void *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
SAppObj *mndGetNextApp(SMnode *pMnode, SCacheIter *pIter) {
SAppObj *pApp = NULL;
bool hasNext = taosCacheIterNext(pIter);
if (hasNext) {

View File

@ -77,6 +77,12 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
void *pRsp = NULL;
SMnode *pMnode = pMsg->info.node;
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
mError("too many msgs %d in mnode batch meta req", msgNum);
goto _exit;
}
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) {
code = TSDB_CODE_OUT_OF_MEMORY;
@ -106,6 +112,7 @@ int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
taosArrayDestroy(batchRsp);
return -1;
}

View File

@ -115,7 +115,6 @@ typedef struct {
typedef struct {
SMqDataRsp dataRsp;
SMqRspHead rspHead;
char subKey[TSDB_SUBSCRIBE_KEY_LEN];
SRpcHandleInfo pInfo;
} STqPushEntry;
@ -183,6 +182,7 @@ int32_t tqOffsetCommitFile(STqOffsetStore* pStore);
// tqSink
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data);
// tqOffset
char* tqOffsetBuildFName(const char* path, int32_t ver);

View File

@ -675,7 +675,7 @@ int32_t metaGetTbTSchemaEx(SMeta *pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sv
SSchemaWrapper *pSchemaWrapper = &schema;
tDecoderInit(&dc, pData, nData);
tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
(void)tDecodeSSchemaWrapper(&dc, pSchemaWrapper);
tDecoderClear(&dc);
tdbFree(pData);

View File

@ -944,6 +944,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaUpdateTagIdx(pMeta, &ctbEntry);
}
ASSERT(ctbEntry.ctbEntry.pTags);
SCtbIdxKey ctbIdxKey = {.suid = ctbEntry.ctbEntry.suid, .uid = uid};
tdbTbUpsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), ctbEntry.ctbEntry.pTags,
((STag *)(ctbEntry.ctbEntry.pTags))->len, &pMeta->txn);
@ -952,7 +953,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
tDecoderClear(&dc1);
tDecoderClear(&dc2);
if (ctbEntry.ctbEntry.pTags) taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
if (ctbEntry.pBuf) taosMemoryFree(ctbEntry.pBuf);
if (stbEntry.pBuf) tdbFree(stbEntry.pBuf);
tdbTbcClose(pTbDbc);
@ -1202,8 +1203,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
SMetaEntry stbEntry = {0};
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void *pTagData = NULL; //
const SSchema *pTagColumn;
const void *pTagData = NULL;
int32_t nTagData = 0;
SDecoder dc = {0};
int32_t ret = 0;

View File

@ -192,7 +192,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
return -1;
}
memcpy(buf, &pPushEntry->rspHead, sizeof(SMqRspHead));
memcpy(buf, &pPushEntry->dataRsp.head, sizeof(SMqRspHead));
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
@ -215,7 +215,7 @@ int32_t tqPushDataRsp(STQ* pTq, STqPushEntry* pPushEntry) {
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vgId:%d, from consumer:%" PRId64 ", (epoch %d) push rsp, block num: %d, reqOffset:%s, rspOffset:%s",
TD_VID(pTq->pVnode), pPushEntry->rspHead.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
TD_VID(pTq->pVnode), pRsp->head.consumerId, pRsp->head.epoch, pRsp->blockNum, buf1, buf2);
return 0;
}
@ -560,9 +560,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
memcpy(pPushEntry->subKey, pHandle->subKey, TSDB_SUBSCRIBE_KEY_LEN);
dataRsp.withTbName = 0;
memcpy(&pPushEntry->dataRsp, &dataRsp, sizeof(SMqDataRsp));
pPushEntry->rspHead.consumerId = consumerId;
pPushEntry->rspHead.epoch = reqEpoch;
pPushEntry->rspHead.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
pPushEntry->dataRsp.head.consumerId = consumerId;
pPushEntry->dataRsp.head.epoch = reqEpoch;
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
tqDebug("tmq poll: consumer %ld, subkey %s, vg %d save handle to push mgr", consumerId, pHandle->subKey,
TD_VID(pTq->pVnode));
@ -924,7 +924,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) {
pTask->smaSink.smaSink = smaHandleRes;
} else if (pTask->outputType == TASK_OUTPUT__TABLE) {
pTask->tbSink.vnode = pTq->pVnode;
pTask->tbSink.tbSinkFunc = tqTableSink;
pTask->tbSink.tbSinkFunc = tqTableSink1;
ASSERT(pTask->tbSink.pSchemaWrapper);
ASSERT(pTask->tbSink.pSchemaWrapper->pSchema);

View File

@ -284,6 +284,250 @@ SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchem
return ret;
}
void tqTableSink1(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pBlocks = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;
int64_t suid = pTask->tbSink.stbUid;
char* stbFullName = pTask->tbSink.stbFullName;
STSchema* pTSchema = pTask->tbSink.pTSchema;
SSchemaWrapper* pSchemaWrapper = pTask->tbSink.pSchemaWrapper;
int32_t blockSz = taosArrayGetSize(pBlocks);
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if (!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return;
}
tqDebug("vgId:%d, task %d write into table, block num: %d", TD_VID(pVnode), pTask->taskId, blockSz);
for (int32_t i = 0; i < blockSz; i++) {
bool createTb = true;
SSDataBlock* pDataBlock = taosArrayGet(pBlocks, i);
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
SBatchDeleteReq deleteReq = {0};
deleteReq.deleteReqs = taosArrayInit(0, sizeof(SSingleDeleteReq));
deleteReq.suid = suid;
tqBuildDeleteReq(pVnode, stbFullName, pDataBlock, &deleteReq);
int32_t len;
int32_t code;
tEncodeSize(tEncodeSBatchDeleteReq, &deleteReq, len, code);
if (code < 0) {
//
ASSERT(0);
}
SEncoder encoder;
void* serializedDeleteReq = rpcMallocCont(len + sizeof(SMsgHead));
void* abuf = POINTER_SHIFT(serializedDeleteReq, sizeof(SMsgHead));
tEncoderInit(&encoder, abuf, len);
tEncodeSBatchDeleteReq(&encoder, &deleteReq);
tEncoderClear(&encoder);
taosArrayDestroy(deleteReq.deleteReqs);
((SMsgHead*)serializedDeleteReq)->vgId = pVnode->config.vgId;
SRpcMsg msg = {
.msgType = TDMT_VND_BATCH_DEL,
.pCont = serializedDeleteReq,
.contLen = len + sizeof(SMsgHead),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(serializedDeleteReq);
tqDebug("failed to put delete req into write-queue since %s", terrstr());
}
} else {
char* ctbName = NULL;
// set child table name
if (pDataBlock->info.parTbName[0]) {
ctbName = strdup(pDataBlock->info.parTbName);
} else {
ctbName = buildCtbNameByGroupId(stbFullName, pDataBlock->info.groupId);
}
int32_t schemaLen = 0;
void* schemaStr = NULL;
int64_t uid = 0;
SMetaReader mr = {0};
metaReaderInit(&mr, pVnode->pMeta, 0);
if (metaGetTableEntryByName(&mr, ctbName) < 0) {
metaReaderClear(&mr);
tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName);
SVCreateTbReq createTbReq = {0};
// set const
createTbReq.flags = 0;
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
// set super table name
SName name = {0};
tNameFromString(&name, stbFullName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
createTbReq.ctb.stbName = strdup((char*)tNameGetTableName(&name)); // strdup(stbFullName);
createTbReq.name = ctbName;
ctbName = NULL;
// set tag content
taosArrayClear(tagArray);
STagVal tagVal = {
.cid = taosArrayGetSize(pDataBlock->pDataBlock) + 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.i64 = (int64_t)pDataBlock->info.groupId,
};
taosArrayPush(tagArray, &tagVal);
createTbReq.ctb.tagNum = taosArrayGetSize(tagArray);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (pTag == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
taosArrayDestroy(tagArray);
tdDestroySVCreateTbReq(&createTbReq);
return;
}
createTbReq.ctb.pTag = (uint8_t*)pTag;
// set tag name
SArray* tagName = taosArrayInit(1, TSDB_COL_NAME_LEN);
char tagNameStr[TSDB_COL_NAME_LEN] = {0};
strcpy(tagNameStr, "group_id");
taosArrayPush(tagName, tagNameStr);
createTbReq.ctb.tagName = tagName;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return;
}
// set schema str
schemaStr = taosMemoryMalloc(schemaLen);
if (schemaStr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, schemaStr, schemaLen);
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
tEncoderClear(&encoder);
taosMemoryFree(schemaStr);
return;
}
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
} else {
if (mr.me.type != TSDB_CHILD_TABLE) {
tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName,
mr.me.type);
metaReaderClear(&mr);
taosMemoryFree(ctbName);
continue;
}
if (mr.me.ctbEntry.suid != suid) {
tqError("vgId:%d, failed to write into %s, since suid mismatch, expect suid: %ld, actual suid %ld",
TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry);
metaReaderClear(&mr);
taosMemoryFree(ctbName);
continue;
}
createTb = false;
uid = mr.me.uid;
metaReaderClear(&mr);
tqDebug("vgId:%d, stream write, table %s, uid %ld already exist, skip create", TD_VID(pVnode), ctbName, uid);
taosMemoryFreeClear(ctbName);
}
int32_t cap = sizeof(SSubmitReq);
int32_t rows = pDataBlock->info.rows;
int32_t maxLen = TD_ROW_MAX_BYTES_FROM_SCHEMA(pTSchema);
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
SSubmitReq* ret = rpcMallocCont(cap);
ret->header.vgId = pVnode->config.vgId;
ret->length = sizeof(SSubmitReq);
ret->numOfBlocks = htonl(1);
SSubmitBlk* blkHead = POINTER_SHIFT(ret, sizeof(SSubmitReq));
blkHead->numOfRows = htonl(pDataBlock->info.rows);
blkHead->sversion = htonl(pTSchema->version);
blkHead->suid = htobe64(suid);
// uid is assigned by vnode
blkHead->uid = 0;
blkHead->schemaLen = 0;
tqDebug("tq sink, convert block %d, rows: %d", i, rows);
int32_t dataLen = 0;
void* blkSchema = POINTER_SHIFT(blkHead, sizeof(SSubmitBlk));
STSRow* rowData = blkSchema;
if (createTb) {
memcpy(blkSchema, schemaStr, schemaLen);
blkHead->schemaLen = htonl(schemaLen);
rowData = POINTER_SHIFT(blkSchema, schemaLen);
} else {
blkHead->uid = htobe64(uid);
}
taosMemoryFreeClear(schemaStr);
for (int32_t j = 0; j < rows; j++) {
SRowBuilder rb = {0};
tdSRowInit(&rb, pTSchema->version);
tdSRowSetTpInfo(&rb, pTSchema->numOfCols, pTSchema->flen);
tdSRowResetBuf(&rb, rowData);
for (int32_t k = 0; k < pTSchema->numOfCols; k++) {
const STColumn* pColumn = &pTSchema->columns[k];
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, k);
if (colDataIsNull_s(pColData, j)) {
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, pColumn->offset, k);
} else {
void* colData = colDataGetData(pColData, j);
tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, colData, true, pColumn->offset, k);
}
}
tdSRowEnd(&rb);
int32_t rowLen = TD_ROW_LEN(rowData);
rowData = POINTER_SHIFT(rowData, rowLen);
dataLen += rowLen;
}
blkHead->dataLen = htonl(dataLen);
ret->length += sizeof(SSubmitBlk) + schemaLen + dataLen;
ret->length = htonl(ret->length);
SRpcMsg msg = {
.msgType = TDMT_VND_SUBMIT,
.pCont = ret,
.contLen = ntohl(ret->length),
};
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
rpcFreeCont(ret);
tqDebug("failed to put into write-queue since %s", terrstr());
}
}
}
taosArrayDestroy(tagArray);
}
void tqTableSink(SStreamTask* pTask, void* vnode, int64_t ver, void* data) {
const SArray* pRes = (const SArray*)data;
SVnode* pVnode = (SVnode*)vnode;

View File

@ -151,9 +151,6 @@ static SBlockData *loadLastBlock(SLDataIter *pIter, const char *idStr) {
", last file index:%d, last block index:%d, entry:%d, %p, elapsed time:%.2f ms, %s",
pInfo->loadBlocks, pIter->uid, pIter->iStt, pIter->iSttBlk, pInfo->currentLoadBlockIndex, pBlock, el,
idStr);
if (code != TSDB_CODE_SUCCESS) {
goto _exit;
}
pInfo->blockIndex[pInfo->currentLoadBlockIndex] = pIter->iSttBlk;
tsdbDebug("last block index list:%d, %d, %s", pInfo->blockIndex[0], pInfo->blockIndex[1], idStr);
@ -466,8 +463,8 @@ static void findNextValidRow(SLDataIter *pIter, const char *idStr) {
}
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
int32_t code = 0;
int32_t step = pIter->backward ? -1 : 1;
terrno = TSDB_CODE_SUCCESS;
// no qualified last file block in current file, no need to fetch row
if (pIter->pSttBlk == NULL) {
@ -476,6 +473,10 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
int32_t iBlockL = pIter->iSttBlk;
SBlockData *pBlockData = loadLastBlock(pIter, idStr);
if (pBlockData == NULL && terrno != TSDB_CODE_SUCCESS) {
goto _exit;
}
pIter->iRow += step;
while (1) {
@ -501,11 +502,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
_exit:
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
}
return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
return (terrno == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL);
}
SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; }

View File

@ -340,7 +340,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, SArray* aDFileSet, STsdb
pIter->pLastBlockReader = taosMemoryCalloc(1, sizeof(struct SLastBlockReader));
if (pIter->pLastBlockReader == NULL) {
int32_t code = TSDB_CODE_OUT_OF_MEMORY;
tsdbError("failed to prepare the last block iterator, code:%d %s", tstrerror(code), pReader->idStr);
tsdbError("failed to prepare the last block iterator, code:%s %s", tstrerror(code), pReader->idStr);
return code;
}
}
@ -646,7 +646,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
double el = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug(
"load block of %d tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"load block of %"PRIzu" tables completed, blocks:%d in %d tables, last-files:%d, block-info-size:%.2f Kb, elapsed "
"time:%.2f ms %s",
numOfTables, pBlockNum->numOfBlocks, numOfQTable, pBlockNum->numOfLastFiles, sizeInDisk / 1000.0, el,
pReader->idStr);
@ -1506,7 +1506,12 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
if (pReader->pMemSchema == NULL) {
int32_t code =
metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
return pReader->pMemSchema;
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
} else {
return pReader->pMemSchema;
}
}
if (pReader->pMemSchema->version == sversion) {
@ -1515,7 +1520,12 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
taosMemoryFree(pReader->pMemSchema);
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
return pReader->pMemSchema;
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return NULL;
} else {
return pReader->pMemSchema;
}
}
static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* pBlockScanInfo, TSDBROW* pRow,
@ -1811,7 +1821,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) {
init = true;
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
tRowMergerInit(&merge, &fRow, pReader->pSchema);
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge);
}
@ -1821,8 +1835,12 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow1);
} else {
init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
}
@ -1832,7 +1850,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pSchema);
int32_t code = tRowMergerInit(&merge, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
@ -1842,7 +1863,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, pRow);
} else {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
@ -1850,7 +1874,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == k.ts) {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, pRow, pSchema);
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
@ -1860,7 +1888,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
} else {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(piRow), pReader, pBlockScanInfo->uid);
tRowMergerInit(&merge, piRow, pSchema);
int32_t code = tRowMergerInit(&merge, piRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
@ -1871,7 +1902,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
tRowMerge(&merge, &fRow1);
} else {
init = true;
tRowMergerInit(&merge, &fRow1, pReader->pSchema);
int32_t code = tRowMergerInit(&merge, &fRow1, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
}
doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, tsLast, &merge);
}
@ -1879,7 +1913,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == key) {
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
if (!init) {
tRowMergerInit(&merge, &fRow, pReader->pSchema);
int32_t code = tRowMergerInit(&merge, &fRow, pReader->pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
tRowMerge(&merge, &fRow);
}
@ -3041,19 +3078,30 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
SRowMerger merge = {0};
// get the correct schema for data in memory
terrno = 0;
STSchema* pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
if (pTSchema == NULL) {
return terrno;
}
if (pReader->pSchema == NULL) {
pReader->pSchema = pTSchema;
}
tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
int32_t code = tRowMergerInit2(&merge, pReader->pSchema, &current, pTSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
STSchema* pTSchema1 = doGetSchemaForTSRow(TSDBROW_SVERSION(pNextRow), pReader, uid);
if(pTSchema1 == NULL) {
return terrno;
}
tRowMergerAdd(&merge, pNextRow, pTSchema1);
doMergeRowsInBuf(pIter, uid, current.pTSRow->ts, pDelList, &merge, pReader);
int32_t code = tRowMergerGetRow(&merge, pTSRow);
code = tRowMergerGetRow(&merge, pTSRow);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -3085,7 +3133,7 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
doMergeRowsInBuf(&pBlockScanInfo->iter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
tRowMerge(&merge, piRow);
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, k.ts, pBlockScanInfo->delSkyline, &merge, pReader);
doMergeRowsInBuf(&pBlockScanInfo->iiter, pBlockScanInfo->uid, ik.ts, pBlockScanInfo->delSkyline, &merge, pReader);
}
int32_t code = tRowMergerGetRow(&merge, pTSRow);
@ -3443,7 +3491,7 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, SArray* pTabl
return code;
_err:
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), pReader->idStr);
tsdbError("failed to create data reader, code:%s %s", tstrerror(code), idstr);
return code;
}
@ -3732,7 +3780,7 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
}
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
if (isEmptyQueryTimeWindow(&pReader->window)) {
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
return TSDB_CODE_SUCCESS;
}

View File

@ -719,8 +719,6 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
}
pMerger->version = key.version;
_exit:
return code;
}

View File

@ -272,6 +272,12 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
SRpcMsg rspMsg = {0};
void *pRsp = NULL;
if (msgNum >= MAX_META_MSG_IN_BATCH) {
code = TSDB_CODE_INVALID_MSG;
qError("too many msgs %d in vnode batch meta req", msgNum);
goto _exit;
}
SArray *batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) {
code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -20,6 +20,8 @@
extern "C" {
#endif
#define CTG_MAX_REQ_IN_BATCH 1048576
#ifdef __cplusplus
}
#endif

View File

@ -173,14 +173,18 @@ int32_t ctgRefreshTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx*
CTG_ERR_JRET(ctgCloneMetaOutput(output, pOutput));
}
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncReq));
code = ctgUpdateTbMetaEnqueue(pCtg, output, syncReq);
output = NULL;
CTG_ERR_JRET(code);
return TSDB_CODE_SUCCESS;
_return:
taosMemoryFreeClear(output->tbMeta);
taosMemoryFreeClear(output);
if (output) {
taosMemoryFreeClear(output->tbMeta);
taosMemoryFreeClear(output);
}
CTG_RET(code);
}
@ -290,7 +294,9 @@ int32_t ctgUpdateTbMeta(SCatalog* pCtg, STableMetaRsp* rspMsg, bool syncOp) {
CTG_ERR_JRET(queryCreateTableMetaFromMsg(rspMsg, rspMsg->tableType == TSDB_SUPER_TABLE, &output->tbMeta));
}
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, output, syncOp));
code = ctgUpdateTbMetaEnqueue(pCtg, output, syncOp);
output = NULL;
CTG_ERR_JRET(code);
return TSDB_CODE_SUCCESS;

View File

@ -71,7 +71,7 @@ int32_t ctgInitGetTbMetasTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbMetaNum);
return TSDB_CODE_SUCCESS;
}
@ -202,7 +202,7 @@ int32_t ctgInitGetTbHashsTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
taosArrayPush(pJob->pTasks, &task);
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, dbNum:%d, tbNum:%d", pJob->queryId, taskIdx,
ctgTaskTypeStr(task.type), taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
ctgTaskTypeStr(task.type), (int32_t)taosArrayGetSize(ctx->pNames), pJob->tbHashNum);
return TSDB_CODE_SUCCESS;
}
@ -1056,7 +1056,6 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
default:
ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
}
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
@ -1223,7 +1222,6 @@ int32_t ctgHandleGetTbMetasRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
default:
ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
}
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
@ -1309,7 +1307,6 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
default:
ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
}
_return:
@ -1346,7 +1343,6 @@ int32_t ctgHandleGetTbHashRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
default:
ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
}
_return:
@ -1382,7 +1378,6 @@ int32_t ctgHandleGetTbHashsRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBu
default:
ctgError("invalid reqType %d", reqType);
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
break;
}
if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) {
@ -1672,7 +1667,7 @@ int32_t ctgLaunchGetTbMetasTask(SCtgTask* pTask) {
int32_t baseResIdx = 0;
for (int32_t i = 0; i < dbNum; ++i) {
STablesReq* pReq = taosArrayGet(pCtx->pNames, i);
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, taosArrayGetSize(pReq->pTables));
ctgDebug("start to check tb metas in db %s, tbNum %d", pReq->dbFName, (int32_t)taosArrayGetSize(pReq->pTables));
CTG_ERR_RET(ctgGetTbMetasFromCache(pCtg, pConn, pCtx, i, &fetchIdx, baseResIdx, pReq->pTables));
baseResIdx += taosArrayGetSize(pReq->pTables);
}
@ -2010,10 +2005,6 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask* pTask) {
_return:
if (dbCache) {
ctgReleaseVgInfoToCache(pCtg, dbCache);
}
CTG_RET(code);
}

View File

@ -597,6 +597,8 @@ int32_t ctgEnqueue(SCatalog *pCtg, SCtgCacheOperation *operation) {
SCtgQNode *node = taosMemoryCalloc(1, sizeof(SCtgQNode));
if (NULL == node) {
qError("calloc %d failed", (int32_t)sizeof(SCtgQNode));
taosMemoryFree(operation->data);
taosMemoryFree(operation);
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -648,6 +650,7 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
SCtgDropDBMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDBMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDBMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -668,7 +671,6 @@ int32_t ctgDropDbCacheEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId)
_return:
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
@ -681,6 +683,7 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
SCtgDropDbVgroupMsg *msg = taosMemoryMalloc(sizeof(SCtgDropDbVgroupMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropDbVgroupMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -700,7 +703,6 @@ int32_t ctgDropDbVgroupEnqueue(SCatalog *pCtg, const char *dbFName, bool syncOp)
_return:
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
@ -714,6 +716,7 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
SCtgDropStbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropStbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropStbMetaMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -731,7 +734,6 @@ int32_t ctgDropStbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
_return:
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
@ -744,6 +746,7 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
SCtgDropTblMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTblMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTblMetaMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -760,7 +763,6 @@ int32_t ctgDropTbMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId,
_return:
taosMemoryFreeClear(op->data);
CTG_RET(code);
}
@ -773,6 +775,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
SCtgUpdateVgMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateVgMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
taosMemoryFree(op);
ctgFreeVgInfo(dbInfo);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -796,8 +799,6 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
_return:
ctgFreeVgInfo(dbInfo);
taosMemoryFreeClear(op->data);
taosMemoryFreeClear(op);
CTG_RET(code);
}
@ -810,6 +811,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog *pCtg, STableMetaOutput *output, bool sy
SCtgUpdateTbMetaMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbMetaMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -834,8 +836,6 @@ _return:
taosMemoryFree(output);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
@ -847,6 +847,7 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
SCtgUpdateEpsetMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateEpsetMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateEpsetMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -863,8 +864,6 @@ int32_t ctgUpdateVgEpsetEnqueue(SCatalog *pCtg, char *dbFName, int32_t vgId, SEp
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
@ -877,6 +876,7 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
SCtgUpdateUserMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateUserMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateUserMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -892,7 +892,6 @@ int32_t ctgUpdateUserEnqueue(SCatalog *pCtg, SGetUserAuthRsp *pAuth, bool syncOp
_return:
tFreeSGetUserAuthRsp(pAuth);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
@ -906,6 +905,7 @@ int32_t ctgUpdateTbIndexEnqueue(SCatalog *pCtg, STableIndex **pIndex, bool syncO
SCtgUpdateTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbIndexMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -923,7 +923,6 @@ _return:
taosArrayDestroyEx((*pIndex)->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(*pIndex);
taosMemoryFreeClear(msg);
CTG_RET(code);
}
@ -937,6 +936,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
SCtgDropTbIndexMsg *msg = taosMemoryMalloc(sizeof(SCtgDropTbIndexMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbIndexMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -952,8 +952,6 @@ int32_t ctgDropTbIndexEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
@ -968,6 +966,7 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
SCtgClearCacheMsg *msg = taosMemoryMalloc(sizeof(SCtgClearCacheMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgClearCacheMsg));
taosMemoryFree(op);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -981,8 +980,6 @@ int32_t ctgClearCacheEnqueue(SCatalog *pCtg, bool freeCtg, bool stopQueue, bool
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
@ -1483,7 +1480,9 @@ int32_t ctgUpdateTbMetaToCache(SCatalog *pCtg, STableMetaOutput *pOut, bool sync
int32_t code = 0;
CTG_ERR_RET(ctgCloneMetaOutput(pOut, &pOutput));
CTG_ERR_JRET(ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq));
code = ctgUpdateTbMetaEnqueue(pCtg, pOutput, syncReq);
pOutput = NULL;
CTG_ERR_JRET(code);
return TSDB_CODE_SUCCESS;

View File

@ -19,6 +19,7 @@
#include "tname.h"
#include "tref.h"
#include "trpc.h"
#include "ctgRemote.h"
int32_t ctgHandleBatchRsp(SCtgJob* pJob, SCtgTaskCallbackParam* cbParam, SDataBuf* pMsg, int32_t rspCode) {
int32_t code = 0;
@ -579,6 +580,11 @@ int32_t ctgBuildBatchReqMsg(SCtgBatch* pBatch, int32_t vgId, void** msg) {
int32_t num = taosArrayGetSize(pBatch->pMsgs);
SBatchReq* pBatchReq = (SBatchReq*)(*msg);
if (num >= CTG_MAX_REQ_IN_BATCH) {
qError("too many msgs %d in one batch request", num);
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
pBatchReq->header.vgId = htonl(vgId);
pBatchReq->msgNum = htonl(num);
offset += sizeof(SBatchReq);

View File

@ -89,8 +89,8 @@ extern "C" {
#define EXPLAIN_STRING_TYPE_FORMAT "%s"
#define EXPLAIN_INPUT_ORDER_FORMAT "input_order=%s"
#define EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT "output_order=%s"
#define EXPLAIN_OFFSET_FORMAT "offset=%d"
#define EXPLAIN_SOFFSET_FORMAT "soffset=%d"
#define EXPLAIN_OFFSET_FORMAT "offset=%" PRId64
#define EXPLAIN_SOFFSET_FORMAT "soffset=%" PRId64
#define EXPLAIN_PARTITIONS_FORMAT "partitions=%d"
#define COMMAND_RESET_LOG "resetLog"

View File

@ -485,6 +485,7 @@ static int32_t execShowCreateTable(SShowCreateTableStmt* pStmt, SRetrieveTableRs
SSDataBlock* pBlock = buildCreateTbResultDataBlock();
int32_t code = setCreateTBResultIntoDataBlock(pBlock, pStmt->pDbCfg, pStmt->tableName, pStmt->pTableCfg);
if (code) {
blockDataDestroy(pBlock);
return code;
}
return buildRetrieveTableRsp(pBlock, SHOW_CREATE_TB_RESULT_COLS, pRsp);

View File

@ -1598,6 +1598,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, rspSize);
if (NULL == rsp) {
qError("malloc SRetrieveTableRsp failed, size:%d", rspSize);
blockDataDestroy(pBlock);
QRY_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
}

View File

@ -267,6 +267,8 @@ int32_t createDataDeleter(SDataSinkManager* pManager, const SDataSinkNode* pData
taosThreadMutexInit(&deleter->mutex, NULL);
if (NULL == deleter->pDataBlocks) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
destroyDataSinker((SDataSinkHandle*)deleter);
taosMemoryFree(deleter);
return TSDB_CODE_QRY_OUT_OF_MEMORY;
}
*pHandle = deleter;

View File

@ -323,6 +323,7 @@ int32_t createDataInserter(SDataSinkManager* pManager, const SDataSinkNode* pDat
int32_t code =
tsdbGetTableSchema(inserter->pParam->readHandle->vnode, pInserterNode->tableId, &inserter->pSchema, &suid);
if (code) {
destroyDataSinker((SDataSinkHandle*)pInserterNode);
return code;
}

View File

@ -1138,7 +1138,7 @@ static SResSchema createResSchema(int32_t type, int32_t bytes, int32_t slotId, i
s.bytes = bytes;
s.slotId = slotId;
s.precision = precision;
strncpy(s.name, name, tListLen(s.name));
tstrncpy(s.name, name, tListLen(s.name));
return s;
}
@ -1366,7 +1366,7 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput,
fmGetFuncExecFuncs(pCtx->functionId, &pCtx->fpSet);
} else {
char* udfName = pExpr->pExpr->_function.pFunctNode->functionName;
strncpy(pCtx->udfName, udfName, TSDB_FUNC_NAME_LEN);
tstrncpy(pCtx->udfName, udfName, TSDB_FUNC_NAME_LEN);
fmGetUdafExecFuncs(pCtx->functionId, &pCtx->fpSet);
}
pCtx->fpSet.getEnv(pExpr->pExpr->_function.pFunctNode, &env);

View File

@ -294,7 +294,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo
if (!exists) {
#endif
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &keyInfo);
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
taosHashPut(pTaskInfo->tableqinfoList.map, uid, sizeof(*uid), &keyInfo.groupId, sizeof(keyInfo.groupId));
}
/*}*/
@ -616,7 +616,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
int32_t nOptrWithVal = 0;
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
taosMemoryFreeClear(*pOutput);
*len = 0;
}
@ -701,10 +701,10 @@ int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
return 0;
}
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo mtInfo) {
int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* sContext, SMetaTableInfo* pMtInfo) {
memset(pCond, 0, sizeof(SQueryTableDataCond));
pCond->order = TSDB_ORDER_ASC;
pCond->numOfCols = mtInfo.schema->nCols;
pCond->numOfCols = pMtInfo->schema->nCols;
pCond->colList = taosMemoryCalloc(pCond->numOfCols, sizeof(SColumnInfo));
if (pCond->colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
@ -712,15 +712,15 @@ int32_t initQueryTableDataCondForTmq(SQueryTableDataCond* pCond, SSnapContext* s
}
pCond->twindows = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
pCond->suid = mtInfo.suid;
pCond->suid = pMtInfo->suid;
pCond->type = TIMEWINDOW_RANGE_CONTAINED;
pCond->startVersion = -1;
pCond->endVersion = sContext->snapVersion;
for (int32_t i = 0; i < pCond->numOfCols; ++i) {
pCond->colList[i].type = mtInfo.schema->pSchema[i].type;
pCond->colList[i].bytes = mtInfo.schema->pSchema[i].bytes;
pCond->colList[i].colId = mtInfo.schema->pSchema[i].colId;
pCond->colList[i].type = pMtInfo->schema->pSchema[i].type;
pCond->colList[i].bytes = pMtInfo->schema->pSchema[i].bytes;
pCond->colList[i].colId = pMtInfo->schema->pSchema[i].colId;
}
return TSDB_CODE_SUCCESS;
@ -844,7 +844,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
taosArrayDestroy(pTaskInfo->tableqinfoList.pTableList);
if (mtInfo.uid == 0) return 0; // no data
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, mtInfo);
initQueryTableDataCondForTmq(&pTaskInfo->streamInfo.tableCond, sContext, &mtInfo);
pTaskInfo->streamInfo.tableCond.twindows.skey = pOffset->ts;
pTaskInfo->tableqinfoList.pTableList = taosArrayInit(1, sizeof(STableKeyInfo));
taosArrayPush(pTaskInfo->tableqinfoList.pTableList, &(STableKeyInfo){.uid = mtInfo.uid, .groupId = 0});

View File

@ -895,33 +895,6 @@ static bool overlapWithTimeWindow(STaskAttr* pQueryAttr, SDataBlockInfo* pBlockI
}
#endif
static uint32_t doFilterByBlockTimeWindow(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
#if 0
SqlFunctionCtx* pCtx = pTableScanInfo->pCtx;
uint32_t status = BLK_DATA_NOT_LOAD;
int32_t numOfOutput = 0; // pTableScanInfo->numOfOutput;
for (int32_t i = 0; i < numOfOutput; ++i) {
int32_t functionId = pCtx[i].functionId;
int32_t colId = pTableScanInfo->pExpr[i].base.pParam[0].pCol->colId;
// group by + first/last should not apply the first/last block filter
if (functionId < 0) {
status |= BLK_DATA_DATA_LOAD;
return status;
} else {
// status |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
// if ((status & BLK_DATA_DATA_LOAD) == BLK_DATA_DATA_LOAD) {
// return status;
// }
}
}
return status;
#endif
return 0;
}
int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
*status = BLK_DATA_NOT_LOAD;
@ -1802,7 +1775,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
} else {
taosMemoryFree(pMsg->pData);
pSourceDataInfo->code = code;
qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code));
qDebug("%s fetch rsp received, index:%d, code:%s", pSourceDataInfo->taskId, index, tstrerror(code));
}
pSourceDataInfo->status = EX_SOURCE_DATA_READY;
@ -2816,6 +2789,8 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
blockDataCleanup(pInfo->pRes);
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
doApplyScalarCalculation(pOperator, pBlock, order, scanFlag);
if (pInfo->curGroupId == 0 || pInfo->curGroupId == pInfo->pRes->info.groupId) {
@ -3397,8 +3372,8 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
pSchema->colId = pColNode->colId;
pSchema->type = pColNode->node.resType.type;
pSchema->type = pColNode->node.resType.bytes;
strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
pSchema->bytes = pColNode->node.resType.bytes;
tstrncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
}
// this the tags and pseudo function columns, we only keep the tag columns
@ -3412,7 +3387,7 @@ SSchemaWrapper* extractQueriedColumnSchema(SScanPhysiNode* pScanNode) {
SSchema* pSchema = &pqSw->pSchema[pqSw->nCols++];
pSchema->colId = pColNode->colId;
pSchema->type = pColNode->node.resType.type;
pSchema->type = pColNode->node.resType.bytes;
pSchema->bytes = pColNode->node.resType.bytes;
strncpy(pSchema->name, pColNode->colName, tListLen(pSchema->name));
}
}

View File

@ -284,6 +284,19 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
return true;
}
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
}
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -312,8 +325,9 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
pCost->skipBlocks += 1;
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
pCost->skipBlocks += 1;
return TSDB_CODE_SUCCESS;
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
pCost->loadBlockStatis += 1;
@ -322,6 +336,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
return TSDB_CODE_SUCCESS;
} else {
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
@ -371,17 +386,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
}
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
// currently only the tbname pseudo column
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
GET_TASKID(pTaskInfo));
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
}
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
if (pTableScanInfo->pFilterNode != NULL) {
int64_t st = taosGetTimestampUs();
@ -937,8 +942,7 @@ static FORCE_INLINE void doClearBufferedBlocks(SStreamScanInfo* pInfo) {
}
static bool isSessionWindow(SStreamScanInfo* pInfo) {
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
return pInfo->windowSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
}
static bool isStateWindow(SStreamScanInfo* pInfo) {
@ -1079,12 +1083,13 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_
static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t* gpIdCol, SInterval* pInterval,
SDataBlockInfo* pDataBlockInfo, int32_t* pRowIndex, bool hasGroup) {
SResultRowInfo dumyInfo;
SResultRowInfo dumyInfo = {0};
dumyInfo.cur.pageId = -1;
STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, startTsCol[*pRowIndex], pInterval, TSDB_ORDER_ASC);
STimeWindow endWin = win;
STimeWindow preWin = win;
uint64_t groupId = gpIdCol[*pRowIndex];
while (1) {
if (hasGroup) {
(*pRowIndex) += 1;
@ -1148,6 +1153,9 @@ static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32
pResult->info.rows++;
}
}
blockDataDestroy(tmpBlock);
if (pResult->info.rows > 0) {
pResult->info.calWin = pInfo->updateWin;
return pResult;
@ -1316,7 +1324,7 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
blockDataEnsureCapacity(pResBlock, 1);
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, pTagCalSup->numOfExprs, NULL);
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
ASSERT(pResBlock->info.rows == 1);
// build tagArray
@ -1543,7 +1551,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tsdbReaderClose(pTSInfo->dataReader);
pTSInfo->dataReader = NULL;
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %d", pTaskInfo->streamInfo.snapshotVer + 1);
qDebug("queue scan tsdb over, switch to wal ver %"PRId64, pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
return NULL;
}
@ -2120,9 +2128,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
taosMemoryFree(pStreamScan->pPseudoExpr);
}
cleanupExprSupp(&pStreamScan->tbnameCalSup);
cleanupExprSupp(&pStreamScan->tagCalSup);
updateInfoDestroy(pStreamScan->pUpdateInfo);
blockDataDestroy(pStreamScan->pRes);
blockDataDestroy(pStreamScan->pUpdateRes);
@ -2995,7 +3000,7 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
while (1) {
int64_t startTs = taosGetTimestampUs();
strncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
tstrncpy(pInfo->req.tb, tNameGetTableName(&pInfo->name), tListLen(pInfo->req.tb));
strcpy(pInfo->req.user, pInfo->pUser);
int32_t contLen = tSerializeSRetrieveTableReq(NULL, 0, &pInfo->req);

View File

@ -2842,14 +2842,13 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
int32_t numOfCols = 0;
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
initBasicInfo(&pInfo->binfo, pResBlock);
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
initBasicInfo(&pInfo->binfo, pResBlock);
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
pInfo->gap = pSessionNode->gap;
@ -4666,7 +4665,6 @@ void destroyStreamStateOperatorInfo(void* param) {
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
destroyStreamSessionAggOperatorInfo(pChInfo);
taosMemoryFreeClear(pChild);
taosMemoryFreeClear(pChInfo);
}
}
colDataDestroy(&pInfo->twAggSup.timeWindowData);

View File

@ -16,9 +16,10 @@
#include "tsimplehash.h"
#include "taoserror.h"
#include "tlog.h"
#include "tdef.h"
#define SHASH_DEFAULT_LOAD_FACTOR 0.75
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_MAX_CAPACITY (1024 * 1024 * 16L)
#define SHASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * SHASH_DEFAULT_LOAD_FACTOR)
#define GET_SHASH_NODE_KEY(_n, _dl) ((char *)(_n) + sizeof(SHNode) + (_dl))
@ -104,20 +105,20 @@ static void tSimpleHashTableResize(SSHashObj *pHashObj) {
int32_t newCapacity = (int32_t)(pHashObj->capacity << 1u);
if (newCapacity > HASH_MAX_CAPACITY) {
uDebug("current capacity:%zu, maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
uDebug("current capacity:%"PRIzu", maximum capacity:%" PRIu64 ", no resize applied due to limitation is reached",
pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, sizeof(void *) * newCapacity);
void *pNewEntryList = taosMemoryRealloc(pHashObj->hashList, POINTER_BYTES * newCapacity);
if (!pNewEntryList) {
uWarn("hash resize failed due to out of memory, capacity remain:%zu", pHashObj->capacity);
return;
}
size_t inc = newCapacity - pHashObj->capacity;
memset((char *)pNewEntryList + pHashObj->capacity * sizeof(void *), 0, inc * sizeof(void *));
memset((char *)pNewEntryList + pHashObj->capacity * POINTER_BYTES, 0, inc * sizeof(void *));
pHashObj->hashList = pNewEntryList;
pHashObj->capacity = newCapacity;

View File

@ -2357,7 +2357,8 @@ static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNo
}
static int32_t setTableCacheLastMode(STranslateContext* pCxt, SSelectStmt* pSelect) {
if ((!pSelect->hasLastRowFunc && !pSelect->hasLastFunc) || QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable)) {
if ((!pSelect->hasLastRowFunc && !pSelect->hasLastFunc) || QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) ||
TSDB_SYSTEM_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType) {
return TSDB_CODE_SUCCESS;
}

View File

@ -124,9 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
// *((bool*)pContext) =
// (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
*((bool*)pContext) = true;
*((bool*)pContext) =
(COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
}
return DEAL_RES_CONTINUE;

View File

@ -424,6 +424,12 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
return TSDB_CODE_SUCCESS;
}
if ((pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) > TSDB_MAX_COL_TAG_NUM) {
*pDst = NULL;
qError("too many column and tag num:%d,%d", pSrc->tableInfo.numOfColumns, pSrc->tableInfo.numOfTags);
return TSDB_CODE_INVALID_PARA;
}
int32_t metaSize = sizeof(STableMeta) + (pSrc->tableInfo.numOfColumns + pSrc->tableInfo.numOfTags) * sizeof(SSchema);
*pDst = taosMemoryMalloc(metaSize);
if (NULL == *pDst) {

View File

@ -271,8 +271,8 @@ int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t
SBuildTableInput *pInput = input;
STableCfgReq cfgReq = {0};
cfgReq.header.vgId = pInput->vgId;
strcpy(cfgReq.dbFName, pInput->dbFName);
strcpy(cfgReq.tbName, pInput->tbName);
strncpy(cfgReq.dbFName, pInput->dbFName, sizeof(cfgReq.dbFName));
strncpy(cfgReq.tbName, pInput->tbName, sizeof(cfgReq.tbName));
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
void *pBuf = (*mallcFp)(bufLen);
@ -615,6 +615,8 @@ int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp));
if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize);
tFreeSTableCfgRsp(out);
taosMemoryFree(out);
return TSDB_CODE_INVALID_MSG;
}

View File

@ -258,7 +258,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
dsGetDataLength(ctx->sinkHandle, &len, &queryEnd);
if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%d", len);
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
@ -292,7 +292,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
}
// Got data from sink
QW_TASK_DLOG("there are data in sink, dataLength:%d", len);
QW_TASK_DLOG("there are data in sink, dataLength:%" PRId64, len);
*dataLen += len;
@ -408,7 +408,6 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu
// QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code));
QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED);
break;
}
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC));

View File

@ -1214,7 +1214,7 @@ int32_t fltAddGroupUnitFromNode(SFilterInfo *info, SNode *tree, SArray *group) {
int32_t filterAddUnitFromUnit(SFilterInfo *dst, SFilterInfo *src, SFilterUnit *u, uint32_t *uidx) {
SFilterFieldId left, right, *pright = &right;
int32_t type = FILTER_UNIT_DATA_TYPE(u);
uint8_t type = FILTER_UNIT_DATA_TYPE(u);
uint16_t flag = 0;
filterAddField(dst, FILTER_UNIT_COL_DESC(src, u), NULL, FLD_TYPE_COLUMN, &left, 0, false);
@ -1644,7 +1644,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
SFilterField *left = FILTER_UNIT_LEFT_FIELD(info, unit);
SColumnNode *refNode = (SColumnNode *)left->desc;
if (unit->compare.optr >= 0 && unit->compare.optr <= OP_TYPE_JSON_CONTAINS) {
if (unit->compare.optr <= OP_TYPE_JSON_CONTAINS) {
len = sprintf(str, "UNIT[%d] => [%d][%d] %s [", i, refNode->dataBlockId, refNode->slotId,
operatorTypeStr(unit->compare.optr));
}
@ -1664,7 +1664,7 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
if (unit->compare.optr2) {
strcat(str, " && ");
if (unit->compare.optr2 >= 0 && unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS) {
if (unit->compare.optr2 <= OP_TYPE_JSON_CONTAINS) {
sprintf(str + strlen(str), "[%d][%d] %s [", refNode->dataBlockId, refNode->slotId,
operatorTypeStr(unit->compare.optr2));
}
@ -1709,9 +1709,9 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
ctx->isrange);
if (ctx->isrange) {
SFilterRangeNode *r = ctx->rs;
int32_t tlen = 0;
while (r) {
char str[256] = {0};
int32_t tlen = 0;
char str[256] = {0};
if (FILTER_GET_FLAG(r->ra.sflag, RANGE_FLG_NULL)) {
strcat(str, "(NULL)");
} else {
@ -2614,7 +2614,7 @@ int32_t filterRewrite(SFilterInfo *info, SFilterGroupCtx **gRes, int32_t gResNum
int32_t usize = (int32_t)taosArrayGetSize((SArray *)colInfo->info);
for (int32_t n = 0; n < usize; ++n) {
SFilterUnit *u = taosArrayGetP((SArray *)colInfo->info, n);
SFilterUnit *u = (SFilterUnit *)taosArrayGetP((SArray *)colInfo->info, n);
filterAddUnitFromUnit(info, &oinfo, u, &uidx);
filterAddUnitToGroup(&ng, uidx);

View File

@ -122,7 +122,6 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
break;
case JOB_TASK_STATUS_DROP:
SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED);
break;
default:
SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus));

View File

@ -169,7 +169,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
if (TSDB_CODE_SUCCESS == code && batchRsp.nRsps > 0) {
SCH_LOCK(SCH_WRITE, &pJob->resLock);
if (NULL == pJob->execRes.res) {
pJob->execRes.res = taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
}
@ -386,7 +386,6 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
// NEVER REACH HERE
SCH_TASK_ELOG("invalid status to handle drop task rsp, refId:0x%" PRIx64, pJob->refId);
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
case TDMT_SCH_LINK_BROKEN:
SCH_TASK_ELOG("link broken received, error:%x - %s", rspCode, tstrerror(rspCode));
@ -981,7 +980,7 @@ _return:
}
int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t msgType) {
uint32_t msgSize = 0;
int32_t msgSize = 0;
void *msg = NULL;
int32_t code = 0;
bool isCandidateAddr = false;
@ -1133,12 +1132,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
default:
SCH_TASK_ELOG("unknown msg type to send, msgType:%d", msgType);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
break;
}
#if 1
SSchTrans trans = {.pTrans = pJob->conn.pTrans, .pHandle = SCH_GET_TASK_HANDLE(pTask)};
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
code = schAsyncSendMsg(pJob, pTask, &trans, addr, msgType, msg, (uint32_t)msgSize, persistHandle, (rpcCtx.args ? &rpcCtx : NULL));
msg = NULL;
SCH_ERR_JRET(code);

View File

@ -904,7 +904,7 @@ int32_t schLaunchRemoteTask(SSchJob *pJob, SSchTask *pTask) {
} else if (tsQueryPlannerTrace) {
char *msg = NULL;
int32_t msgLen = 0;
qSubPlanToString(plan, &msg, &msgLen);
SCH_ERR_RET(qSubPlanToString(plan, &msg, &msgLen));
SCH_TASK_DLOGL("physical plan len:%d, %s", msgLen, msg);
taosMemoryFree(msg);
}
@ -926,6 +926,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
}
SArray *explainRes = NULL;
int32_t code = 0;
SQWMsg qwMsg = {0};
qwMsg.msgInfo.taskType = TASK_TYPE_TEMP;
qwMsg.msgInfo.explain = SCH_IS_EXPLAIN_JOB(pJob);
@ -938,14 +939,21 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &qwMsg, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
explainRes = NULL;
}
SCH_RET(schProcessOnTaskSuccess(pJob, pTask));
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
_return:
taosArrayDestroy(explainRes);
SCH_RET(code);
}
int32_t schLaunchTaskImpl(void *param) {
@ -1097,22 +1105,28 @@ int32_t schExecRemoteFetch(SSchJob *pJob, SSchTask *pTask) {
int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
void *pRsp = NULL;
int32_t code = 0;
SArray *explainRes = NULL;
if (SCH_IS_EXPLAIN_JOB(pJob)) {
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
}
SCH_ERR_RET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
pTask->execId, &pRsp, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
SCH_ERR_RET(schHandleExplainRes(explainRes));
explainRes = NULL;
}
SCH_ERR_RET(schProcessFetchRsp(pJob, pTask, pRsp, TSDB_CODE_SUCCESS));
return TSDB_CODE_SUCCESS;
_return:
taosArrayDestroy(explainRes);
SCH_RET(code);
}
// Note: no more error processing, handled in function internal

View File

@ -242,7 +242,7 @@ uint64_t schGenUUID(void) {
if (hashId == 0) {
char uid[64] = {0};
int32_t code = taosGetSystemUUID(uid, tListLen(uid));
int32_t code = taosGetSystemUUID(uid, tListLen(uid) - 1);
if (code != TSDB_CODE_SUCCESS) {
qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno)));
} else {

View File

@ -1006,6 +1006,7 @@ static int tdbBtreeEncodePayload(SPage *pPage, SCell *pCell, int nHeader, const
nLeft -= kLen;
// pack partial val to local if any space left
if (nLocal > nHeader + kLen + sizeof(SPgno)) {
ASSERT(pVal != NULL && vLen != 0);
memcpy(pCell + nHeader + kLen, pVal, nLocal - nHeader - kLen - sizeof(SPgno));
nLeft -= nLocal - nHeader - kLen - sizeof(SPgno);
}

View File

@ -105,6 +105,7 @@ static int tdbPCacheAlterImpl(SPCache *pCache, int32_t nPage) {
for (int32_t iPage = pCache->nPages; iPage < nPage; iPage++) {
if (tdbPageCreate(pCache->szPage, &aPage[iPage], tdbDefaultMalloc, NULL) < 0) {
// TODO: handle error
tdbOsFree(pCache->aPage);
return -1;
}
@ -267,7 +268,7 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
// 4. Try a create new page
if (!pPage) {
ret = tdbPageCreate(pCache->szPage, &pPage, pTxn->xMalloc, pTxn->xArg);
if (ret < 0) {
if (ret < 0 && pPage != NULL) {
// TODO
ASSERT(0);
return NULL;
@ -300,8 +301,8 @@ static SPage *tdbPCacheFetchImpl(SPCache *pCache, const SPgid *pPgid, TXN *pTxn)
pPage->pPager = pPageH->pPager;
memcpy(pPage->pData, pPageH->pData, pPage->pageSize);
tdbDebug("pcache/pPageH: %p %d %p %p %d", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage,
TDB_PAGE_PGNO(pPageH));
// tdbDebug("pcache/pPageH: %p %d %p %p %d", pPageH, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize, pPage,
// TDB_PAGE_PGNO(pPageH));
tdbPageInit(pPage, pPageH->pPageHdr - pPageH->pData, pPageH->xCellSize);
pPage->kLen = pPageH->kLen;
pPage->vLen = pPageH->vLen;

View File

@ -84,7 +84,8 @@ int tdbPagerOpen(SPCache *pCache, const char *fileName, SPager **ppPager) {
pPager->pCache = pCache;
pPager->fd = tdbOsOpen(pPager->dbFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
if (pPager->fd < 0) {
if (TDB_FD_INVALID(pPager->fd)) {
// if (pPager->fd < 0) {
return -1;
}
@ -226,7 +227,7 @@ int tdbPagerBegin(SPager *pPager, TXN *pTxn) {
// Open the journal
pPager->jfd = tdbOsOpen(pPager->jFileName, TDB_O_CREAT | TDB_O_RDWR, 0755);
if (pPager->jfd < 0) {
if (TDB_FD_INVALID(pPager->jfd)) {
tdbError("failed to open file due to %s. jFileName:%s", strerror(errno), pPager->jFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
@ -365,7 +366,7 @@ int tdbPagerAbort(SPager *pPager, TXN *pTxn) {
// 4, remove the journal file
tdbOsClose(pPager->jfd);
tdbOsRemove(pPager->jFileName);
(void)tdbOsRemove(pPager->jFileName);
pPager->inTran = 0;
return 0;
@ -540,7 +541,8 @@ static int tdbPagerWritePageToJournal(SPager *pPager, SPage *pPage) {
ret = tdbOsWrite(pPager->jfd, pPage->pData, pPage->pageSize);
if (ret < 0) {
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName, pPage->pageSize);
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->jFileName,
pPage->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@ -568,7 +570,8 @@ static int tdbPagerWritePageToDB(SPager *pPager, SPage *pPage) {
ret = tdbOsWrite(pPager->fd, pPage->pData, pPage->pageSize);
if (ret < 0) {
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName, pPage->pageSize);
tdbError("failed to write page data due to %s. file:%s, pageSize:%ld", strerror(errno), pPager->dbFileName,
pPage->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
@ -603,11 +606,13 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
int ret = tdbOsRead(jfd, &pgno, sizeof(pgno));
if (ret < 0) {
tdbOsFree(pageBuf);
return -1;
}
ret = tdbOsRead(jfd, pageBuf, pPager->pageSize);
if (ret < 0) {
tdbOsFree(pageBuf);
return -1;
}
@ -615,13 +620,16 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
if (tdbOsLSeek(pPager->fd, offset, SEEK_SET) < 0) {
tdbError("failed to lseek fd due to %s. file:%s, offset:%ld", strerror(errno), pPager->dbFileName, offset);
terrno = TAOS_SYSTEM_ERROR(errno);
tdbOsFree(pageBuf);
return -1;
}
ret = tdbOsWrite(pPager->fd, pageBuf, pPager->pageSize);
if (ret < 0) {
tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName, pPager->pageSize);
tdbError("failed to write buf due to %s. file: %s, bufsize:%d", strerror(errno), pPager->dbFileName,
pPager->pageSize);
terrno = TAOS_SYSTEM_ERROR(errno);
tdbOsFree(pageBuf);
return -1;
}
}
@ -629,6 +637,7 @@ int tdbPagerRestore(SPager *pPager, SBTree *pBt) {
if (tdbOsFSync(pPager->fd) < 0) {
tdbError("failed to fsync fd due to %s. dbfile:%s", strerror(errno), pPager->dbFileName);
terrno = TAOS_SYSTEM_ERROR(errno);
tdbOsFree(pageBuf);
return -1;
}

View File

@ -106,11 +106,13 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
// pTb->pBt
ret = tdbBtreeOpen(keyLen, valLen, pPager, tbname, pgno, keyCmprFn, &(pTb->pBt));
if (ret < 0) {
tdbOsFree(pTb);
return -1;
}
ret = tdbPagerRestore(pPager, pTb->pBt);
if (ret < 0) {
tdbOsFree(pTb);
return -1;
}

View File

@ -37,6 +37,8 @@ extern "C" {
/* file */
typedef TdFilePtr tdb_fd_t;
#define TDB_FD_INVALID(fd) (fd == NULL)
#define TDB_O_CREAT TD_FILE_CREATE
#define TDB_O_WRITE TD_FILE_WRITE
#define TDB_O_READ TD_FILE_READ

View File

@ -128,10 +128,10 @@ int transDumpFromBuffer(SConnBuffer* connBuf, char** buf) {
static const int HEADSIZE = sizeof(STransMsgHead);
SConnBuffer* p = connBuf;
if (p->left != 0) {
if (p->left != 0 || p->total <= 0) {
return -1;
}
int total = connBuf->total;
int total = p->total;
if (total >= HEADSIZE && !p->invalid) {
*buf = taosMemoryCalloc(1, total);
memcpy(*buf, p->buf, total);

View File

@ -19,12 +19,12 @@
#include "taos.h"
#include "taoserror.h"
#define UNIT_NUM_BITS 64
#define UNIT_ADDR_NUM_BITS 6
#define UNIT_NUM_BITS 64ULL
#define UNIT_ADDR_NUM_BITS 6ULL
static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
uint64_t mask = 1 << (index % UNIT_NUM_BITS);
uint64_t mask = 1ULL << (index % UNIT_NUM_BITS);
uint64_t old = buf[unitIndex];
buf[unitIndex] |= mask;
return buf[unitIndex] != old;
@ -32,7 +32,7 @@ static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) {
static FORCE_INLINE bool getBit(uint64_t *buf, uint64_t index) {
uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS;
uint64_t mask = 1 << (index % UNIT_NUM_BITS);
uint64_t mask = 1ULL << (index % UNIT_NUM_BITS);
return buf[unitIndex] & mask;
}

View File

@ -41,7 +41,7 @@ class TDTestCase:
ifcheckdata = 0
ifManualCommit = 1
groupId = 'group.id:cgrp1'
autoCommit = 'enable.auto.commit:false'
autoCommit = 'enable.auto.commit:true'
autoCommitInterval = 'auto.commit.interval.ms:1000'
autoOffset = 'auto.offset.reset:earliest'
@ -87,8 +87,9 @@ class TDTestCase:
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(self.pollDelay,self.paraDict["dbName"],self.showMsg, self.showRow,self.cdbName)
tdLog.info("After waiting for a period of time, drop one stable")
time.sleep(3)
tdLog.info("After waiting for a commit notify, drop one stable")
#time.sleep(3)
tmqCom.getStartCommitNotifyFromTmqsim()
tdSql.execute("drop table %s.%s" %(self.paraDict['dbName'], self.paraDict['stbName']))
tdLog.info("wait result from consumer, then check it")

View File

@ -42,7 +42,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 500,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 3,
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
@ -52,7 +52,7 @@ class TDTestCase:
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1, wal_retention_size=-1,wal_retention_period=-1)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
# tdLog.info("create ctb")
@ -87,7 +87,7 @@ class TDTestCase:
'rowsPerTbl': 1000,
'batchNum': 500,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 5,
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}

View File

@ -37,7 +37,7 @@ class TDTestCase:
'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
'ctbPrefix': 'ctb',
'ctbNum': 10,
'ctbNum': 100,
'rowsPerTbl': 4000,
'batchNum': 15,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
@ -124,7 +124,8 @@ class TDTestCase:
tdLog.info("async insert data")
pThread = tmqCom.asyncInsertData(paraDict)
time.sleep(5)
tmqCom.getStartConsumeNotifyFromTmqsim();
#time.sleep(5)
tdLog.info("check show consumers")
tdSql.query("show consumers")
# tdLog.info(tdSql.queryResult)

View File

@ -22,7 +22,7 @@
#define TAOS_CONSOLE_PROMPT_HEADER "taos> "
#define TAOS_CONSOLE_PROMPT_CONTINUE " -> "
#define SHELL_HOST "The auth string to use when connecting to the server."
#define SHELL_HOST "TDengine server FQDN to connect. The default host is localhost."
#define SHELL_PORT "The TCP/IP port number to use for the connection."
#define SHELL_USER "The user name to use when connecting to the server."
#define SHELL_PASSWORD "The password to use when connecting to the server."

View File

@ -243,8 +243,8 @@ void enumAllWords(STireNode** nodes, char* prefix, SMatch* match) {
continue;
} else {
// combine word string
memset(word, 0, sizeof(word));
strcpy(word, prefix);
memset(word, 0, tListLen(word));
strncpy(word, prefix, len);
word[len] = FIRST_ASCII + i; // append current char
// chain middle node
@ -315,8 +315,7 @@ void matchPrefixFromTree(STire* tire, char* prefix, SMatch* match) {
}
}
// return
return;
taosMemoryFree(root);
}
SMatch* matchPrefix(STire* tire, char* prefix, SMatch* match) {

1
tools/taosws-rs Submodule

@ -0,0 +1 @@
Subproject commit 7a94ffab45f08e16f09b3f430fe75d717054adb6

View File

@ -119,7 +119,7 @@ int smlProcess_json1_Test() {
" \"dc\": \"lga\""
" }"
" }"
"]"};
"]",};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
@ -159,7 +159,7 @@ int smlProcess_json2_Test() {
" },"
" \"id\": \"d1001\""
" }"
"}"};
"}",};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
@ -227,7 +227,7 @@ int smlProcess_json3_Test() {
" },"
" \"id\": \"d1001\""
" }"
"}"};
"}",};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
@ -286,7 +286,7 @@ int smlProcess_json4_Test() {
" \"t9\": false,"
" \"id\": \"d1001\""
" }"
"}"};
"}",};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));

View File

@ -155,7 +155,7 @@ static void printHelp() {
printf("%s%s\n", indent, "-l");
printf("%s%s%s\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
printf("%s%s\n", indent, "-p");
printf("%s%s%s\n", indent, indent, "producer thread number, default is 0");
printf("%s%s\n", indent, "-b");
@ -238,7 +238,7 @@ void saveConfigToLogFile() {
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
}
taosFprintfFile(g_fp, "\n");
taosFprintfFile(g_fp, " expect rows: %d\n", g_stConfInfo.stThreads[i].expectMsgCnt);
taosFprintfFile(g_fp, " expect rows: %" PRIx64 "\n", g_stConfInfo.stThreads[i].expectMsgCnt);
}
char tmpString[128];
@ -263,11 +263,11 @@ void parseArgument(int32_t argc, char* argv[]) {
printHelp();
exit(0);
} else if (strcmp(argv[i], "-d") == 0) {
strcpy(g_stConfInfo.dbName, argv[++i]);
tstrncpy(g_stConfInfo.dbName, argv[++i], sizeof(g_stConfInfo.dbName));
} else if (strcmp(argv[i], "-w") == 0) {
strcpy(g_stConfInfo.cdbName, argv[++i]);
tstrncpy(g_stConfInfo.cdbName, argv[++i], sizeof(g_stConfInfo.cdbName));
} else if (strcmp(argv[i], "-c") == 0) {
strcpy(configDir, argv[++i]);
tstrncpy(configDir, argv[++i], PATH_MAX);
} else if (strcmp(argv[i], "-g") == 0) {
g_stConfInfo.showMsgFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-r") == 0) {
@ -279,9 +279,9 @@ void parseArgument(int32_t argc, char* argv[]) {
} else if (strcmp(argv[i], "-e") == 0) {
g_stConfInfo.useSnapshot = atol(argv[++i]);
} else if (strcmp(argv[i], "-t") == 0) {
char tmpBuf[56];
strcpy(tmpBuf, argv[++i]);
sprintf(g_stConfInfo.topic, "`%s`", tmpBuf);
char tmpBuf[56] = {0};
tstrncpy(tmpBuf, argv[++i], sizeof(tmpBuf));
sprintf(g_stConfInfo.topic, "`%s`", tmpBuf);
} else if (strcmp(argv[i], "-x") == 0) {
g_stConfInfo.numOfThread = atol(argv[++i]);
} else if (strcmp(argv[i], "-l") == 0) {
@ -294,6 +294,10 @@ void parseArgument(int32_t argc, char* argv[]) {
g_stConfInfo.producerRate = atol(argv[++i]);
} else if (strcmp(argv[i], "-n") == 0) {
g_stConfInfo.payloadLen = atol(argv[++i]);
if(g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024){
pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC);
exit(-1);
}
} else {
pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1);
@ -354,8 +358,8 @@ void ltrim(char* str) {
int queryDB(TAOS* taos, char* command) {
int retryCnt = 10;
int code;
TAOS_RES* pRes;
int code = 0;
TAOS_RES* pRes = NULL;
while (retryCnt--) {
pRes = taos_query(taos, command);
@ -363,10 +367,11 @@ int queryDB(TAOS* taos, char* command) {
if (code != 0) {
taosSsleep(1);
taos_free_result(pRes);
pRes = NULL;
continue;
}
taos_free_result(pRes);
return 0;
return 0;
}
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
@ -418,7 +423,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
char sqlStr[1100] = {0};
if (strlen(buf) > 1024) {
taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf));
taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", (int)strlen(buf));
taosCloseFile(&g_fp);
return -1;
}
@ -592,7 +597,7 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* dbName = tmq_get_db_name(msg);
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
tmq_get_topic_name(msg), vgroupId);
@ -644,7 +649,7 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
int32_t vgroupId = tmq_get_vgroup_id(msg);
const char* dbName = tmq_get_db_name(msg);
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
taosFprintfFile(g_fp, "consumerId: %d, msg index:%d\n", pInfo->consumerId, msgIndex);
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
tmq_get_topic_name(msg), vgroupId);
@ -960,7 +965,7 @@ void parseConsumeInfo() {
ltrim(pstr);
char* ret = strchr(pstr, ch);
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
strcpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1);
tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1, sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey]));
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
g_stConfInfo.stThreads[i].numOfKey++;
@ -1268,25 +1273,26 @@ void* ombProduceThreadFunc(void* param) {
for (int i = 0; i < batchPerTblTimes; ++i) {
uint32_t msgsOfSql = g_stConfInfo.batchSize;
if ((i == batchPerTblTimes - 1) && (0 != remainder)) {
msgsOfSql = remainder;
msgsOfSql = remainder;
}
int len = 0;
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "insert into %s values ", ctbName);
for (int j = 0; j < msgsOfSql; j++) {
int64_t timeStamp = taosGetTimestampNs();
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
int64_t timeStamp = taosGetTimestampNs();
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
sendMsgs++;
pInfo->totalProduceMsgs++;
}
totalMsgLen += len;
totalMsgLen += len;
pInfo->totalMsgsLen += len;
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
if (affectedRows < 0) {
taos_close(pInfo->taos);
pInfo->taos = NULL;
return NULL;
pInfo->taos = NULL;
taosMemoryFree(sqlBuf);
return NULL;
}
affectedRowsTotal += affectedRows;
@ -1322,6 +1328,7 @@ void* ombProduceThreadFunc(void* param) {
printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal);
taos_close(pInfo->taos);
pInfo->taos = NULL;
taosMemoryFree(sqlBuf);
return NULL;
}

View File

@ -663,7 +663,7 @@ void initLogFile() {
int main(int argc, char* argv[]) {
for (int32_t i = 1; i < argc; i++) {
if(strcmp(argv[i], "-c") == 0){
strcpy(g_conf.dir, argv[++i]);
tstrncpy(g_conf.dir, argv[++i], sizeof(g_conf.dir));
}else if(strcmp(argv[i], "-s") == 0){
g_conf.snapShot = true;
}else if(strcmp(argv[i], "-d") == 0){