Merge branch 'feature/3_liaohj' of github.com:taosdata/tdengine into feature/3_liaohj
This commit is contained in:
commit
ffcbc8cfc0
|
@ -308,7 +308,12 @@ int32_t tqNextBlock(STqReader* pReader, SFetchRet* ret) {
|
||||||
while (1) {
|
while (1) {
|
||||||
if (!fromProcessedMsg) {
|
if (!fromProcessedMsg) {
|
||||||
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
if (walNextValidMsg(pReader->pWalReader) < 0) {
|
||||||
pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
|
// pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
|
||||||
|
if(pReader->pWalReader->curInvalid == 0){
|
||||||
|
pReader->ver = pReader->pWalReader->curVersion - pReader->pWalReader->curStopped;
|
||||||
|
}else{
|
||||||
|
pReader->ver = walGetLastVer(pReader->pWalReader->pWal);
|
||||||
|
}
|
||||||
ret->offset.type = TMQ_OFFSET__LOG;
|
ret->offset.type = TMQ_OFFSET__LOG;
|
||||||
|
|
||||||
ret->offset.version = pReader->ver;
|
ret->offset.version = pReader->ver;
|
||||||
|
|
|
@ -1609,6 +1609,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
if (pResult && pResult->info.rows > 0) {
|
if (pResult && pResult->info.rows > 0) {
|
||||||
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
|
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64, pResult->info.rows,
|
||||||
pResult->info.window.skey, pResult->info.window.ekey);
|
pResult->info.window.skey, pResult->info.window.ekey);
|
||||||
|
qDebug("queue scan tsdb return %d rows min:%" PRId64 " max:%" PRId64 " wal curVersion:%" PRId64, pResult->info.rows,
|
||||||
|
pResult->info.window.skey, pResult->info.window.ekey, pInfo->tqReader->pWalReader->curVersion);
|
||||||
pTaskInfo->streamInfo.returned = 1;
|
pTaskInfo->streamInfo.returned = 1;
|
||||||
return pResult;
|
return pResult;
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue