Merge pull request #25102 from taosdata/fix/TS-4563

fix:[TS-4563]retrieve data error from wal if colid is bigger in tmq
This commit is contained in:
dapan1121 2024-03-17 16:52:29 +08:00 committed by GitHub
commit 062bdf686e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 161 additions and 12 deletions

View File

@ -666,20 +666,19 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
int32_t targetIdx = 0; int32_t targetIdx = 0;
int32_t sourceIdx = 0; int32_t sourceIdx = 0;
while (targetIdx < colActual) { while (targetIdx < colActual) {
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
if (sourceIdx >= numOfCols) { if (sourceIdx >= numOfCols) {
tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); tqError("lostdata tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols);
return -1; colDataSetNNULL(pColData, 0, numOfRows);
targetIdx++;
continue;
} }
SColData* pCol = taosArrayGet(pCols, sourceIdx); SColData* pCol = taosArrayGet(pCols, sourceIdx);
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, targetIdx);
SColVal colVal; SColVal colVal;
if (pCol->nVal != numOfRows) { tqTrace("lostdata colActual:%d, sourceIdx:%d, targetIdx:%d, numOfCols:%d, source cid:%d, dst cid:%d", colActual, sourceIdx, targetIdx, numOfCols, pCol->cid, pColData->info.colId);
tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows);
return -1;
}
if (pCol->cid < pColData->info.colId) { if (pCol->cid < pColData->info.colId) {
sourceIdx++; sourceIdx++;
} else if (pCol->cid == pColData->info.colId) { } else if (pCol->cid == pColData->info.colId) {
@ -693,7 +692,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
sourceIdx++; sourceIdx++;
targetIdx++; targetIdx++;
} else { } else {
colDataSetNNULL(pColData, 0, pCol->nVal); colDataSetNNULL(pColData, 0, numOfRows);
targetIdx++; targetIdx++;
} }
} }
@ -712,9 +711,6 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
SColVal colVal; SColVal colVal;
tRowGet(pRow, pTSchema, sourceIdx, &colVal); tRowGet(pRow, pTSchema, sourceIdx, &colVal);
if (colVal.cid < pColData->info.colId) { if (colVal.cid < pColData->info.colId) {
// tqDebug("colIndex:%d column id:%d in row, ignore, the required colId:%d, total cols in
// schema:%d",
// sourceIdx, colVal.cid, pColData->info.colId, pTSchema->numOfCols);
sourceIdx++; sourceIdx++;
continue; continue;
} else if (colVal.cid == pColData->info.colId) { } else if (colVal.cid == pColData->info.colId) {

View File

@ -2084,6 +2084,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
blockDataCleanup(pInfo->pRes); blockDataCleanup(pInfo->pRes);
STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
setBlockIntoRes(pInfo, pRes, &defaultWindow, true); setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
qDebug("doQueueScan after filter get data from log %" PRId64 " rows, version:%" PRId64, pInfo->pRes->info.rows,
pTaskInfo->streamInfo.currentOffset.version);
if (pInfo->pRes->info.rows > 0) { if (pInfo->pRes->info.rows > 0) {
return pInfo->pRes; return pInfo->pRes;
} }

View File

@ -229,6 +229,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py

View File

@ -16,6 +16,7 @@ sys.path.append("./7-tmq")
from tmqCommon import * from tmqCommon import *
class TDTestCase: class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1): def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar) self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}") tdLog.debug(f"start to excute {__file__}")

View File

@ -0,0 +1,149 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
from taos import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 143, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def consumeTest_TS_4563(self):
tdSql.execute(f'use db_stmt')
tdSql.query("select ts,k from st")
tdSql.checkRows(2)
tdSql.execute(f'create topic t_unorder_data as select ts,k from st')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["t_unorder_data"])
except TmqError:
tdLog.exit(f"subscribe error")
cnt = 0
try:
while True:
res = consumer.poll(1)
print(res)
if not res:
if cnt == 0:
tdLog.exit("consume error")
break
val = res.value()
if val is None:
continue
for block in val:
cnt += len(block.fetchall())
if cnt != 2:
tdLog.exit("consume error")
finally:
consumer.close()
def getBuildPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
if ("community" in selfPath):
projPath = selfPath[:selfPath.find("community")]
else:
projPath = selfPath[:selfPath.find("tests")]
for root, dirs, files in os.walk(projPath):
if ("taosd" in files or "taosd.exe" in files):
rootRealPath = os.path.dirname(os.path.realpath(root))
if ("packaging" not in rootRealPath):
buildPath = root[:len(root)-len("/build/bin")]
break
return buildPath
def newcon(self,host,cfg):
user = "root"
password = "taosdata"
port =6030
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
print(con)
return con
def test_stmt_insert_multi(self,conn):
# type: (TaosConnection) -> None
dbname = "db_stmt"
try:
conn.execute("drop database if exists %s" % dbname)
conn.execute("create database if not exists %s" % dbname)
conn.select_db(dbname)
conn.execute(
"create table st(ts timestamp, i int, j int, k int)",
)
# conn.load_table_info("log")
tdLog.debug("statement start")
start = datetime.now()
stmt = conn.statement("insert into st(ts,j) values(?, ?)")
params = new_multi_binds(2)
params[0].timestamp((1626861392589, 1626861392590))
params[1].int([3, None])
# print(type(stmt))
tdLog.debug("bind_param_batch start")
stmt.bind_param_batch(params)
tdLog.debug("bind_param_batch end")
stmt.execute()
tdLog.debug("execute end")
end = datetime.now()
print("elapsed time: ", end - start)
assert stmt.affected_rows == 2
tdLog.debug("close start")
stmt.close()
# conn.execute("drop database if exists %s" % dbname)
conn.close()
except Exception as err:
# conn.execute("drop database if exists %s" % dbname)
conn.close()
raise err
def run(self):
buildPath = self.getBuildPath()
config = buildPath+ "../sim/dnode1/cfg/"
host="localhost"
connectstmt=self.newcon(host,config)
self.test_stmt_insert_multi(connectstmt)
self.consumeTest_TS_4563()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())