fix:filter data error & add test case

This commit is contained in:
wangmm0220 2024-04-12 09:49:08 +08:00
parent 035b61218c
commit ca8153fb98
6 changed files with 307 additions and 324 deletions

View File

@ -199,6 +199,7 @@ extern char tsSmlTsDefaultName[];
// extern int32_t tsSmlBatchSize; // extern int32_t tsSmlBatchSize;
extern int32_t tmqMaxTopicNum; extern int32_t tmqMaxTopicNum;
extern int32_t tmqRowSize;
// wal // wal
extern int64_t tsWalFsyncDataSizeLimit; extern int64_t tsWalFsyncDataSizeLimit;

View File

@ -602,7 +602,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
pVg->offsetInfo.committedOffset = *offsetVal; tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal);
end: end:
taosRUnLockLatch(&tmq->lock); taosRUnLockLatch(&tmq->lock);
@ -691,7 +691,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
tscInfo("consumer:0x%" PRIx64 tscInfo("consumer:0x%" PRIx64
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups);
pVg->offsetInfo.committedOffset = pVg->offsetInfo.endOffset; tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset);
} else { } else {
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);

View File

@ -149,6 +149,7 @@ char tsCheckpointBackupDir[PATH_MAX] = "/var/lib/taos/backup/checkpoint/";
// tmq // tmq
int32_t tmqMaxTopicNum = 20; int32_t tmqMaxTopicNum = 20;
int32_t tmqRowSize = 4096;
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
int32_t tsQueryRspPolicy = 0; int32_t tsQueryRspPolicy = 0;
@ -720,6 +721,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) if (cfgAddInt32(pCfg, "tmqMaxTopicNum", tmqMaxTopicNum, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1; return -1;
if (cfgAddInt32(pCfg, "tmqRowSize", tmqRowSize, 1, 1000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0)
return -1;
if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) !=
0) 0)
return -1; return -1;
@ -1183,6 +1187,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32;
tmqRowSize = cfgGetItem(pCfg, "tmqRowSize")->i32;
tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32;
tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32; tsCompactPullupInterval = cfgGetItem(pCfg, "compactPullupInterval")->i32;
@ -1514,6 +1519,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) {
{"queryRspPolicy", &tsQueryRspPolicy}, {"queryRspPolicy", &tsQueryRspPolicy},
{"timeseriesThreshold", &tsTimeSeriesThreshold}, {"timeseriesThreshold", &tsTimeSeriesThreshold},
{"tmqMaxTopicNum", &tmqMaxTopicNum}, {"tmqMaxTopicNum", &tmqMaxTopicNum},
{"tmqRowSize", &tmqRowSize},
{"transPullupInterval", &tsTransPullupInterval}, {"transPullupInterval", &tsTransPullupInterval},
{"compactPullupInterval", &tsCompactPullupInterval}, {"compactPullupInterval", &tsCompactPullupInterval},
{"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, {"trimVDbIntervalSec", &tsTrimVDbIntervalSec},

View File

@ -80,8 +80,6 @@ int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, S
} }
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) {
const int32_t MAX_ROWS_TO_RETURN = 1;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t code = 0; int32_t code = 0;
int32_t totalRows = 0; int32_t totalRows = 0;
@ -153,7 +151,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
pRsp->blockNum++; pRsp->blockNum++;
totalRows += pDataBlock->info.rows; totalRows += pDataBlock->info.rows;
if (totalRows >= MAX_ROWS_TO_RETURN) { if (totalRows >= tmqRowSize) {
break; break;
} }
} }
@ -215,7 +213,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, STaosxRsp* pRsp, SMqMeta
continue; continue;
} else { } else {
rowCnt += pDataBlock->info.rows; rowCnt += pDataBlock->info.rows;
if (rowCnt <= 1) continue; if (rowCnt <= tmqRowSize) continue;
} }
} }

View File

@ -291,7 +291,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto end; goto end;
} }
if (totalRows >= 4096 || (taosGetTimestampMs() - st > 1000)) { if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp( code = tqSendDataRsp(
pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp,

View File

@ -80,330 +80,308 @@ static void msg_process(TAOS_RES* msg) {
} }
int buildDatabase(TAOS* pConn, TAOS_RES* pRes) { int buildDatabase(TAOS* pConn, TAOS_RES* pRes) {
/* test for primary key start*/ /* test for TD-20612 start*/
pRes = taos_query(pConn, "create table if not exists pk (ts timestamp, c1 int primary key, c2 int)"); pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 int, c2 int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into pk values(1669092069069, 0, 1) (1669092069069, 1, 1)"); pRes = taos_query(pConn, "insert into tb1 (ts, c1) values(1669092069069, 0)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "insert into pk values(1669092069069, 2, 1) (1669092069069, 3, 1)"); pRes = taos_query(pConn, "insert into tb1 (ts, c2) values(1669092069069, 1)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create super table pk, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);
// /* test for TD-20612 start*/ /* test for TD-20612 end*/
// pRes = taos_query(pConn, "create table tb1 (ts timestamp, c1 int, c2 int)");
// if (taos_errno(pRes) != 0) { pRes = taos_query(pConn,
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// return -1; "nchar(8), t4 bool)");
// } if (taos_errno(pRes) != 0) {
// taos_free_result(pRes); printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1;
// pRes = taos_query(pConn, "insert into tb1 (ts, c1) values(1669092069069, 0)"); }
// if (taos_errno(pRes) != 0) { taos_free_result(pRes);
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1; pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)");
// } if (taos_errno(pRes) != 0) {
// taos_free_result(pRes); printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes));
// return -1;
// pRes = taos_query(pConn, "insert into tb1 (ts, c2) values(1669092069069, 1)"); }
// if (taos_errno(pRes) != 0) { taos_free_result(pRes);
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// return -1; pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')");
// } if (taos_errno(pRes) != 0) {
// taos_free_result(pRes); printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes));
// return -1;
// /* test for TD-20612 end*/ }
// taos_free_result(pRes);
// pRes = taos_query(pConn,
// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)");
// "nchar(8), t4 bool)"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes));
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)");
// pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes));
// printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')");
// pRes = taos_query(pConn, "insert into ct0 values(1626006833400, 1, 2, 'a')"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes));
// printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)");
// pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes));
// printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(
// pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)"); pConn,
// if (taos_errno(pRes) != 0) { "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, "
// printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes)); "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')");
// return -1; if (taos_errno(pRes) != 0) {
// } printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); return -1;
// }
// pRes = taos_query(pConn, "insert into ct1 values(1626006833600, 3, 4, 'b')"); taos_free_result(pRes);
// if (taos_errno(pRes) != 0) {
// printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
// return -1; if (taos_errno(pRes) != 0) {
// } printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); return -1;
// }
// pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)"); taos_free_result(pRes);
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
// return -1; if (taos_errno(pRes) != 0) {
// } printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); return -1;
// }
// pRes = taos_query( taos_free_result(pRes);
// pConn,
// "insert into ct3 values(1626006833600, 5, 6, 'c') ct1 values(1626006833601, 2, 3, 'sds') (1626006833602, 4, 5, " pRes = taos_query(pConn,
// "'ddd') ct0 values(1626006833603, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) "
// if (taos_errno(pRes) != 0) { "(1626006833609, 51, 62, 'c333', 940)");
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); if (taos_errno(pRes) != 0) {
// return -1; printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// } return -1;
// taos_free_result(pRes); }
// taos_free_result(pRes);
// pRes = taos_query(pConn, "alter table st1 add column c4 bigint");
// if (taos_errno(pRes) != 0) { pRes = taos_query(pConn, "insert into ct3 select * from ct1");
// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); if (taos_errno(pRes) != 0) {
// return -1; printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// } return -1;
// taos_free_result(pRes); }
// taos_free_result(pRes);
// pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)");
// if (taos_errno(pRes) != 0) { pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)");
// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); if (taos_errno(pRes) != 0) {
// return -1; printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes));
// } return -1;
// taos_free_result(pRes); }
// taos_free_result(pRes);
// pRes = taos_query(pConn,
// "insert into ct3 values(1626006833605, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) " pRes = taos_query(pConn, "alter table ct3 set tag t1=5000");
// "(1626006833609, 51, 62, 'c333', 940)"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes));
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606");
// pRes = taos_query(pConn, "insert into ct3 select * from ct1"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes));
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// if (g_conf.dropTable) {
// pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); pRes = taos_query(pConn, "drop table ct3, ct1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); pRes = taos_query(pConn, "drop table st1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
// }
// pRes = taos_query(pConn, "delete from abc1 .ct3 where ts < 1626006833606");
// if (taos_errno(pRes) != 0) { pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))");
// printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); if (taos_errno(pRes) != 0) {
// return -1; printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes));
// } return -1;
// taos_free_result(pRes); }
// taos_free_result(pRes);
// if (g_conf.dropTable) {
// pRes = taos_query(pConn, "drop table ct3, ct1"); pRes = taos_query(pConn, "alter table n1 add column c3 bigint");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(pConn, "drop table st1"); pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
// }
// pRes = taos_query(pConn, "alter table n1 rename column c3 cc3");
// pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "alter table n1 comment 'hello'");
// pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "alter table n1 drop column c1");
// pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes));
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)");
// pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes));
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// if (g_conf.dropTable) {
// pRes = taos_query(pConn, "alter table n1 comment 'hello'"); pRes = taos_query(pConn, "drop table n1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
// }
// pRes = taos_query(pConn, "alter table n1 drop column c1");
// if (taos_errno(pRes) != 0) { pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)");
// printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); if (taos_errno(pRes) != 0) {
// return -1; printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
// } return -1;
// taos_free_result(pRes); }
// taos_free_result(pRes);
// pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)");
// if (taos_errno(pRes) != 0) { pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')");
// printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes)); if (taos_errno(pRes) != 0) {
// return -1; printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes));
// } return -1;
// taos_free_result(pRes); }
// taos_free_result(pRes);
// if (g_conf.dropTable) {
// pRes = taos_query(pConn, "drop table n1"); pRes = taos_query(pConn, "create table jt2 using jt tags('')");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
// }
// pRes = taos_query(pConn, "insert into jt1 values(now, 1)");
// pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes));
// printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// pRes = taos_query(pConn, "insert into jt2 values(now, 11)");
// pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); if (taos_errno(pRes) != 0) {
// if (taos_errno(pRes) != 0) { printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
// printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); return -1;
// return -1; }
// } taos_free_result(pRes);
// taos_free_result(pRes);
// if (g_conf.dropTable) {
// pRes = taos_query(pConn, "create table jt2 using jt tags('')"); pRes = taos_query(pConn,
// if (taos_errno(pRes) != 0) { "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); "nchar(8), t4 bool)");
// return -1; if (taos_errno(pRes) != 0) {
// } printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); return -1;
// }
// pRes = taos_query(pConn, "insert into jt1 values(now, 1)"); taos_free_result(pRes);
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table jt1, reason:%s\n", taos_errstr(pRes)); pRes = taos_query(pConn, "drop table st1");
// return -1; if (taos_errno(pRes) != 0) {
// } printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes));
// taos_free_result(pRes); return -1;
// }
// pRes = taos_query(pConn, "insert into jt2 values(now, 11)"); taos_free_result(pRes);
// if (taos_errno(pRes) != 0) { }
// printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes));
// return -1; pRes = taos_query(pConn,
// } "create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// taos_free_result(pRes); "nchar(8), t4 bool)");
// if (taos_errno(pRes) != 0) {
// if (g_conf.dropTable) { printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes));
// pRes = taos_query(pConn, return -1;
// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " }
// "nchar(8), t4 bool)"); taos_free_result(pRes);
// if (taos_errno(pRes) != 0) {
// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); pRes = taos_query(pConn,
// return -1; "create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 "
// } "nchar(8), t4 bool)");
// taos_free_result(pRes); if (taos_errno(pRes) != 0) {
// printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes));
// pRes = taos_query(pConn, "drop table st1"); return -1;
// if (taos_errno(pRes) != 0) { }
// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes);
// return -1;
// } pRes = taos_query(
// taos_free_result(pRes); pConn,
// } "create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) "
// "stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)");
// pRes = taos_query(pConn, if (taos_errno(pRes) != 0) {
// "create stable if not exists stt (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
// "nchar(8), t4 bool)"); return -1;
// if (taos_errno(pRes) != 0) { }
// printf("failed to create super table stt, reason:%s\n", taos_errstr(pRes)); taos_free_result(pRes);
// return -1;
// } pRes =
// taos_free_result(pRes); taos_query(pConn,
// "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + "
// pRes = taos_query(pConn, "1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
// "create stable if not exists sttb (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb "
// "nchar(8), t4 bool)"); "tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("failed to create super table sttb, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
//
// pRes = taos_query(
// pConn,
// "create table if not exists stt1 using stt tags(2, \"stt1\", true) sttb1 using sttb tags(4, \"sttb1\", true) "
// "stt2 using stt tags(43, \"stt2\", false) sttb2 using sttb tags(54, \"sttb2\", true)");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
//
// pRes =
// taos_query(pConn,
// "insert into stt1 values(now + 2s, 3, 2, 'stt1') stt3 using stt tags(23, \"stt3\", true) values(now + "
// "1s, 1, 2, 'stt3') sttb3 using sttb tags(4, \"sttb3\", true) values(now + 2s, 13, 22, 'sttb3') "
// "stt4 using stt tags(433, \"stt4\", false) values(now + 3s, 21, 21, 'stt4') sttb4 using sttb "
// "tags(543, \"sttb4\", true) values(now + 4s, 16, 25, 'sttb4')");
// if (taos_errno(pRes) != 0) {
// printf("failed to create child table stt1, reason:%s\n", taos_errstr(pRes));
// return -1;
// }
// taos_free_result(pRes);
return 0; return 0;
} }
@ -512,12 +490,12 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
// pRes = taos_query(pConn, "drop database if exists abc1"); pRes = taos_query(pConn, "drop database if exists abc1");
// if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
// printf("error in drop db, reason:%s\n", taos_errstr(pRes)); printf("error in drop db, reason:%s\n", taos_errstr(pRes));
// return -1; return -1;
// } }
// taos_free_result(pRes); taos_free_result(pRes);
snprintf(sql, 128, "create database if not exists abc1 vgroups %d wal_retention_period 3600", g_conf.srcVgroups); snprintf(sql, 128, "create database if not exists abc1 vgroups %d wal_retention_period 3600", g_conf.srcVgroups);
pRes = taos_query(pConn, sql); pRes = taos_query(pConn, sql);
@ -1056,7 +1034,7 @@ int main(int argc, char* argv[]) {
basic_consume_loop(tmq, topic_list); basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list); tmq_list_destroy(topic_list);
// testConsumeExcluded(1); testConsumeExcluded(1);
// testConsumeExcluded(2); testConsumeExcluded(2);
taosCloseFile(&g_fp); taosCloseFile(&g_fp);
} }