fix(tmq): fix error.
This commit is contained in:
parent
5f4cb41e28
commit
e1d85ce074
|
@ -1145,13 +1145,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
pInfo->pTableScanOp->resultInfo.totalRows = 0;
|
||||||
|
|
||||||
// start from current accessed position
|
// start from current accessed position
|
||||||
index = tableListFind(pTableListInfo, uid, pScanInfo->currentTable);
|
// we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start
|
||||||
|
// position, let's find it from the beginning.
|
||||||
|
index = tableListFind(pTableListInfo, uid, 0);
|
||||||
taosRUnLockLatch(&pTaskInfo->lock);
|
taosRUnLockLatch(&pTaskInfo->lock);
|
||||||
|
|
||||||
if (index >= 0) {
|
if (index >= 0) {
|
||||||
pScanInfo->currentTable = index;
|
pScanInfo->currentTable = index;
|
||||||
} else {
|
} else {
|
||||||
qError("uid:%" PRIu64 " not found in table list, total:%d %s", uid, numOfTables, id);
|
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
|
||||||
|
numOfTables, pScanInfo->currentTable, id);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1160,25 +1163,24 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
|
||||||
|
|
||||||
// let's start from the next ts that returned to consumer.
|
// let's start from the next ts that returned to consumer.
|
||||||
pScanBaseInfo->cond.twindows.skey = ts + 1;
|
pScanBaseInfo->cond.twindows.skey = ts + 1;
|
||||||
|
pScanInfo->scanTimes = 0;
|
||||||
|
|
||||||
if (pScanBaseInfo->dataReader == NULL) {
|
if (pScanBaseInfo->dataReader == NULL) {
|
||||||
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
|
int32_t code = tsdbReaderOpen(pScanBaseInfo->readHandle.vnode, &pScanBaseInfo->cond, &keyInfo, 1,
|
||||||
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, NULL);
|
pScanInfo->pResBlock, &pScanBaseInfo->dataReader, id);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
|
qError("prepare read tsdb snapshot failed, uid:%" PRId64 ", code:%s %s", pOffset->uid, tstrerror(code), id);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts %" PRId64 " table index:%d, total:%d, %s",
|
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
|
||||||
uid, ts, pScanInfo->currentTable, numOfTables, id);
|
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
|
||||||
} else {
|
} else {
|
||||||
tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
|
tsdbSetTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
|
||||||
tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
|
tsdbReaderReset(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
|
||||||
pScanInfo->scanTimes = 0;
|
|
||||||
|
|
||||||
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
|
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
|
||||||
uid, ts, pScanInfo->currentTable, numOfTables, id);
|
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
// restore the key value
|
// restore the key value
|
||||||
|
|
|
@ -779,13 +779,14 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
// if no data, switch to next table and continue scan
|
// if no data, switch to next table and continue scan
|
||||||
pInfo->currentTable++;
|
pInfo->currentTable++;
|
||||||
if (pInfo->currentTable >= numOfTables) {
|
if (pInfo->currentTable >= numOfTables) {
|
||||||
|
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
|
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
|
||||||
tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
|
tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
|
||||||
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables,
|
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables,
|
||||||
pInfo->currentTable, numOfTables, pTaskInfo->id.str);
|
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
|
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
|
||||||
pInfo->scanTimes = 0;
|
pInfo->scanTimes = 0;
|
||||||
|
|
|
@ -13,11 +13,11 @@ from util.dnodes import *
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
hostname = socket.gethostname()
|
hostname = socket.gethostname()
|
||||||
#rpcDebugFlagVal = '143'
|
rpcDebugFlagVal = '143'
|
||||||
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||||
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||||
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||||
#print ("===================: ", updatecfgDict)
|
#print ("===================: ", updatecfgDict)
|
||||||
|
|
||||||
def init(self, conn, logSql, replicaVar=1):
|
def init(self, conn, logSql, replicaVar=1):
|
||||||
|
|
|
@ -16,6 +16,8 @@ sys.path.append("./7-tmq")
|
||||||
from tmqCommon import *
|
from tmqCommon import *
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
|
updatecfgDict = {"tsdbDebugFlag":135}
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.vgroups = 4
|
self.vgroups = 4
|
||||||
self.ctbNum = 10
|
self.ctbNum = 10
|
||||||
|
|
Loading…
Reference in New Issue