Merge branch '3.0' into auth
This commit is contained in:
commit
80ae360e8b
|
@ -491,7 +491,7 @@ typedef struct {
|
|||
char intervalUnit;
|
||||
char slidingUnit;
|
||||
char
|
||||
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
||||
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
||||
int8_t precision;
|
||||
int64_t interval;
|
||||
int64_t sliding;
|
||||
|
@ -651,7 +651,6 @@ typedef struct SQueryNodeAddr {
|
|||
SEpSet epSet;
|
||||
} SQueryNodeAddr;
|
||||
|
||||
|
||||
typedef struct {
|
||||
SArray* pArray; // Array of SUseDbRsp
|
||||
} SUseDbBatchRsp;
|
||||
|
@ -724,7 +723,7 @@ typedef struct {
|
|||
|
||||
int32_t tSerializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||||
int32_t tDeserializeSRetrieveFuncRsp(void* buf, int32_t bufLen, SRetrieveFuncRsp* pRsp);
|
||||
void tFreeSFuncInfo(SFuncInfo *pInfo);
|
||||
void tFreeSFuncInfo(SFuncInfo* pInfo);
|
||||
void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
|
@ -1289,14 +1288,14 @@ typedef struct {
|
|||
} SMVCreateStreamRsp, SMSCreateStreamRsp;
|
||||
|
||||
typedef struct {
|
||||
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
||||
int8_t igExists;
|
||||
int8_t withTbName;
|
||||
int8_t withSchema;
|
||||
int8_t withTag;
|
||||
char* sql;
|
||||
char* ast;
|
||||
char subscribeDbName[TSDB_DB_NAME_LEN];
|
||||
char name[TSDB_TOPIC_FNAME_LEN]; // accout.topic
|
||||
int8_t igExists;
|
||||
int8_t withTbName;
|
||||
int8_t withSchema;
|
||||
int8_t withTag;
|
||||
char* sql;
|
||||
char* ast;
|
||||
char subscribeDbName[TSDB_DB_NAME_LEN];
|
||||
} SCMCreateTopicReq;
|
||||
|
||||
int32_t tSerializeSCMCreateTopicReq(void* buf, int32_t bufLen, const SCMCreateTopicReq* pReq);
|
||||
|
|
|
@ -92,7 +92,7 @@ typedef struct SWalReadHead {
|
|||
int8_t headVer;
|
||||
int8_t reserved;
|
||||
int16_t msgType;
|
||||
int32_t len;
|
||||
int32_t bodyLen;
|
||||
int64_t ingestTs; // not implemented
|
||||
int64_t version;
|
||||
|
||||
|
|
|
@ -172,7 +172,7 @@ static int32_t mndProcessCommitOffsetReq(SNodeMsg *pMsg) {
|
|||
bool create = false;
|
||||
SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, key);
|
||||
if (pOffsetObj == NULL) {
|
||||
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffset));
|
||||
pOffsetObj = taosMemoryMalloc(sizeof(SMqOffsetObj));
|
||||
memcpy(pOffsetObj->key, key, TSDB_PARTITION_KEY_LEN);
|
||||
create = true;
|
||||
}
|
||||
|
|
|
@ -29,7 +29,8 @@ static void syncEnvTick(void *param, void *tmrId);
|
|||
int32_t syncEnvStart() {
|
||||
int32_t ret = 0;
|
||||
taosSeedRand(taosGetTimestampSec());
|
||||
gSyncEnv = doSyncEnvStart(gSyncEnv);
|
||||
//gSyncEnv = doSyncEnvStart(gSyncEnv);
|
||||
gSyncEnv = doSyncEnvStart();
|
||||
assert(gSyncEnv != NULL);
|
||||
sTrace("syncEnvStart ok!");
|
||||
return ret;
|
||||
|
@ -91,8 +92,12 @@ static SSyncEnv *doSyncEnvStart() {
|
|||
}
|
||||
|
||||
static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) {
|
||||
taosTmrCleanUp(pSyncEnv->pTimerManager);
|
||||
taosMemoryFree(pSyncEnv);
|
||||
assert(pSyncEnv == gSyncEnv);
|
||||
if (pSyncEnv != NULL) {
|
||||
taosTmrCleanUp(pSyncEnv->pTimerManager);
|
||||
taosMemoryFree(pSyncEnv);
|
||||
}
|
||||
gSyncEnv = NULL;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
|||
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
||||
assert(walReadWithHandle(pWalHandle, index) == 0);
|
||||
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.len);
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
||||
assert(pEntry != NULL);
|
||||
|
||||
pEntry->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
|
||||
|
@ -80,8 +80,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
|||
pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
|
||||
pEntry->term = pWalHandle->pHead->head.syncMeta.term;
|
||||
pEntry->index = index;
|
||||
assert(pEntry->dataLen == pWalHandle->pHead->head.len);
|
||||
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.len);
|
||||
assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen);
|
||||
memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
|
||||
|
||||
// need to hold, do not new every time!!
|
||||
walCloseReadHandle(pWalHandle);
|
||||
|
|
|
@ -16,10 +16,10 @@
|
|||
#ifndef _TD_WAL_INT_H_
|
||||
#define _TD_WAL_INT_H_
|
||||
|
||||
#include "tcompare.h"
|
||||
#include "taoserror.h"
|
||||
#include "tchecksum.h"
|
||||
#include "tcoding.h"
|
||||
#include "tcompare.h"
|
||||
#include "wal.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
|
@ -41,10 +41,10 @@ typedef struct WalIdxEntry {
|
|||
} SWalIdxEntry;
|
||||
|
||||
static inline int tSerializeWalIdxEntry(void** buf, SWalIdxEntry* pIdxEntry) {
|
||||
int tlen;
|
||||
int tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pIdxEntry->ver);
|
||||
tlen += taosEncodeFixedI64(buf, pIdxEntry->offset);
|
||||
return 0;
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static inline void* tDeserializeWalIdxEntry(void* buf, SWalIdxEntry* pIdxEntry) {
|
||||
|
@ -103,7 +103,7 @@ static inline int walValidHeadCksum(SWalHead* pHead) {
|
|||
}
|
||||
|
||||
static inline int walValidBodyCksum(SWalHead* pHead) {
|
||||
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.len, pHead->cksumBody);
|
||||
return taosCheckChecksum((uint8_t*)pHead->head.body, pHead->head.bodyLen, pHead->cksumBody);
|
||||
}
|
||||
|
||||
static inline int walValidCksum(SWalHead* pHead, void* body, int64_t bodyLen) {
|
||||
|
|
|
@ -144,12 +144,12 @@ int32_t walReadWithHandle_s(SWalReadHandle *pRead, int64_t ver, SWalReadHead **p
|
|||
taosThreadMutexUnlock(&pRead->mutex);
|
||||
return -1;
|
||||
}
|
||||
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.len);
|
||||
*ppHead = taosMemoryMalloc(sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
|
||||
if (*ppHead == NULL) {
|
||||
taosThreadMutexUnlock(&pRead->mutex);
|
||||
return -1;
|
||||
}
|
||||
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.len);
|
||||
memcpy(*ppHead, &pRead->pHead->head, sizeof(SWalReadHead) + pRead->pHead->head.bodyLen);
|
||||
taosThreadMutexUnlock(&pRead->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
@ -178,16 +178,18 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
|
|||
terrno = TSDB_CODE_WAL_FILE_CORRUPTED;
|
||||
return -1;
|
||||
}
|
||||
if (pRead->capacity < pRead->pHead->head.len) {
|
||||
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.len);
|
||||
if (pRead->capacity < pRead->pHead->head.bodyLen) {
|
||||
void *ptr = taosMemoryRealloc(pRead->pHead, sizeof(SWalHead) + pRead->pHead->head.bodyLen);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
pRead->pHead = ptr;
|
||||
pRead->capacity = pRead->pHead->head.len;
|
||||
pRead->capacity = pRead->pHead->head.bodyLen;
|
||||
}
|
||||
if (pRead->pHead->head.len != taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.len)) {
|
||||
|
||||
if (pRead->pHead->head.bodyLen !=
|
||||
taosReadFile(pRead->pReadLogTFile, pRead->pHead->head.body, pRead->pHead->head.bodyLen)) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -295,7 +295,7 @@ int64_t walWriteWithSyncInfo(SWal *pWal, int64_t index, tmsg_t msgType, SSyncLog
|
|||
pWal->writeHead.head.version = index;
|
||||
|
||||
int64_t offset = walGetCurFileOffset(pWal);
|
||||
pWal->writeHead.head.len = bodyLen;
|
||||
pWal->writeHead.head.bodyLen = bodyLen;
|
||||
pWal->writeHead.head.msgType = msgType;
|
||||
|
||||
// sync info
|
||||
|
|
|
@ -320,7 +320,7 @@ TEST_F(WalKeepEnv, readHandleRead) {
|
|||
char newStr[100];
|
||||
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||
int len = strlen(newStr);
|
||||
ASSERT_EQ(pRead->pHead->head.len, len);
|
||||
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
|
||||
for (int j = 0; j < len; j++) {
|
||||
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||
}
|
||||
|
@ -372,7 +372,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
|||
char newStr[100];
|
||||
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||
int len = strlen(newStr);
|
||||
ASSERT_EQ(pRead->pHead->head.len, len);
|
||||
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
|
||||
for (int j = 0; j < len; j++) {
|
||||
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||
}
|
||||
|
@ -402,7 +402,7 @@ TEST_F(WalRetentionEnv, repairMeta1) {
|
|||
char newStr[100];
|
||||
sprintf(newStr, "%s-%d", ranStr, ver);
|
||||
int len = strlen(newStr);
|
||||
ASSERT_EQ(pRead->pHead->head.len, len);
|
||||
ASSERT_EQ(pRead->pHead->head.bodyLen, len);
|
||||
for (int j = 0; j < len; j++) {
|
||||
EXPECT_EQ(newStr[j], pRead->pHead->head.body[j]);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,394 @@
|
|||
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
|
||||
run tsim/tmq/prepareBasicEnv-1vgrp.sim
|
||||
|
||||
#---- global parameters start ----#
|
||||
$dbName = db
|
||||
$vgroups = 1
|
||||
$stbPrefix = stb
|
||||
$ctbPrefix = ctb
|
||||
$ntbPrefix = ntb
|
||||
$stbNum = 1
|
||||
$ctbNum = 10
|
||||
$ntbNum = 10
|
||||
$rowsPerCtb = 10
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
#---- global parameters end ----#
|
||||
|
||||
$pullDelay = 5
|
||||
$ifcheckdata = 1
|
||||
$showMsg = 1
|
||||
$showRow = 0
|
||||
|
||||
sql connect
|
||||
sql use $dbName
|
||||
|
||||
print == create topics from super table
|
||||
sql create topic topic_stb_column as select ts, c3 from stb
|
||||
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
|
||||
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||
|
||||
print == create topics from child table
|
||||
sql create topic topic_ctb_column as select ts, c3 from ctb0
|
||||
sql create topic topic_ctb_all as select * from ctb0
|
||||
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
|
||||
|
||||
print == create topics from normal table
|
||||
sql create topic topic_ntb_column as select ts, c3 from ntb0
|
||||
sql create topic topic_ntb_all as select * from ntb0
|
||||
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||
|
||||
#sql show topics
|
||||
#if $rows != 9 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
$keyList = ' . group.id:cgrp1
|
||||
$keyList = $keyList . '
|
||||
|
||||
$cdb_index = 0
|
||||
#=============================== start consume =============================#
|
||||
|
||||
print ================ test consume from stb
|
||||
$loop_cnt = 0
|
||||
loop_consume_diff_topic_from_stb:
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdb_index = $cdb_index + 1
|
||||
$cdbName = cdb . $cdb_index
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table for stb
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
print == scenario 1: topic_stb_column
|
||||
$topicList = ' . topic_stb_column
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 1 then
|
||||
print == scenario 2: topic_stb_all
|
||||
$topicList = ' . topic_stb_all
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 2 then
|
||||
print == scenario 3: topic_stb_function
|
||||
$topicList = ' . topic_stb_function
|
||||
$topicList = $topicList . '
|
||||
else
|
||||
goto loop_consume_diff_topic_from_stb_end
|
||||
endi
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||
$expectmsgcnt = $totalMsgOfStb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from stb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_stb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_stb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != 0 then
|
||||
if $data[0][2] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
if $data[1][2] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != 0 then
|
||||
if $data[0][3] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
if $data[1][3] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
goto loop_consume_diff_topic_from_stb
|
||||
loop_consume_diff_topic_from_stb_end:
|
||||
|
||||
print ================ test consume from ctb
|
||||
$loop_cnt = 0
|
||||
loop_consume_diff_topic_from_ctb:
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdb_index = $cdb_index + 1
|
||||
$cdbName = cdb . $cdb_index
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table for ctb
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
print == scenario 1: topic_ctb_column
|
||||
$topicList = ' . topic_ctb_column
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 1 then
|
||||
print == scenario 2: topic_ctb_all
|
||||
$topicList = ' . topic_ctb_all
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 2 then
|
||||
print == scenario 3: topic_ctb_function
|
||||
$topicList = ' . topic_ctb_function
|
||||
$topicList = $topicList . '
|
||||
else
|
||||
goto loop_consume_diff_topic_from_ctb_end
|
||||
endi
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfCtb = $rowsPerCtb
|
||||
$expectmsgcnt = $totalMsgOfCtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ctb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_ctb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ctb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
goto loop_consume_diff_topic_from_ctb
|
||||
loop_consume_diff_topic_from_ctb_end:
|
||||
|
||||
print ================ test consume from ntb
|
||||
$loop_cnt = 0
|
||||
loop_consume_diff_topic_from_ntb:
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdb_index = $cdb_index + 1
|
||||
$cdbName = cdb . $cdb_index
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table for ntb
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
print == scenario 1: topic_ntb_column
|
||||
$topicList = ' . topic_ntb_column
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 1 then
|
||||
print == scenario 2: topic_ntb_all
|
||||
$topicList = ' . topic_ntb_all
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 2 then
|
||||
print == scenario 3: topic_ntb_function
|
||||
$topicList = ' . topic_ntb_function
|
||||
$topicList = $topicList . '
|
||||
else
|
||||
goto loop_consume_diff_topic_from_ntb_end
|
||||
endi
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfNtb = $rowsPerCtb
|
||||
$expectmsgcnt = $totalMsgOfNtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ntb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result from ntb
|
||||
wait_consumer_end_from_ntb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ntb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][1] == 0 then
|
||||
if $data[0][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
goto loop_consume_diff_topic_from_ntb
|
||||
loop_consume_diff_topic_from_ntb_end:
|
||||
|
||||
#------ not need stop consumer, because it exit after pull msg overthan expect msg
|
||||
#system tsim/tmq/consume.sh -s stop -x SIGINT
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,333 @@
|
|||
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
|
||||
run tsim/tmq/prepareBasicEnv-1vgrp.sim
|
||||
|
||||
#---- global parameters start ----#
|
||||
$dbName = db
|
||||
$vgroups = 1
|
||||
$stbPrefix = stb
|
||||
$ctbPrefix = ctb
|
||||
$ntbPrefix = ntb
|
||||
$stbNum = 1
|
||||
$ctbNum = 10
|
||||
$ntbNum = 10
|
||||
$rowsPerCtb = 10
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
#---- global parameters end ----#
|
||||
|
||||
$pullDelay = 5
|
||||
$ifcheckdata = 1
|
||||
$showMsg = 1
|
||||
$showRow = 0
|
||||
|
||||
sql connect
|
||||
sql use $dbName
|
||||
|
||||
print == create topics from super table
|
||||
sql create topic topic_stb_column as select ts, c3 from stb
|
||||
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
|
||||
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||
|
||||
print == create topics from child table
|
||||
sql create topic topic_ctb_column as select ts, c3 from ctb0
|
||||
sql create topic topic_ctb_all as select * from ctb0
|
||||
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
|
||||
|
||||
print == create topics from normal table
|
||||
sql create topic topic_ntb_column as select ts, c3 from ntb0
|
||||
sql create topic topic_ntb_all as select * from ntb0
|
||||
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||
|
||||
#sql show topics
|
||||
#if $rows != 9 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
$keyList = ' . group.id:cgrp1
|
||||
$keyList = $keyList . '
|
||||
|
||||
$topicNum = 3
|
||||
|
||||
#=============================== start consume =============================#
|
||||
|
||||
|
||||
print ================ test consume from stb
|
||||
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
|
||||
$topicList = ' . topic_stb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_stb_all
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_stb_function
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfStb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from stb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_stb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_stb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != 0 then
|
||||
if $data[0][2] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
if $data[1][2] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != 0 then
|
||||
if $data[0][3] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
if $data[1][3] != $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdbName = cdb1
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table for ctb
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
|
||||
print ================ test consume from ctb
|
||||
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
|
||||
$topicList = ' . topic_ctb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ctb_all
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ctb_function
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfCtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ctb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_ctb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ctb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdbName = cdb2
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table for ntb
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
|
||||
print ================ test consume from ntb
|
||||
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
|
||||
$topicList = ' . topic_ntb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ntb_all
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ntb_function
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfNtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ntb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result from ntb
|
||||
wait_consumer_end_from_ntb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ntb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][1] == 0 then
|
||||
if $data[0][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
#------ not need stop consumer, because it exit after pull msg overthan expect msg
|
||||
#system tsim/tmq/consume.sh -s stop -x SIGINT
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,382 @@
|
|||
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
|
||||
run tsim/tmq/prepareBasicEnv-4vgrp.sim
|
||||
|
||||
#---- global parameters start ----#
|
||||
$dbName = db
|
||||
$vgroups = 4
|
||||
$stbPrefix = stb
|
||||
$ctbPrefix = ctb
|
||||
$ntbPrefix = ntb
|
||||
$stbNum = 1
|
||||
$ctbNum = 10
|
||||
$ntbNum = 10
|
||||
$rowsPerCtb = 10
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
#---- global parameters end ----#
|
||||
|
||||
$pullDelay = 5
|
||||
$ifcheckdata = 1
|
||||
$showMsg = 1
|
||||
$showRow = 0
|
||||
|
||||
sql connect
|
||||
sql use $dbName
|
||||
|
||||
print == create topics from super table
|
||||
sql create topic topic_stb_column as select ts, c3 from stb
|
||||
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
|
||||
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||
|
||||
print == create topics from child table
|
||||
sql create topic topic_ctb_column as select ts, c3 from ctb0
|
||||
sql create topic topic_ctb_all as select * from ctb0
|
||||
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
|
||||
|
||||
print == create topics from normal table
|
||||
sql create topic topic_ntb_column as select ts, c3 from ntb0
|
||||
sql create topic topic_ntb_all as select * from ntb0
|
||||
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||
|
||||
#sql show topics
|
||||
#if $rows != 9 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
$keyList = ' . group.id:cgrp1
|
||||
$keyList = $keyList . '
|
||||
|
||||
$cdb_index = 0
|
||||
#=============================== start consume =============================#
|
||||
|
||||
print ================ test consume from stb
|
||||
$loop_cnt = 0
|
||||
loop_consume_diff_topic_from_stb:
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdb_index = $cdb_index + 1
|
||||
$cdbName = cdb . $cdb_index
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
print == scenario 1: topic_stb_column
|
||||
$topicList = ' . topic_stb_column
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 1 then
|
||||
print == scenario 2: topic_stb_all
|
||||
$topicList = ' . topic_stb_all
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 2 then
|
||||
print == scenario 3: topic_stb_function
|
||||
$topicList = ' . topic_stb_function
|
||||
$topicList = $topicList . '
|
||||
else
|
||||
goto loop_consume_diff_topic_from_stb_end
|
||||
endi
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||
$expectmsgcnt = $totalMsgOfStb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from stb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_stb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_stb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[1][2] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][3] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[1][3] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
goto loop_consume_diff_topic_from_stb
|
||||
loop_consume_diff_topic_from_stb_end:
|
||||
|
||||
print ================ test consume from ctb
|
||||
$loop_cnt = 0
|
||||
loop_consume_diff_topic_from_ctb:
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdb_index = $cdb_index + 1
|
||||
$cdbName = cdb . $cdb_index
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
print == scenario 1: topic_ctb_column
|
||||
$topicList = ' . topic_ctb_column
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 1 then
|
||||
print == scenario 2: topic_ctb_all
|
||||
$topicList = ' . topic_ctb_all
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 2 then
|
||||
print == scenario 3: topic_ctb_function
|
||||
$topicList = ' . topic_ctb_function
|
||||
$topicList = $topicList . '
|
||||
else
|
||||
goto loop_consume_diff_topic_from_ctb_end
|
||||
endi
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfCtb = $rowsPerCtb
|
||||
$expectmsgcnt = $totalMsgOfCtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ctb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_ctb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ctb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
goto loop_consume_diff_topic_from_ctb
|
||||
loop_consume_diff_topic_from_ctb_end:
|
||||
|
||||
print ================ test consume from ntb
|
||||
$loop_cnt = 0
|
||||
loop_consume_diff_topic_from_ntb:
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdb_index = $cdb_index + 1
|
||||
$cdbName = cdb . $cdb_index
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
if $loop_cnt == 0 then
|
||||
print == scenario 1: topic_ntb_column
|
||||
$topicList = ' . topic_ntb_column
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 1 then
|
||||
print == scenario 2: topic_ntb_all
|
||||
$topicList = ' . topic_ntb_all
|
||||
$topicList = $topicList . '
|
||||
elif $loop_cnt == 2 then
|
||||
print == scenario 3: topic_ntb_function
|
||||
$topicList = ' . topic_ntb_function
|
||||
$topicList = $topicList . '
|
||||
else
|
||||
goto loop_consume_diff_topic_from_ntb_end
|
||||
endi
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfNtb = $rowsPerCtb
|
||||
$expectmsgcnt = $totalMsgOfNtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ntb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result from ntb
|
||||
wait_consumer_end_from_ntb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ntb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][1] == 0 then
|
||||
if $data[0][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
$loop_cnt = $loop_cnt + 1
|
||||
goto loop_consume_diff_topic_from_ntb
|
||||
loop_consume_diff_topic_from_ntb_end:
|
||||
|
||||
#------ not need stop consumer, because it exit after pull msg overthan expect msg
|
||||
#system tsim/tmq/consume.sh -s stop -x SIGINT
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -0,0 +1,324 @@
|
|||
#### test scenario, please refer to https://jira.taosdata.com:18090/pages/viewpage.action?pageId=135120406
|
||||
#basic1Of2Cons.sim: vgroups=1, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic2Of2Cons.sim: vgroups=1, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic3Of2Cons.sim: vgroups=4, one topic for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
#basic4Of2Cons.sim: vgroups=4, multi topics for 2 consumers, firstly insert data, then start consume. Include six topics
|
||||
|
||||
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
|
||||
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
|
||||
#
|
||||
# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
|
||||
#
|
||||
|
||||
run tsim/tmq/prepareBasicEnv-4vgrp.sim
|
||||
|
||||
#---- global parameters start ----#
|
||||
$dbName = db
|
||||
$vgroups = 4
|
||||
$stbPrefix = stb
|
||||
$ctbPrefix = ctb
|
||||
$ntbPrefix = ntb
|
||||
$stbNum = 1
|
||||
$ctbNum = 10
|
||||
$ntbNum = 10
|
||||
$rowsPerCtb = 10
|
||||
$tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||
#---- global parameters end ----#
|
||||
|
||||
$pullDelay = 5
|
||||
$ifcheckdata = 1
|
||||
$showMsg = 1
|
||||
$showRow = 0
|
||||
|
||||
sql connect
|
||||
sql use $dbName
|
||||
|
||||
print == create topics from super table
|
||||
sql create topic topic_stb_column as select ts, c3 from stb
|
||||
sql create topic topic_stb_all as select ts, c1, c2, c3 from stb
|
||||
sql create topic topic_stb_function as select ts, abs(c1), sin(c2) from stb
|
||||
|
||||
print == create topics from child table
|
||||
sql create topic topic_ctb_column as select ts, c3 from ctb0
|
||||
sql create topic topic_ctb_all as select * from ctb0
|
||||
sql create topic topic_ctb_function as select ts, abs(c1), sin(c2) from ctb0
|
||||
|
||||
print == create topics from normal table
|
||||
sql create topic topic_ntb_column as select ts, c3 from ntb0
|
||||
sql create topic topic_ntb_all as select * from ntb0
|
||||
sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||
|
||||
#sql show topics
|
||||
#if $rows != 9 then
|
||||
# return -1
|
||||
#endi
|
||||
|
||||
$keyList = ' . group.id:cgrp1
|
||||
$keyList = $keyList . '
|
||||
|
||||
$topicNum = 3
|
||||
|
||||
print ================ test consume from stb
|
||||
print == multi toipcs: topic_stb_column + topic_stb_all + topic_stb_function
|
||||
$topicList = ' . topic_stb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_stb_all
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_stb_function
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfStb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from stb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_stb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_stb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[1][2] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][3] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[1][3] <= 0 then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] >= $expectmsgcnt then
|
||||
return -1
|
||||
endi
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdbName = cdb1
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
|
||||
print ================ test consume from ctb
|
||||
print == multi toipcs: topic_ctb_column + topic_ctb_all + topic_ctb_function
|
||||
$topicList = ' . topic_ctb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ctb_all
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ctb_function
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfCtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ctb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result
|
||||
wait_consumer_end_from_ctb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ctb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[0][1] == 1 then
|
||||
if $data[1][1] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfCtb then
|
||||
if $data[0][2] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfCtb then
|
||||
if $data[0][3] != $totalMsgOfCtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
#######################################################################################
|
||||
# clear consume info and consume result
|
||||
#run tsim/tmq/clearConsume.sim
|
||||
# because drop table function no stable, so by create new db for consume info and result. Modify it later
|
||||
$cdbName = cdb2
|
||||
sql create database $cdbName vgroups 1
|
||||
sleep 500
|
||||
sql use $cdbName
|
||||
|
||||
print == create consume info table and consume result table
|
||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||
|
||||
sql show tables
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
#######################################################################################
|
||||
|
||||
|
||||
print ================ test consume from ntb
|
||||
print == multi toipcs: topic_ntb_column + topic_ntb_all + topic_ntb_function
|
||||
$topicList = ' . topic_ntb_column
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ntb_all
|
||||
$topicList = $topicList . ,
|
||||
$topicList = $topicList . topic_ntb_function
|
||||
$topicList = $topicList . '
|
||||
|
||||
$consumerId = 0
|
||||
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
||||
$expectmsgcnt = $totalMsgOfNtb
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
$consumerId = 1
|
||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
||||
|
||||
print == start consumer to pull msgs from ntb
|
||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||
system tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||
|
||||
print == check consume result from ntb
|
||||
wait_consumer_end_from_ntb:
|
||||
sql select * from consumeresult
|
||||
print ==> rows: $rows
|
||||
print ==> rows[0]: $data[0][0] $data[0][1] $data[0][2] $data[0][3] $data[0][4] $data[0][5] $data[0][6]
|
||||
print ==> rows[1]: $data[1][0] $data[1][1] $data[1][2] $data[1][3] $data[1][4] $data[1][5] $data[1][6]
|
||||
if $rows != 2 then
|
||||
sleep 1000
|
||||
goto wait_consumer_end_from_ntb
|
||||
endi
|
||||
if $data[0][1] == 0 then
|
||||
if $data[1][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][1] == 0 then
|
||||
if $data[0][1] != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][2] != $totalMsgOfNtb then
|
||||
if $data[0][2] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][2] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[0][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
if $data[1][3] != $totalMsgOfNtb then
|
||||
if $data[0][3] != $totalMsgOfNtb then
|
||||
return -1
|
||||
endi
|
||||
if $data[1][3] != 0 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
#------ not need stop consumer, because it exit after pull msg overthan expect msg
|
||||
#system tsim/tmq/consume.sh -s stop -x SIGINT
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -371,6 +371,8 @@ void *consumeThreadFunc(void *param) {
|
|||
|
||||
loop_consume(pInfo);
|
||||
|
||||
tmq_commit(pInfo->tmq, NULL, 0);
|
||||
|
||||
err = tmq_unsubscribe(pInfo->tmq);
|
||||
if (err) {
|
||||
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
|
|
Loading…
Reference in New Issue