diff --git a/packaging/deb/makedeb.sh b/packaging/deb/makedeb.sh index a593c7ca16..28be037e6c 100755 --- a/packaging/deb/makedeb.sh +++ b/packaging/deb/makedeb.sh @@ -58,7 +58,12 @@ cp ${compile_dir}/build/lib/${libfile} ${pkg_dir}${install_home_pat cp ${compile_dir}/../src/inc/taos.h ${pkg_dir}${install_home_path}/include cp ${compile_dir}/../src/inc/taoserror.h ${pkg_dir}${install_home_path}/include cp -r ${top_dir}/tests/examples/* ${pkg_dir}${install_home_path}/examples -cp -r ${top_dir}/src/connector/grafanaplugin/dist ${pkg_dir}${install_home_path}/connector/grafanaplugin +if [ -d "${top_dir}/src/connector/grafanaplugin/dist" ]; then + cp -r ${top_dir}/src/connector/grafanaplugin/dist ${pkg_dir}${install_home_path}/connector/grafanaplugin +else + echo "grafanaplugin bundled directory not found!" + exit 1 +fi cp -r ${top_dir}/src/connector/python ${pkg_dir}${install_home_path}/connector cp -r ${top_dir}/src/connector/go ${pkg_dir}${install_home_path}/connector cp -r ${top_dir}/src/connector/nodejs ${pkg_dir}${install_home_path}/connector diff --git a/packaging/rpm/tdengine.spec b/packaging/rpm/tdengine.spec index 73efe2e1ab..9910e20bfe 100644 --- a/packaging/rpm/tdengine.spec +++ b/packaging/rpm/tdengine.spec @@ -66,7 +66,12 @@ cp %{_compiledir}/build/bin/taosdump %{buildroot}%{homepath}/bin cp %{_compiledir}/build/lib/${libfile} %{buildroot}%{homepath}/driver cp %{_compiledir}/../src/inc/taos.h %{buildroot}%{homepath}/include cp %{_compiledir}/../src/inc/taoserror.h %{buildroot}%{homepath}/include -cp -r %{_compiledir}/../src/connector/grafanaplugin/dist %{buildroot}%{homepath}/connector/grafanaplugin +if [ -d %{_compiledir}/../src/connector/grafanaplugin/dist ]; then + cp -r %{_compiledir}/../src/connector/grafanaplugin/dist %{buildroot}%{homepath}/connector/grafanaplugin +else + echo grafanaplugin bundled directory not found! + exit 1 +fi cp -r %{_compiledir}/../src/connector/python %{buildroot}%{homepath}/connector cp -r %{_compiledir}/../src/connector/go %{buildroot}%{homepath}/connector cp -r %{_compiledir}/../src/connector/nodejs %{buildroot}%{homepath}/connector diff --git a/packaging/tools/make_install.sh b/packaging/tools/make_install.sh index 5f1739d6e0..d6ace0a063 100755 --- a/packaging/tools/make_install.sh +++ b/packaging/tools/make_install.sh @@ -243,9 +243,17 @@ function install_data() { } function install_connector() { - ${csudo} cp -rf ${source_dir}/src/connector/grafanaplugin/dist ${install_main_dir}/connector/grafanaplugin + if [ -d "${source_dir}/src/connector/grafanaplugin/dist" ]; then + ${csudo} cp -rf ${source_dir}/src/connector/grafanaplugin/dist ${install_main_dir}/connector/grafanaplugin + else + echo "WARNING: grafanaplugin bundled dir not found, please check if want to use it!" + fi + if find ${source_dir}/src/connector/go -mindepth 1 -maxdepth 1 | read; then + ${csudo} cp -r ${source_dir}/src/connector/go ${install_main_dir}/connector + else + echo "WARNING: go connector not found, please check if want to use it!" + fi ${csudo} cp -rf ${source_dir}/src/connector/python ${install_main_dir}/connector - ${csudo} cp -rf ${source_dir}/src/connector/go ${install_main_dir}/connector ${csudo} cp ${binary_dir}/build/lib/*.jar ${install_main_dir}/connector &> /dev/null && ${csudo} chmod 777 ${install_main_dir}/connector/*.jar || echo &> /dev/null } diff --git a/packaging/tools/makeclient.sh b/packaging/tools/makeclient.sh index 7418bcc312..d0eeffc86a 100755 --- a/packaging/tools/makeclient.sh +++ b/packaging/tools/makeclient.sh @@ -117,9 +117,17 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then if [ "$osType" != "Darwin" ]; then cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: fi - cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin - cp -r ${connector_dir}/python ${install_dir}/connector/ - cp -r ${connector_dir}/go ${install_dir}/connector + if [ -d "${connector_dir}/grafanaplugin/dist" ]; then + cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin + else + echo "WARNING: grafanaplugin bundled dir not found, please check if want to use it!" + fi + if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then + cp -r ${connector_dir}/go ${install_dir}/connector + else + echo "WARNING: go connector not found, please check if want to use it!" + fi + cp -r ${connector_dir}/python ${install_dir}/connector cp -r ${connector_dir}/nodejs ${install_dir}/connector fi # Copy release note diff --git a/packaging/tools/makeclient_power.sh b/packaging/tools/makeclient_power.sh index 911cc574a5..8241319e4f 100755 --- a/packaging/tools/makeclient_power.sh +++ b/packaging/tools/makeclient_power.sh @@ -144,9 +144,17 @@ if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then if [ "$osType" != "Darwin" ]; then cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: fi - cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin - cp -r ${connector_dir}/python ${install_dir}/connector/ - cp -r ${connector_dir}/go ${install_dir}/connector + if [ -d "${connector_dir}/grafanaplugin/dist" ]; then + cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin + else + echo "WARNING: grafanaplugin bunlded dir not found, please check if want to use it!" + fi + if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then + cp -r ${connector_dir}/go ${install_dir}/connector + else + echo "WARNING: go connector not found, please check if want to use it!" + fi + cp -r ${connector_dir}/python ${install_dir}/connector sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py diff --git a/packaging/tools/makepkg.sh b/packaging/tools/makepkg.sh index 343e2a4224..d114d5eef8 100755 --- a/packaging/tools/makepkg.sh +++ b/packaging/tools/makepkg.sh @@ -131,9 +131,17 @@ connector_dir="${code_dir}/connector" mkdir -p ${install_dir}/connector if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: - cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin - cp -r ${connector_dir}/python ${install_dir}/connector/ - cp -r ${connector_dir}/go ${install_dir}/connector + if [ -d "${connector_dir}/grafanaplugin/dist" ]; then + cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin + else + echo "WARNING: grafanaplugin bundled dir not found, please check if you want to use it!" + fi + if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then + cp -r ${connector_dir}/go ${install_dir}/connector + else + echo "WARNING: go connector not found, please check if want to use it!" + fi + cp -r ${connector_dir}/python ${install_dir}/connector cp -r ${connector_dir}/nodejs ${install_dir}/connector fi # Copy release note diff --git a/packaging/tools/makepkg_power.sh b/packaging/tools/makepkg_power.sh index 8ea7f363c8..633a135c14 100755 --- a/packaging/tools/makepkg_power.sh +++ b/packaging/tools/makepkg_power.sh @@ -166,9 +166,18 @@ connector_dir="${code_dir}/connector" mkdir -p ${install_dir}/connector if [[ "$pagMode" != "lite" ]] && [[ "$cpuType" != "aarch32" ]]; then cp ${build_dir}/lib/*.jar ${install_dir}/connector ||: - cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin + + if [ -d "${connector_dir}/grafanaplugin/dist" ]; then + cp -r ${connector_dir}/grafanaplugin/dist ${install_dir}/connector/grafanaplugin + else + echo "WARNING: grafanaplugin bundled dir not found, please check if want to use it!" + fi + if find ${connector_dir}/go -mindepth 1 -maxdepth 1 | read; then + cp -r ${connector_dir}/go ${install_dir}/connector + else + echo "WARNING: go connector not found, please check if want to use it!" + fi cp -r ${connector_dir}/python ${install_dir}/connector/ - cp -r ${connector_dir}/go ${install_dir}/connector sed -i '/password/ {s/taosdata/powerdb/g}' ${install_dir}/connector/python/taos/cinterface.py diff --git a/src/kit/taosdemo/subscribe.json b/src/kit/taosdemo/subscribe.json index fd33a2e2e2..18846c8116 100644 --- a/src/kit/taosdemo/subscribe.json +++ b/src/kit/taosdemo/subscribe.json @@ -5,13 +5,33 @@ "port": 6030, "user": "root", "password": "taosdata", - "databases": "dbx", - "specified_table_query": - {"concurrent":1, "mode":"sync", "interval":5000, "restart":"yes", "keepProgress":"yes", - "sqls": [{"sql": "select avg(col1) from stb01 where col1 > 1;", "result": "./subscribe_res0.txt"}] - }, - "super_table_query": - {"stblname": "stb", "threads":1, "mode":"sync", "interval":10000, "restart":"yes", "keepProgress":"yes", - "sqls": [{"sql": "select col1 from xxxx where col1 > 10;", "result": "./subscribe_res1.txt"}] - } + "databases": "test", + "specified_table_query": { + "concurrent": 1, + "mode": "sync", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "resubAfterConsume": 10, + "sqls": [ + { + "sql": "select avg(col1) from meters where col1 > 1;", + "result": "./subscribe_res0.txt" + } + ] + }, + "super_table_query": { + "stblname": "meters", + "threads": 1, + "mode": "sync", + "interval": 1000, + "restart": "yes", + "keepProgress": "yes", + "sqls": [ + { + "sql": "select col1 from xxxx where col1 > 10;", + "result": "./subscribe_res1.txt" + } + ] + } } diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index abdbb9a316..95fb2f7bd4 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -372,6 +372,7 @@ typedef struct SpecifiedQueryInfo_S { int subscribeKeepProgress; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; + int resubAfterConsume[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; uint64_t totalQueried; } SpecifiedQueryInfo; @@ -390,6 +391,7 @@ typedef struct SuperQueryInfo_S { uint64_t sqlCount; char sql[MAX_QUERY_SQL_COUNT][MAX_QUERY_SQL_LENGTH+1]; char result[MAX_QUERY_SQL_COUNT][MAX_FILE_NAME_LEN+1]; + int resubAfterConsume[MAX_QUERY_SQL_COUNT]; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT]; char* childTblName; @@ -407,7 +409,7 @@ typedef struct SQueryMetaInfo_S { char queryMode[MAX_TB_NAME_SIZE]; // taosc, rest SpecifiedQueryInfo specifiedQueryInfo; - SuperQueryInfo superQueryInfo; + SuperQueryInfo superQueryInfo; uint64_t totalQueried; } SQueryMetaInfo; @@ -4287,6 +4289,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { } tstrncpy(g_queryInfo.specifiedQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + cJSON* resubAfterConsume = + cJSON_GetObjectItem(specifiedQuery, "resubAfterConsume"); + if (resubAfterConsume + && resubAfterConsume->type == cJSON_Number) { + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] + = resubAfterConsume->valueint; + } else if (!resubAfterConsume) { + //printf("failed to read json, subscribe interval no found\n"); + //goto PARSE_OVER; + g_queryInfo.specifiedQueryInfo.resubAfterConsume[j] = 1; + } + cJSON *result = cJSON_GetObjectItem(sql, "result"); if (NULL != result && result->type == cJSON_String && result->valuestring != NULL) { tstrncpy(g_queryInfo.specifiedQueryInfo.result[j], result->valuestring, MAX_FILE_NAME_LEN); @@ -4456,6 +4470,18 @@ static bool getMetaFromQueryJsonFile(cJSON* root) { tstrncpy(g_queryInfo.superQueryInfo.sql[j], sqlStr->valuestring, MAX_QUERY_SQL_LENGTH); + cJSON* superResubAfterConsume = + cJSON_GetObjectItem(sql, "resubAfterConsume"); + if (superResubAfterConsume + && superResubAfterConsume->type == cJSON_Number) { + g_queryInfo.superQueryInfo.resubAfterConsume[j] = + superResubAfterConsume->valueint; + } else if (!superResubAfterConsume) { + //printf("failed to read json, subscribe interval no found\n"); + //goto PARSE_OVER; + g_queryInfo.superQueryInfo.resubAfterConsume[j] = 1; + } + cJSON *result = cJSON_GetObjectItem(sql, "result"); if (result != NULL && result->type == cJSON_String && result->valuestring != NULL){ @@ -6546,17 +6572,18 @@ static void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int c } static TAOS_SUB* subscribeImpl( - TAOS *taos, char *sql, char* topic, char* resultFileName) { + TAOS *taos, char *sql, char* topic, bool restart, + char* resultFileName) { TAOS_SUB* tsub = NULL; if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { tsub = taos_subscribe(taos, - g_queryInfo.specifiedQueryInfo.subscribeRestart, + restart, topic, sql, subscribe_callback, (void*)resultFileName, g_queryInfo.specifiedQueryInfo.subscribeInterval); } else { tsub = taos_subscribe(taos, - g_queryInfo.specifiedQueryInfo.subscribeRestart, + restart, topic, sql, NULL, NULL, 0); } @@ -6610,6 +6637,7 @@ static void *superSubscribe(void *sarg) { return NULL; } + uint64_t subSeq; char topic[32] = {0}; for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { @@ -6623,10 +6651,12 @@ static void *superSubscribe(void *sarg) { g_queryInfo.superQueryInfo.result[j], pThreadInfo->threadID); } - uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", __func__, __LINE__, subSeq, subSqlstr); - tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, tmpFile); + tsub[subSeq] = subscribeImpl(pThreadInfo->taos, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + tmpFile); if (NULL == tsub[subSeq]) { taos_close(pThreadInfo->taos); return NULL; @@ -6634,6 +6664,10 @@ static void *superSubscribe(void *sarg) { } } + int consumed[MAX_QUERY_SQL_COUNT]; + for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) + consumed[i] = 0; + // start loop to consume result TAOS_RES* res = NULL; while(1) { @@ -6643,7 +6677,7 @@ static void *superSubscribe(void *sarg) { continue; } - uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; taosMsleep(100); // ms res = taos_consume(tsub[subSeq]); if (res) { @@ -6654,6 +6688,28 @@ static void *superSubscribe(void *sarg) { pThreadInfo->threadID); appendResultToFile(res, tmpFile); } + consumed[j] ++; + + if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) + && (consumed[j] >= + g_queryInfo.superQueryInfo.resubAfterConsume[j])) { + printf("keepProgress:%d, resub super table query: %d\n", + g_queryInfo.superQueryInfo.subscribeKeepProgress, j); + taos_unsubscribe(tsub[subSeq], + g_queryInfo.superQueryInfo.subscribeKeepProgress); + consumed[j]= 0; + subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + debugPrint("%s() LN%d, subSeq=%"PRIu64" subSqlstr: %s\n", + __func__, __LINE__, subSeq, subSqlstr); + tsub[subSeq] = subscribeImpl( + pThreadInfo->taos, subSqlstr, topic, + g_queryInfo.superQueryInfo.subscribeRestart, + tmpFile); + if (NULL == tsub[subSeq]) { + taos_close(pThreadInfo->taos); + return NULL; + } + } } } } @@ -6663,9 +6719,8 @@ static void *superSubscribe(void *sarg) { for (uint64_t i = pThreadInfo->start_table_from; i <= pThreadInfo->end_table_to; i++) { for (int j = 0; j < g_queryInfo.superQueryInfo.sqlCount; j++) { - uint64_t subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; - taos_unsubscribe(tsub[subSeq], - g_queryInfo.superQueryInfo.subscribeKeepProgress); + subSeq = i * g_queryInfo.superQueryInfo.sqlCount + j; + taos_unsubscribe(tsub[subSeq], 0); } } @@ -6713,14 +6768,22 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); } tsub[i] = subscribeImpl(pThreadInfo->taos, - g_queryInfo.specifiedQueryInfo.sql[i], topic, tmpFile); + g_queryInfo.specifiedQueryInfo.sql[i], topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart, + tmpFile); if (NULL == tsub[i]) { taos_close(pThreadInfo->taos); return NULL; } } + // start loop to consume result TAOS_RES* res = NULL; + + int consumed[MAX_QUERY_SQL_COUNT]; + for (int i = 0; i < MAX_QUERY_SQL_COUNT; i++) + consumed[i] = 0; + while(1) { for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) { @@ -6736,14 +6799,32 @@ static void *specifiedSubscribe(void *sarg) { g_queryInfo.specifiedQueryInfo.result[i], pThreadInfo->threadID); appendResultToFile(res, tmpFile); } + consumed[i] ++; + + if ((g_queryInfo.specifiedQueryInfo.subscribeKeepProgress) + && (consumed[i] >= + g_queryInfo.specifiedQueryInfo.resubAfterConsume[i])) { + printf("keepProgress:%d, resub specified query: %d\n", + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress, i); + consumed[i] = 0; + taos_unsubscribe(tsub[i], + g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); + tsub[i] = subscribeImpl(pThreadInfo->taos, + g_queryInfo.specifiedQueryInfo.sql[i], topic, + g_queryInfo.specifiedQueryInfo.subscribeRestart, + tmpFile); + if (NULL == tsub[i]) { + taos_close(pThreadInfo->taos); + return NULL; + } + } } } } taos_free_result(res); for (int i = 0; i < g_queryInfo.specifiedQueryInfo.sqlCount; i++) { - taos_unsubscribe(tsub[i], - g_queryInfo.specifiedQueryInfo.subscribeKeepProgress); + taos_unsubscribe(tsub[i], 0); } taos_close(pThreadInfo->taos);