Merge branch '3.0' of github.com:taosdata/TDengine into feature/udf
This commit is contained in:
commit
7bb38bed2f
|
@ -658,6 +658,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin
|
||||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||||
|
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId);
|
||||||
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode);
|
||||||
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
SColumn extractColumnFromColumnNode(SColumnNode* pColNode);
|
||||||
|
|
||||||
|
|
|
@ -2055,6 +2055,11 @@ void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const int8_t* rowR
|
||||||
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
SColumnInfoData* pDst = taosArrayGet(px->pDataBlock, i);
|
||||||
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pSrc = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
||||||
|
// it is a reserved column for scalar function, and no data in this column yet.
|
||||||
|
if (pSrc->pData == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
for (int32_t j = 0; j < totalRows; ++j) {
|
for (int32_t j = 0; j < totalRows; ++j) {
|
||||||
if (rowRes[j] == 0) {
|
if (rowRes[j] == 0) {
|
||||||
|
|
|
@ -291,20 +291,7 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
|
||||||
|
|
||||||
// this is to handle the tbname
|
// this is to handle the tbname
|
||||||
if (fmIsScanPseudoColumnFunc(functionId)) {
|
if (fmIsScanPseudoColumnFunc(functionId)) {
|
||||||
struct SScalarFuncExecFuncs fpSet = {0};
|
setTbNameColData(pTableScanInfo->readHandle.meta, pBlock, pColInfoData, functionId);
|
||||||
fmGetScalarFuncExecFuncs(functionId, &fpSet);
|
|
||||||
|
|
||||||
SColumnInfoData infoData = {0};
|
|
||||||
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
|
||||||
infoData.info.bytes = sizeof(uint64_t);
|
|
||||||
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
|
||||||
|
|
||||||
colDataAppendInt64(&infoData, 0, &pBlock->info.uid);
|
|
||||||
SScalarParam srcParam = {
|
|
||||||
.numOfRows = pBlock->info.rows, .param = pTableScanInfo->readHandle.meta, .columnData = &infoData};
|
|
||||||
|
|
||||||
SScalarParam param = {.columnData = pColInfoData};
|
|
||||||
fpSet.process(&srcParam, 1, ¶m);
|
|
||||||
} else { // these are tags
|
} else { // these are tags
|
||||||
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
const char* p = metaGetTableTagVal(&mr.me, pExpr->base.pParam[0].pCol->colId);
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
|
@ -316,6 +303,23 @@ void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock)
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void setTbNameColData(void* pMeta, const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId) {
|
||||||
|
struct SScalarFuncExecFuncs fpSet = {0};
|
||||||
|
fmGetScalarFuncExecFuncs(functionId, &fpSet);
|
||||||
|
|
||||||
|
SColumnInfoData infoData = {0};
|
||||||
|
infoData.info.type = TSDB_DATA_TYPE_BIGINT;
|
||||||
|
infoData.info.bytes = sizeof(uint64_t);
|
||||||
|
colInfoDataEnsureCapacity(&infoData, 0, 1);
|
||||||
|
|
||||||
|
colDataAppendInt64(&infoData, 0, (int64_t*) &pBlock->info.uid);
|
||||||
|
SScalarParam srcParam = {
|
||||||
|
.numOfRows = pBlock->info.rows, .param = pMeta, .columnData = &infoData};
|
||||||
|
|
||||||
|
SScalarParam param = {.columnData = pColInfoData};
|
||||||
|
fpSet.process(&srcParam, 1, ¶m);
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 3
|
$pullDelay = 3
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
|
|
||||||
$cdb_index = 0
|
$cdb_index = 0
|
||||||
#=============================== start consume =============================#
|
#=============================== start consume =============================#
|
||||||
|
@ -74,7 +84,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -102,7 +112,7 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||||
|
@ -145,7 +155,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -173,7 +183,7 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb
|
$totalMsgOfCtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -216,7 +226,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -244,7 +254,7 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb
|
$totalMsgOfNtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 5
|
$pullDelay = 5
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,10 +54,19 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
$cdb_index = 0
|
$cdb_index = 0
|
||||||
|
|
||||||
#=============================== start consume =============================#
|
#=============================== start consume =============================#
|
||||||
|
|
||||||
print ================ test consume from stb
|
print ================ test consume from stb
|
||||||
|
@ -74,7 +84,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table for stb
|
print == create consume info table and consume result table for stb
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -102,10 +112,10 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||||
|
@ -177,7 +187,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table for ctb
|
print == create consume info table and consume result table for ctb
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -205,9 +215,9 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb
|
$totalMsgOfCtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -279,7 +289,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table for ntb
|
print == create consume info table and consume result table for ntb
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -307,9 +317,9 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb
|
$totalMsgOfNtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 3
|
$pullDelay = 3
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
|
|
||||||
$topicNum = 3
|
$topicNum = 3
|
||||||
|
|
||||||
|
@ -74,7 +84,7 @@ $consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||||
|
@ -109,7 +119,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -131,7 +141,7 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -166,7 +176,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -188,7 +198,7 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 5
|
$pullDelay = 5
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
$topicNum = 3
|
$topicNum = 3
|
||||||
|
|
||||||
|
@ -74,9 +83,9 @@ $consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||||
|
@ -139,7 +148,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table for ctb
|
print == create consume info table and consume result table for ctb
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -161,9 +170,9 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -226,7 +235,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table for ntb
|
print == create consume info table and consume result table for ntb
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -248,9 +257,9 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 5
|
$pullDelay = 5
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
$topicNum = 2
|
$topicNum = 2
|
||||||
|
|
||||||
|
@ -72,7 +81,7 @@ $consumerId = 0
|
||||||
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
|
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
|
||||||
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
|
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
|
||||||
$topicList = ' . topic_stb_all
|
$topicList = ' . topic_stb_all
|
||||||
|
@ -80,7 +89,7 @@ $topicList = $topicList . ,
|
||||||
$topicList = $topicList . topic_stb_function
|
$topicList = $topicList . topic_stb_function
|
||||||
$topicList = $topicList . '
|
$topicList = $topicList . '
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||||
|
@ -158,7 +167,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table for ctb
|
print == create consume info table and consume result table for ctb
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -179,14 +188,14 @@ $consumerId = 0
|
||||||
$totalMsgOfOneTopic = $rowsPerCtb
|
$totalMsgOfOneTopic = $rowsPerCtb
|
||||||
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
|
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
$topicList = ' . topic_ctb_function
|
$topicList = ' . topic_ctb_function
|
||||||
$topicList = $topicList . ,
|
$topicList = $topicList . ,
|
||||||
$topicList = $topicList . topic_ctb_all
|
$topicList = $topicList . topic_ctb_all
|
||||||
$topicList = $topicList . '
|
$topicList = $topicList . '
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||||
|
@ -249,7 +258,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table for ntb
|
print == create consume info table and consume result table for ntb
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -270,7 +279,7 @@ $consumerId = 0
|
||||||
$totalMsgOfOneTopic = $rowsPerCtb
|
$totalMsgOfOneTopic = $rowsPerCtb
|
||||||
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
|
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
|
|
||||||
$topicList = ' . topic_ntb_function
|
$topicList = ' . topic_ntb_function
|
||||||
|
@ -278,7 +287,7 @@ $topicList = $topicList . ,
|
||||||
$topicList = $topicList . topic_ntb_all
|
$topicList = $topicList . topic_ntb_all
|
||||||
$topicList = $topicList . '
|
$topicList = $topicList . '
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 3
|
$pullDelay = 3
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
|
|
||||||
$cdb_index = 0
|
$cdb_index = 0
|
||||||
#=============================== start consume =============================#
|
#=============================== start consume =============================#
|
||||||
|
@ -74,7 +84,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -102,7 +112,7 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||||
|
@ -145,7 +155,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -173,7 +183,7 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb
|
$totalMsgOfCtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -216,7 +226,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -244,7 +254,7 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb
|
$totalMsgOfNtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 5
|
$pullDelay = 5
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
$cdb_index = 0
|
$cdb_index = 0
|
||||||
#=============================== start consume =============================#
|
#=============================== start consume =============================#
|
||||||
|
@ -74,7 +83,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -102,9 +111,9 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $cdbName -s start
|
||||||
|
@ -189,7 +198,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -217,9 +226,9 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb
|
$totalMsgOfCtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -291,7 +300,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -319,9 +328,9 @@ endi
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb
|
$totalMsgOfNtb = $rowsPerCtb
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 3
|
$pullDelay = 3
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,17 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
|
|
||||||
$topicNum = 3
|
$topicNum = 3
|
||||||
|
|
||||||
|
@ -71,7 +81,7 @@ $consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||||
|
@ -106,7 +116,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -128,7 +138,7 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -163,7 +173,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -185,7 +195,7 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -27,6 +27,7 @@ $tstart = 1640966400000 # 2022-01-01 00:00:00.000
|
||||||
|
|
||||||
$pullDelay = 5
|
$pullDelay = 5
|
||||||
$ifcheckdata = 1
|
$ifcheckdata = 1
|
||||||
|
$ifmanualcommit = 1
|
||||||
$showMsg = 1
|
$showMsg = 1
|
||||||
$showRow = 0
|
$showRow = 0
|
||||||
|
|
||||||
|
@ -53,8 +54,16 @@ sql create topic topic_ntb_function as select ts, abs(c1), sin(c2) from ntb0
|
||||||
# return -1
|
# return -1
|
||||||
#endi
|
#endi
|
||||||
|
|
||||||
|
#'group.id:cgrp1,enable.auto.commit:false,auto.commit.interval.ms:6000,auto.offset.reset:earliest'
|
||||||
$keyList = ' . group.id:cgrp1
|
$keyList = ' . group.id:cgrp1
|
||||||
|
$keyList = $keyList . ,
|
||||||
|
$keyList = $keyList . enable.auto.commit:false
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.commit.interval.ms:6000
|
||||||
|
#$keyList = $keyList . ,
|
||||||
|
#$keyList = $keyList . auto.offset.reset:earliest
|
||||||
$keyList = $keyList . '
|
$keyList = $keyList . '
|
||||||
|
print ========== key list: $keyList
|
||||||
|
|
||||||
$topicNum = 3
|
$topicNum = 3
|
||||||
|
|
||||||
|
@ -71,9 +80,9 @@ $consumerId = 0
|
||||||
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
$totalMsgOfStb = $ctbNum * $rowsPerCtb
|
||||||
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
$totalMsgOfStb = $totalMsgOfStb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfStb
|
$expectmsgcnt = $totalMsgOfStb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from stb
|
print == start consumer to pull msgs from stb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -w $dbName -s start
|
||||||
|
@ -148,7 +157,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -170,9 +179,9 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
$totalMsgOfCtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfCtb
|
$expectmsgcnt = $totalMsgOfCtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ctb
|
print == start consumer to pull msgs from ctb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
@ -236,7 +245,7 @@ sleep 500
|
||||||
sql use $cdbName
|
sql use $cdbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
@ -258,9 +267,9 @@ $topicList = $topicList . '
|
||||||
$consumerId = 0
|
$consumerId = 0
|
||||||
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
$totalMsgOfNtb = $rowsPerCtb * $topicNum
|
||||||
$expectmsgcnt = $totalMsgOfNtb
|
$expectmsgcnt = $totalMsgOfNtb
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
$consumerId = 1
|
$consumerId = 1
|
||||||
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata )
|
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
|
||||||
|
|
||||||
print == start consumer to pull msgs from ntb
|
print == start consumer to pull msgs from ntb
|
||||||
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
print == tsim/tmq/consume.sh -d $dbName -y $pullDelay -g $showMsg -r $showRow -s start
|
||||||
|
|
|
@ -48,7 +48,7 @@ endi
|
||||||
sql use $dbName
|
sql use $dbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
|
|
@ -48,7 +48,7 @@ endi
|
||||||
sql use $dbName
|
sql use $dbName
|
||||||
|
|
||||||
print == create consume info table and consume result table
|
print == create consume info table and consume result table
|
||||||
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)
|
sql create table consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)
|
||||||
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
sql create table consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)
|
||||||
|
|
||||||
sql show tables
|
sql show tables
|
||||||
|
|
|
@ -52,7 +52,7 @@ class TDTestCase:
|
||||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||||
tsql.execute("use %s" %dbName)
|
tsql.execute("use %s" %dbName)
|
||||||
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||||
pre_create = "create table"
|
pre_create = "create table"
|
||||||
sql = pre_create
|
sql = pre_create
|
||||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||||
|
@ -345,11 +345,11 @@ class TDTestCase:
|
||||||
after starting consumer, create ctables ")
|
after starting consumer, create ctables ")
|
||||||
# create and start thread
|
# create and start thread
|
||||||
parameterDict = {'cfg': '', \
|
parameterDict = {'cfg': '', \
|
||||||
'dbName': 'db2', \
|
'dbName': 'db3', \
|
||||||
'vgroups': 1, \
|
'vgroups': 1, \
|
||||||
'stbName': 'stb', \
|
'stbName': 'stb', \
|
||||||
'ctbNum': 10, \
|
'ctbNum': 10, \
|
||||||
'rowsPerTbl': 10000, \
|
'rowsPerTbl': 30000, \
|
||||||
'batchNum': 100, \
|
'batchNum': 100, \
|
||||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
parameterDict['cfg'] = cfgPath
|
parameterDict['cfg'] = cfgPath
|
||||||
|
@ -375,21 +375,32 @@ class TDTestCase:
|
||||||
else:
|
else:
|
||||||
time.sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
|
tdLog.info("create stable2 for the seconde topic")
|
||||||
|
parameterDict2 = {'cfg': '', \
|
||||||
|
'dbName': 'db3', \
|
||||||
|
'vgroups': 1, \
|
||||||
|
'stbName': 'stb2', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 30000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict2['cfg'] = cfgPath
|
||||||
|
tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName']))
|
||||||
|
|
||||||
tdLog.info("create topics from super table")
|
tdLog.info("create topics from super table")
|
||||||
topicFromStb = 'topic_stb_column2'
|
topicFromStb = 'topic_stb_column3'
|
||||||
topicFromCtb = 'topic_ctb_column2'
|
topicFromStb2 = 'topic_stb_column32'
|
||||||
|
|
||||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
||||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
|
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName']))
|
||||||
|
|
||||||
time.sleep(1)
|
|
||||||
tdSql.query("show topics")
|
tdSql.query("show topics")
|
||||||
topic1 = tdSql.getData(0 , 0)
|
topic1 = tdSql.getData(0 , 0)
|
||||||
topic2 = tdSql.getData(1 , 0)
|
topic2 = tdSql.getData(1 , 0)
|
||||||
tdLog.info("show topics: %s, %s"%(topic1, topic2))
|
tdLog.info("show topics: %s, %s"%(topic1, topic2))
|
||||||
if topic1 != topicFromStb and topic1 != topicFromCtb:
|
if topic1 != topicFromStb and topic1 != topicFromStb2:
|
||||||
tdLog.exit("topic error1")
|
tdLog.exit("topic error1")
|
||||||
if topic2 != topicFromStb and topic2 != topicFromCtb:
|
if topic2 != topicFromStb and topic2 != topicFromStb2:
|
||||||
tdLog.exit("topic error2")
|
tdLog.exit("topic error2")
|
||||||
|
|
||||||
tdLog.info("create consume info table and consume result table")
|
tdLog.info("create consume info table and consume result table")
|
||||||
|
@ -397,10 +408,9 @@ class TDTestCase:
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||||
|
|
||||||
rowsOfNewCtb = 1000
|
|
||||||
consumerId = 0
|
consumerId = 0
|
||||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||||
topicList = topicFromStb
|
topicList = topicFromStb + ',' + topicFromStb2
|
||||||
ifcheckdata = 0
|
ifcheckdata = 0
|
||||||
keyList = 'group.id:cgrp1,\
|
keyList = 'group.id:cgrp1,\
|
||||||
enable.auto.commit:false,\
|
enable.auto.commit:false,\
|
||||||
|
@ -432,17 +442,13 @@ class TDTestCase:
|
||||||
tdLog.info(shellCmd)
|
tdLog.info(shellCmd)
|
||||||
os.system(shellCmd)
|
os.system(shellCmd)
|
||||||
|
|
||||||
# create new child table and insert data
|
# start the second thread to create new child table and insert data
|
||||||
newCtbName = 'newctb'
|
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
|
||||||
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
|
prepareEnvThread2.start()
|
||||||
startTs = parameterDict["startTs"]
|
|
||||||
for j in range(rowsOfNewCtb):
|
|
||||||
sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
|
|
||||||
tdSql.execute(sql)
|
|
||||||
tdLog.debug("insert data into new child table ............ [OK]")
|
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
|
prepareEnvThread2.join()
|
||||||
|
|
||||||
tdLog.info("insert process end, and start to check consume result")
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
while 1:
|
while 1:
|
||||||
|
@ -457,7 +463,7 @@ class TDTestCase:
|
||||||
tdSql.checkData(0 , 3, expectrowcnt)
|
tdSql.checkData(0 , 3, expectrowcnt)
|
||||||
|
|
||||||
tdSql.query("drop topic %s"%topicFromStb)
|
tdSql.query("drop topic %s"%topicFromStb)
|
||||||
tdSql.query("drop topic %s"%topicFromCtb)
|
tdSql.query("drop topic %s"%topicFromStb2)
|
||||||
|
|
||||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||||
|
|
||||||
|
@ -474,7 +480,7 @@ class TDTestCase:
|
||||||
|
|
||||||
self.tmqCase1(cfgPath, buildPath)
|
self.tmqCase1(cfgPath, buildPath)
|
||||||
self.tmqCase2(cfgPath, buildPath)
|
self.tmqCase2(cfgPath, buildPath)
|
||||||
#self.tmqCase3(cfgPath, buildPath)
|
self.tmqCase3(cfgPath, buildPath)
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
|
@ -37,9 +37,10 @@ typedef struct {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
int32_t consumerId;
|
int32_t consumerId;
|
||||||
|
|
||||||
int32_t autoCommitIntervalMs; // 1000 ms
|
int32_t ifManualCommit;
|
||||||
char autoCommit[8]; // true, false
|
//int32_t autoCommitIntervalMs; // 1000 ms
|
||||||
char autoOffsetRest[16]; // none, earliest, latest
|
//char autoCommit[8]; // true, false
|
||||||
|
//char autoOffsetRest[16]; // none, earliest, latest
|
||||||
|
|
||||||
int32_t ifCheckData;
|
int32_t ifCheckData;
|
||||||
int64_t expectMsgCnt;
|
int64_t expectMsgCnt;
|
||||||
|
@ -136,9 +137,9 @@ void saveConfigToLogFile() {
|
||||||
|
|
||||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||||
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
|
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
|
||||||
taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
|
//taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
|
||||||
taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
|
//taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
|
||||||
taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
|
//taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
|
||||||
taosFprintfFile(g_fp, " Topics: ");
|
taosFprintfFile(g_fp, " Topics: ");
|
||||||
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
|
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
|
||||||
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
|
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
|
||||||
|
@ -232,13 +233,18 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(msg);
|
TAOS_ROW row = taos_fetch_row(msg);
|
||||||
|
|
||||||
if (row == NULL) break;
|
if (row == NULL) break;
|
||||||
if (0 != g_stConfInfo.showRowFlag) {
|
|
||||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||||
int32_t numOfFields = taos_field_count(msg);
|
int32_t numOfFields = taos_field_count(msg);
|
||||||
|
|
||||||
taos_print_row(buf, row, fields, numOfFields);
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
|
|
||||||
|
if (0 != g_stConfInfo.showRowFlag) {
|
||||||
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
|
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
totalRows++;
|
totalRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,6 +322,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
|
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
|
||||||
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
|
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
|
||||||
|
|
||||||
|
taosFprintfFile(g_fp, "== save result sql: %s \n", sqlStr);
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
||||||
|
@ -384,7 +392,11 @@ void* consumeThreadFunc(void* param) {
|
||||||
|
|
||||||
loop_consume(pInfo);
|
loop_consume(pInfo);
|
||||||
|
|
||||||
|
if (pInfo->ifManualCommit) {
|
||||||
|
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
|
||||||
|
pPrint("tmq_commit() manual commit when consume end.\n");
|
||||||
tmq_commit(pInfo->tmq, NULL, 0);
|
tmq_commit(pInfo->tmq, NULL, 0);
|
||||||
|
}
|
||||||
|
|
||||||
err = tmq_unsubscribe(pInfo->tmq);
|
err = tmq_unsubscribe(pInfo->tmq);
|
||||||
if (err) {
|
if (err) {
|
||||||
|
@ -470,9 +482,9 @@ int32_t getConsumeInfo() {
|
||||||
int32_t* lengths = taos_fetch_lengths(pRes);
|
int32_t* lengths = taos_fetch_lengths(pRes);
|
||||||
|
|
||||||
// set default value
|
// set default value
|
||||||
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
|
//g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
|
||||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
|
//memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
|
||||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
|
//memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
|
||||||
|
|
||||||
for (int i = 0; i < num_fields; ++i) {
|
for (int i = 0; i < num_fields; ++i) {
|
||||||
if (row[i] == NULL || 0 == i) {
|
if (row[i] == NULL || 0 == i) {
|
||||||
|
@ -489,12 +501,8 @@ int32_t getConsumeInfo() {
|
||||||
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
|
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
|
||||||
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
||||||
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
|
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
|
||||||
} else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
|
} else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
||||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]);
|
g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
|
||||||
} else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
|
||||||
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]);
|
|
||||||
} else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
|
|
||||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
numOfThread++;
|
numOfThread++;
|
||||||
|
|
Loading…
Reference in New Issue