feat:[TD-26056] add replay logic

This commit is contained in:
wangmm0220 2023-10-09 17:35:40 +08:00
parent 5f7b6f19ba
commit 33045e63ae
10 changed files with 418 additions and 35 deletions

View File

@ -724,6 +724,17 @@ void tmqAssignAskEpTask(void* param, void* tmrId) {
taosMemoryFree(param);
}
void tmqReplayTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if(tmq == NULL) goto END;
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
END:
taosMemoryFree(param);
}
void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
int64_t refId = *(int64_t*)param;
generateTimedTask(refId, TMQ_DELAYED_TASK__COMMIT);
@ -1144,6 +1155,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
req.autoCommit = tmq->autoCommit;
req.autoCommitInterval = tmq->autoCommitInterval;
req.resetOffsetCfg = tmq->resetOffsetCfg;
req.enableReplay = tmq->replayEnable;
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i);
@ -1823,6 +1835,11 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
if(tmq->replayEnable){
pVg->blockReceiveTs = taosGetTimestampMs();
pVg->blockSleepForReplay = pRsp->rsp.sleepTime;
if(pVg->blockSleepForReplay > 0){
int64_t* pRefId1 = taosMemoryMalloc(sizeof(int64_t));
*pRefId1 = tmq->refId;
taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, pRefId1, tmqMgmt.timer);
}
}
tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,

View File

@ -112,8 +112,9 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
}
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pHandle->block = createDataBlock();
copyDataBlock(pHandle->block, pDataBlock);
pHandle->block = createOneDataBlock(pDataBlock, true);
// pHandle->block = createDataBlock();
// copyDataBlock(pHandle->block, pDataBlock);
pHandle->blockTime = offset.ts;
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
if (code != 0){
@ -129,14 +130,16 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
pRsp->blockNum++;
if (pDataBlock == NULL) {
break;
}
copyDataBlock(pHandle->block, pDataBlock);
blockDataDestroy(pHandle->block);
pHandle->block = NULL;
}else{
copyDataBlock(pHandle->block, pDataBlock);
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pRsp->sleepTime = offset.ts - pHandle->blockTime;
pHandle->blockTime = offset.ts;
STqOffsetVal offset = {0};
qStreamExtractOffset(task, &offset);
pRsp->sleepTime = offset.ts - pHandle->blockTime;
pHandle->blockTime = offset.ts;
}
break;
}else{
if (pDataBlock == NULL) {

View File

@ -40,11 +40,11 @@ void tqUpdateNodeStage(STQ* pTq) {
tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage);
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset, bool withTbName) {
pRsp->reqOffset = pOffset;
pRsp->rspOffset = pOffset;
pRsp->withTbName = 1;
pRsp->withTbName = withTbName;
pRsp->withSchema = 1;
pRsp->blockData = taosArrayInit(0, sizeof(void*));
pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t));
@ -177,7 +177,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
int32_t vgId = TD_VID(pTq->pVnode);
SMqMetaRsp metaRsp = {0};
STaosxRsp taosxRsp = {0};
tqInitTaosxRsp(&taosxRsp, *offset);
tqInitTaosxRsp(&taosxRsp, *offset, pRequest->withTbName);
if (offset->type != TMQ_OFFSET__LOG) {
if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) {

View File

@ -330,7 +330,6 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
break;
}
qDebug("project return %d", pProjectInfo->mergeDataBlocks);
if (pProjectInfo->mergeDataBlocks) {
if (pRes->info.rows > 0) {
pFinalRes->info.id.groupId = 0; // clear groupId

View File

@ -159,6 +159,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py

View File

@ -110,7 +110,7 @@ class TDTestCase:
tdLog.info(shellCmd)
os.system(shellCmd)
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
def create_database(self,tsql, dbName,dropFlag=1,vgroups=1,replica=1):
if dropFlag == 1:
tsql.execute("drop database if exists %s"%(dbName))
@ -149,21 +149,12 @@ class TDTestCase:
t = time.time()
startTs = int(round(t * 1000))
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
rowsOfSql = 0
for i in range(ctbNum):
sql += " %s_%d values "%(stbName,i)
for j in range(rowsPerTbl):
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
rowsOfSql += 1
if ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
tsql.execute(sql)
time.sleep(1)
rowsOfSql = 0
if j < rowsPerTbl - 1:
sql = "insert into %s_%d values " %(stbName,i)
else:
sql = "insert into "
for j in range(rowsPerTbl):
for i in range(ctbNum):
sql += " %s_%d values (%d, %d, 'tmqrow_%d') "%(stbName, i, startTs + j + i, j+i, j+i)
tsql.execute(sql)
time.sleep(1)
sql = "insert into "
#end sql
if sql != pre_insert:
#print("insert sql:%s"%sql)
@ -199,10 +190,10 @@ class TDTestCase:
'actionType': 0, \
'dbName': 'db8', \
'dropFlag': 1, \
'vgroups': 4, \
'vgroups': 1, \
'replica': 1, \
'stbName': 'stb1', \
'ctbNum': 1, \
'ctbNum': 2, \
'rowsPerTbl': 10, \
'batchNum': 1, \
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
@ -223,7 +214,7 @@ class TDTestCase:
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
consumerId = 0
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] * 2
topicList = topicFromStb1
ifcheckdata = 0
ifManualCommit = 1
@ -247,8 +238,8 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows != 0:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0))
if totalConsumeRows != expectrowcnt:
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")
# tdLog.info("start consume 1 processor")

View File

@ -0,0 +1,39 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
def run(self):
tdSql.prepare()
buildPath = tdCom.getBuildPath()
cmdStr1 = '%s/build/bin/replay_test'%(buildPath)
tdLog.info(cmdStr1)
result = os.system(cmdStr1)
if result != 0:
tdLog.exit("tmq_replay error!")
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -9,6 +9,7 @@ add_executable(get_db_name_test get_db_name_test.c)
add_executable(tmq_offset tmqOffset.c)
add_executable(tmq_offset_test tmq_offset_test.c)
add_executable(varbinary_test varbinary_test.c)
add_executable(replay_test replay_test.c)
if(${TD_LINUX})
add_executable(tsz_test tsz_test.c)
@ -57,6 +58,14 @@ target_link_libraries(
PUBLIC os
)
target_link_libraries(
replay_test
PUBLIC taos
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
write_raw_block_test
PUBLIC taos

323
utils/test/c/replay_test.c Normal file
View File

@ -0,0 +1,323 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <time.h>
#include "taos.h"
#include "types.h"
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "g1");
tmq_conf_set(conf, "client.id", "c1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "enable.replay", "true");
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
void test_vgroup_error(TAOS* pConn){
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists d1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic t1 as select * from d1.s1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "t1");
tmq_t* tmq = build_consumer();
ASSERT(tmq_subscribe(tmq, topic_list) != 0);
tmq_list_destroy(topic_list);
tmq_consumer_close(tmq);
}
void test_stable_db_error(TAOS* pConn){
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists d1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic t1 as stable d1.s1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "t1");
tmq_t* tmq = build_consumer();
ASSERT(tmq_subscribe(tmq, topic_list) != 0);
tmq_list_destroy(topic_list);
tmq_consumer_close(tmq);
pRes = taos_query(pConn, "drop topic if exists t1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create topic t1 as database d1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
topic_list = tmq_list_new();
tmq_list_append(topic_list, "t1");
tmq = build_consumer();
ASSERT(tmq_subscribe(tmq, topic_list) != 0);
tmq_list_destroy(topic_list);
tmq_consumer_close(tmq);
}
void insert_with_sleep(TAOS* pConn, int32_t* interval, int32_t len){
for(int i = 0; i < len; i++){
TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
taosMsleep(interval[i]);
}
}
void insert_with_sleep_multi(TAOS* pConn, int32_t* interval, int32_t len){
for(int i = 0; i < len; i++){
TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 1) (now+1s, 2) d1.table2 (ts, c1) values (now, 1) (now+1s, 2)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
taosMsleep(interval[i]);
}
}
void test_case1(TAOS* pConn, int32_t* interval, int32_t len){
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists d1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists d1 vgroups 2 wal_retention_period 3600");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
insert_with_sleep(pConn, interval, len);
pRes = taos_query(pConn, "create topic t1 as select * from d1.table1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "t1");
tmq_t* tmq = build_consumer();
// 启动订阅
tmq_subscribe(tmq, topic_list);
tmq_list_destroy(topic_list);
int32_t timeout = 5000;
int64_t t = 0;
int32_t totalRows = 0;
char buf[1024] = {0};
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout);
if (tmqmessage) {
if(t != 0){
ASSERT(taosGetTimestampMs() - t >= interval[totalRows - 1]);
}
t = taosGetTimestampMs();
TAOS_ROW row = taos_fetch_row(tmqmessage);
if (row == NULL) {
break;
}
TAOS_FIELD* fields = taos_fetch_fields(tmqmessage);
int32_t numOfFields = taos_field_count(tmqmessage);
const char* tbName = tmq_get_table_name(tmqmessage);
taos_print_row(buf, row, fields, numOfFields);
printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf);
totalRows++;
taos_free_result(tmqmessage);
} else {
break;
}
}
ASSERT(totalRows == len);
tmq_consumer_close(tmq);
}
void test_case2(TAOS* pConn, int32_t* interval, int32_t len, tsem_t* sem){
TAOS_RES* pRes = taos_query(pConn, "drop topic if exists t1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "drop database if exists d1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create database if not exists d1 vgroups 1 wal_retention_period 3600");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "CREATE STABLE d1.s1 (ts TIMESTAMP, c1 INT) TAGS (t1 INT)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1.table1 using d1.s1 tags(1)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
pRes = taos_query(pConn, "create table d1.table2 using d1.s1 tags(2)");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
insert_with_sleep_multi(pConn, interval, len);
pRes = taos_query(pConn, "create topic t1 as select * from d1.s1");
ASSERT(taos_errno(pRes) == 0);
taos_free_result(pRes);
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "t1");
tmq_t* tmq = build_consumer();
// 启动订阅
tmq_subscribe(tmq, topic_list);
tmq_list_destroy(topic_list);
int32_t timeout = 5000;
int64_t t = 0;
int32_t totalRows = 0;
char buf[1024] = {0};
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, timeout);
if (tmqmessage) {
if(t != 0 && totalRows % 4 == 0){
ASSERT(taosGetTimestampMs() - t >= interval[totalRows/4 - 1]);
}
t = taosGetTimestampMs();
while(1){
TAOS_ROW row = taos_fetch_row(tmqmessage);
if (row == NULL) {
break;
}
TAOS_FIELD* fields = taos_fetch_fields(tmqmessage);
int32_t numOfFields = taos_field_count(tmqmessage);
const char* tbName = tmq_get_table_name(tmqmessage);
taos_print_row(buf, row, fields, numOfFields);
printf("%lld tbname:%s, rows[%d]: %s\n", t, (tbName != NULL ? tbName : "null table"), totalRows, buf);
totalRows++;
}
taos_free_result(tmqmessage);
if(totalRows == len * 4){
taosSsleep(1);
tsem_post(sem);
}
} else {
break;
}
}
ASSERT(totalRows == len * 4 + 1);
tmq_consumer_close(tmq);
}
void* insertThreadFunc(void* param) {
tsem_t* sem = (tsem_t*)param;
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
tsem_wait(sem);
TAOS_RES* pRes = taos_query(pConn, "insert into d1.table1 (ts, c1) values (now, 11)");
ASSERT(taos_errno(pRes) == 0);
printf("insert data again\n");
taos_free_result(pRes);
taos_close(pConn);
return NULL;
}
int main(int argc, char* argv[]) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
test_vgroup_error(pConn);
test_stable_db_error(pConn);
tsem_t sem;
tsem_init(&sem, 0, 0);
TdThread thread;
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
// pthread_create one thread to consume
taosThreadCreate(&thread, &thattr, insertThreadFunc, (void*)(&sem));
int32_t interval[5] = {1000, 200, 3000, 40, 500};
test_case1(pConn, interval, sizeof(interval)/sizeof(int32_t));
printf("test_case1 success\n");
test_case2(pConn, interval, sizeof(interval)/sizeof(int32_t), &sem);
taos_close(pConn);
taosThreadJoin(thread, NULL);
taosThreadClear(&thread);
tsem_destroy(&sem);
return 0;
}

View File

@ -621,10 +621,11 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
taos_print_row(buf, row, fields, numOfFields);
if (0 != g_stConfInfo.showRowFlag) {
taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf);
taosFprintfFile(g_fp, "%lld tbname:%s, rows[%d]: %s\n", taosGetTimestampMs(), (tbName != NULL ? tbName : "null table"), totalRows, buf);
// if (0 != g_stConfInfo.saveRowFlag) {
// saveConsumeContentToTbl(pInfo, buf);
// }
// taosFsyncFile(g_fp);
}
totalRows++;