Merge branch 'master' of github.com:taosdata/TDengine into test/chr
This commit is contained in:
commit
495eea74e0
|
@ -114,12 +114,18 @@ typedef enum TALBE_EXISTS_EN {
|
|||
TBL_EXISTS_BUTT
|
||||
} TALBE_EXISTS_EN;
|
||||
|
||||
enum MODE {
|
||||
enum enumSYNC_MODE {
|
||||
SYNC_MODE,
|
||||
ASYNC_MODE,
|
||||
MODE_BUT
|
||||
};
|
||||
|
||||
typedef enum enumQUERY_CLASS {
|
||||
SPECIFIED_CLASS,
|
||||
STABLE_CLASS,
|
||||
CLASS_BUT
|
||||
} QUERY_CLASS;
|
||||
|
||||
typedef enum enum_INSERT_MODE {
|
||||
PROGRESSIVE_INSERT_MODE,
|
||||
INTERLACE_INSERT_MODE,
|
||||
|
@ -6557,18 +6563,21 @@ static void specified_sub_callback(
|
|||
}
|
||||
|
||||
static TAOS_SUB* subscribeImpl(
|
||||
QUERY_CLASS class,
|
||||
threadInfo *pThreadInfo,
|
||||
char *sql, char* topic, bool restart)
|
||||
char *sql, char* topic, bool restart, uint64_t interval)
|
||||
{
|
||||
TAOS_SUB* tsub = NULL;
|
||||
|
||||
if (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode) {
|
||||
if ((SPECIFIED_CLASS == class)
|
||||
&& (ASYNC_MODE == g_queryInfo.specifiedQueryInfo.asyncMode)) {
|
||||
tsub = taos_subscribe(
|
||||
pThreadInfo->taos,
|
||||
restart,
|
||||
topic, sql, specified_sub_callback, (void*)pThreadInfo,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||
} else if (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode) {
|
||||
} else if ((STABLE_CLASS == class)
|
||||
&& (ASYNC_MODE == g_queryInfo.superQueryInfo.asyncMode)) {
|
||||
tsub = taos_subscribe(
|
||||
pThreadInfo->taos,
|
||||
restart,
|
||||
|
@ -6578,7 +6587,7 @@ static TAOS_SUB* subscribeImpl(
|
|||
tsub = taos_subscribe(
|
||||
pThreadInfo->taos,
|
||||
restart,
|
||||
topic, sql, NULL, NULL, 0);
|
||||
topic, sql, NULL, NULL, interval);
|
||||
}
|
||||
|
||||
if (tsub == NULL) {
|
||||
|
@ -6629,9 +6638,14 @@ static void *superSubscribe(void *sarg) {
|
|||
char topic[32] = {0};
|
||||
for (uint64_t i = pThreadInfo->start_table_from;
|
||||
i <= pThreadInfo->end_table_to; i++) {
|
||||
verbosePrint("%s() LN%d, [%d], start=%"PRId64" end=%"PRId64" i=%"PRIu64"\n",
|
||||
__func__, __LINE__,
|
||||
pThreadInfo->threadID,
|
||||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->end_table_to, i);
|
||||
sprintf(topic, "taosdemo-subscribe-%"PRIu64"-%"PRIu64"",
|
||||
i, pThreadInfo->querySeq);
|
||||
memset(subSqlstr,0,sizeof(subSqlstr));
|
||||
memset(subSqlstr, 0, sizeof(subSqlstr));
|
||||
replaceChildTblName(
|
||||
g_queryInfo.superQueryInfo.sql[pThreadInfo->querySeq],
|
||||
subSqlstr, i);
|
||||
|
@ -6644,8 +6658,10 @@ static void *superSubscribe(void *sarg) {
|
|||
debugPrint("%s() LN%d, [%d] subSqlstr: %s\n",
|
||||
__func__, __LINE__, pThreadInfo->threadID, subSqlstr);
|
||||
tsub[i] = subscribeImpl(
|
||||
STABLE_CLASS,
|
||||
pThreadInfo, subSqlstr, topic,
|
||||
g_queryInfo.superQueryInfo.subscribeRestart);
|
||||
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||
g_queryInfo.superQueryInfo.subscribeInterval);
|
||||
if (NULL == tsub[i]) {
|
||||
taos_close(pThreadInfo->taos);
|
||||
return NULL;
|
||||
|
@ -6687,8 +6703,10 @@ static void *superSubscribe(void *sarg) {
|
|||
g_queryInfo.superQueryInfo.subscribeKeepProgress);
|
||||
consumed[i]= 0;
|
||||
tsub[i] = subscribeImpl(
|
||||
STABLE_CLASS,
|
||||
pThreadInfo, subSqlstr, topic,
|
||||
g_queryInfo.superQueryInfo.subscribeRestart
|
||||
g_queryInfo.superQueryInfo.subscribeRestart,
|
||||
g_queryInfo.superQueryInfo.subscribeInterval
|
||||
);
|
||||
if (NULL == tsub[i]) {
|
||||
taos_close(pThreadInfo->taos);
|
||||
|
@ -6744,10 +6762,12 @@ static void *specifiedSubscribe(void *sarg) {
|
|||
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
|
||||
pThreadInfo->threadID);
|
||||
}
|
||||
tsub = subscribeImpl(pThreadInfo,
|
||||
tsub = subscribeImpl(
|
||||
SPECIFIED_CLASS, pThreadInfo,
|
||||
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
|
||||
topic,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart);
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||
if (NULL == tsub) {
|
||||
taos_close(pThreadInfo->taos);
|
||||
return NULL;
|
||||
|
@ -6777,11 +6797,12 @@ static void *specifiedSubscribe(void *sarg) {
|
|||
taos_unsubscribe(tsub,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
tsub = subscribeImpl(
|
||||
SPECIFIED_CLASS,
|
||||
pThreadInfo,
|
||||
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
|
||||
topic,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart
|
||||
);
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||
if (NULL == tsub) {
|
||||
taos_close(pThreadInfo->taos);
|
||||
return NULL;
|
||||
|
@ -6833,7 +6854,7 @@ static int subscribeTestProcess() {
|
|||
|
||||
//==== create threads for query for specified table
|
||||
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
|
||||
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
|
||||
debugPrint("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
|
||||
__func__, __LINE__,
|
||||
g_queryInfo.specifiedQueryInfo.sqlCount);
|
||||
} else {
|
||||
|
@ -6870,10 +6891,10 @@ static int subscribeTestProcess() {
|
|||
}
|
||||
|
||||
//==== create threads for super table query
|
||||
if (g_queryInfo.specifiedQueryInfo.sqlCount <= 0) {
|
||||
printf("%s() LN%d, sepcified query sqlCount %"PRIu64".\n",
|
||||
if (g_queryInfo.superQueryInfo.sqlCount <= 0) {
|
||||
printf("%s() LN%d, super table query sqlCount %"PRIu64".\n",
|
||||
__func__, __LINE__,
|
||||
g_queryInfo.specifiedQueryInfo.sqlCount);
|
||||
g_queryInfo.superQueryInfo.sqlCount);
|
||||
} else {
|
||||
if ((g_queryInfo.superQueryInfo.sqlCount > 0)
|
||||
&& (g_queryInfo.superQueryInfo.threadCnt > 0)) {
|
||||
|
@ -6906,8 +6927,8 @@ static int subscribeTestProcess() {
|
|||
b = ntables % threads;
|
||||
}
|
||||
|
||||
uint64_t startFrom = 0;
|
||||
for (uint64_t i = 0; i < g_queryInfo.superQueryInfo.sqlCount; i++) {
|
||||
uint64_t startFrom = 0;
|
||||
for (int j = 0; j < threads; j++) {
|
||||
uint64_t seq = i * threads + j;
|
||||
threadInfo *t_info = infosOfStable + seq;
|
||||
|
|
Loading…
Reference in New Issue