Merge branch 'master' of github.com:taosdata/TDengine into test/chr

This commit is contained in:
tomchon 2021-05-22 13:53:20 +08:00
commit 165dc98995
5 changed files with 80 additions and 57 deletions

View File

@ -102,6 +102,12 @@ IF ("${CPUTYPE}" STREQUAL "")
SET(TD_LINUX TRUE) SET(TD_LINUX TRUE)
SET(TD_LINUX_64 FALSE) SET(TD_LINUX_64 FALSE)
SET(TD_ARM_64 TRUE) SET(TD_ARM_64 TRUE)
ELSEIF (CMAKE_SYSTEM_PROCESSOR MATCHES "mips64")
SET(CPUTYPE "mips64")
MESSAGE(STATUS "Set CPUTYPE to mips64")
SET(TD_LINUX TRUE)
SET(TD_LINUX_64 FALSE)
SET(TD_MIPS_64 TRUE)
ENDIF () ENDIF ()
ELSE () ELSE ()

View File

@ -623,6 +623,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
if (pSql->sqlstr == NULL) { if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self); tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
tscFreeSqlObj(pSql); tscFreeSqlObj(pSql);
free(pStream);
return NULL; return NULL;
} }

View File

@ -3453,16 +3453,6 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
} }
g_args.interlace_rows = interlaceRows->valueint; g_args.interlace_rows = interlaceRows->valueint;
// rows per table need be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) {
printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
g_args.num_of_RPR);
prompt();
g_args.interlace_rows = g_args.num_of_RPR;
}
} else if (!interlaceRows) { } else if (!interlaceRows) {
g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req g_args.interlace_rows = 0; // 0 means progressive mode, > 0 mean interlace mode. max value is less or equ num_of_records_per_req
} else { } else {
@ -3528,6 +3518,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
goto PARSE_OVER; goto PARSE_OVER;
} }
// rows per table need be less than insert batch
if (g_args.interlace_rows > g_args.num_of_RPR) {
printf("NOTICE: interlace rows value %"PRIu64" > num_of_records_per_req %"PRIu64"\n\n",
g_args.interlace_rows, g_args.num_of_RPR);
printf(" interlace rows value will be set to num_of_records_per_req %"PRIu64"\n\n",
g_args.num_of_RPR);
prompt();
g_args.interlace_rows = g_args.num_of_RPR;
}
cJSON* dbs = cJSON_GetObjectItem(root, "databases"); cJSON* dbs = cJSON_GetObjectItem(root, "databases");
if (!dbs || dbs->type != cJSON_Array) { if (!dbs || dbs->type != cJSON_Array) {
printf("ERROR: failed to read json, databases not found\n"); printf("ERROR: failed to read json, databases not found\n");
@ -4825,10 +4825,12 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
return affectedRows; return affectedRows;
} }
static void getTableName(char *pTblName, threadInfo* pThreadInfo, uint64_t tableSeq) static void getTableName(char *pTblName,
threadInfo* pThreadInfo, uint64_t tableSeq)
{ {
SSuperTable* superTblInfo = pThreadInfo->superTblInfo; SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
if (superTblInfo) { if ((superTblInfo)
&& (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) {
if (superTblInfo->childTblLimit > 0) { if (superTblInfo->childTblLimit > 0) {
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s", snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
superTblInfo->childTblName + superTblInfo->childTblName +
@ -6601,6 +6603,7 @@ static void *superSubscribe(void *sarg) {
threadInfo *pThreadInfo = (threadInfo *)sarg; threadInfo *pThreadInfo = (threadInfo *)sarg;
char subSqlstr[MAX_QUERY_SQL_LENGTH]; char subSqlstr[MAX_QUERY_SQL_LENGTH];
TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0}; TAOS_SUB* tsub[MAX_QUERY_SQL_COUNT] = {0};
uint64_t tsubSeq;
if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) { if (pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n", errorPrint("The table number(%"PRId64") of the thread is more than max query sql count: %d\n",
@ -6609,6 +6612,15 @@ static void *superSubscribe(void *sarg) {
exit(-1); exit(-1);
} }
if (g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables > MAX_QUERY_SQL_COUNT) {
errorPrint("The number %"PRId64" of sql count(%"PRIu64") multiple the table number(%"PRId64") of the thread is more than max query sql count: %d\n",
g_queryInfo.superQueryInfo.sqlCount * pThreadInfo->ntables,
g_queryInfo.superQueryInfo.sqlCount,
pThreadInfo->ntables,
MAX_QUERY_SQL_COUNT);
exit(-1);
}
if (pThreadInfo->taos == NULL) { if (pThreadInfo->taos == NULL) {
TAOS * taos = NULL; TAOS * taos = NULL;
taos = taos_connect(g_queryInfo.host, taos = taos_connect(g_queryInfo.host,
@ -6637,6 +6649,8 @@ static void *superSubscribe(void *sarg) {
char topic[32] = {0}; char topic[32] = {0};
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
tsubSeq = i - pThreadInfo->start_table_from;
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n", verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
__func__, __LINE__, __func__, __LINE__,
pThreadInfo->threadID, pThreadInfo->threadID,
@ -6656,12 +6670,12 @@ static void *superSubscribe(void *sarg) {
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n", debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
__func__, __LINE__, pThreadInfo->threadID, subSqlstr); __func__, __LINE__, pThreadInfo->threadID, subSqlstr);
tsub[i] = subscribeImpl( tsub[tsubSeq] = subscribeImpl(
STABLE_CLASS, STABLE_CLASS,
pThreadInfo, subSqlstr, topic, pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval); g_queryInfo.superQueryInfo.subscribeInterval);
if (NULL == tsub[i]) { if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
@ -6677,54 +6691,56 @@ static void *superSubscribe(void *sarg) {
while(1) { while(1) {
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) { tsubSeq = i - pThreadInfo->start_table_from;
continue; if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
} continue;
}
res = taos_consume(tsub[i]); res = taos_consume(tsub[tsubSeq]);
if (res) { if (res) {
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d", sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp); appendResultToFile(res, pThreadInfo->fp);
} }
if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) { if (g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
sprintf(pThreadInfo->fp, "%s-%d", sprintf(pThreadInfo->fp, "%s-%d",
g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq], g_queryInfo.superQueryInfo.result[pThreadInfo->querySeq],
pThreadInfo->threadID); pThreadInfo->threadID);
appendResultToFile(res, pThreadInfo->fp); appendResultToFile(res, pThreadInfo->fp);
} }
consumed[i] ++; consumed[tsubSeq] ++;
if ((g_queryInfo.superQueryInfo.subscribeKeepProgress) if ((g_queryInfo.superQueryInfo.subscribeKeepProgress)
&& (consumed[i] >= && (consumed[tsubSeq] >=
g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) { g_queryInfo.superQueryInfo.resubAfterConsume[pThreadInfo->querySeq])) {
printf("keepProgress:%d, resub super table query: %"PRIu64"\n", printf("keepProgress:%d, resub super table query: %"PRIu64"\n",
g_queryInfo.superQueryInfo.subscribeKeepProgress, g_queryInfo.superQueryInfo.subscribeKeepProgress,
pThreadInfo->querySeq); pThreadInfo->querySeq);
taos_unsubscribe(tsub, taos_unsubscribe(tsub[tsubSeq],
g_queryInfo.superQueryInfo.subscribeKeepProgress); g_queryInfo.superQueryInfo.subscribeKeepProgress);
consumed[i]= 0; consumed[tsubSeq]= 0;
tsub[i] = subscribeImpl( tsub[tsubSeq] = subscribeImpl(
STABLE_CLASS, STABLE_CLASS,
pThreadInfo, subSqlstr, topic, pThreadInfo, subSqlstr, topic,
g_queryInfo.superQueryInfo.subscribeRestart, g_queryInfo.superQueryInfo.subscribeRestart,
g_queryInfo.superQueryInfo.subscribeInterval g_queryInfo.superQueryInfo.subscribeInterval
); );
if (NULL == tsub[i]) { if (NULL == tsub[tsubSeq]) {
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);
return NULL; return NULL;
} }
} }
} }
} }
} }
taos_free_result(res); taos_free_result(res);
for (uint64_t i = pThreadInfo->start_table_from; for (uint64_t i = pThreadInfo->start_table_from;
i <= pThreadInfo->end_table_to; i++) { i <= pThreadInfo->end_table_to; i++) {
taos_unsubscribe(tsub[i], 0); tsubSeq = i - pThreadInfo->start_table_from;
taos_unsubscribe(tsub[tsubSeq], 0);
} }
taos_close(pThreadInfo->taos); taos_close(pThreadInfo->taos);

View File

@ -23,14 +23,14 @@
typedef void (*FLinuxSignalHandler)(int32_t signum, siginfo_t *sigInfo, void *context); typedef void (*FLinuxSignalHandler)(int32_t signum, siginfo_t *sigInfo, void *context);
void taosSetSignal(int32_t signum, FSignalHandler sigfp) { void taosSetSignal(int32_t signum, FSignalHandler sigfp) {
struct sigaction act = {{0}}; struct sigaction act; memset(&act, 0, sizeof(act));
#if 1 #if 1
act.sa_flags = SA_SIGINFO; act.sa_flags = SA_SIGINFO;
act.sa_sigaction = (FLinuxSignalHandler)sigfp; act.sa_sigaction = (FLinuxSignalHandler)sigfp;
#else #else
act.sa_handler = sigfp; act.sa_handler = sigfp;
#endif #endif
sigaction(signum, &act, NULL); sigaction(signum, &act, NULL);
} }
void taosIgnSignal(int32_t signum) { void taosIgnSignal(int32_t signum) {

View File

@ -17,7 +17,7 @@
misrepresented as being the original software. misrepresented as being the original software.
3. This notice may not be removed or altered from any source distribution. 3. This notice may not be removed or altered from any source distribution.
*/ */
#ifndef _TD_ARM_ #if !defined(_TD_ARM_) && !defined(_TD_MIPS_)
#include <nmmintrin.h> #include <nmmintrin.h>
#endif #endif