Merge pull request #3931 from taosdata/bugfix/td-1471
[TD-1471]<fix>: empty result was returned if a database was dropped during query
This commit is contained in:
commit
55385f56d7
|
@ -192,8 +192,10 @@ class TDengineCursor(object):
|
||||||
buffer = [[] for i in range(len(self._fields))]
|
buffer = [[] for i in range(len(self._fields))]
|
||||||
self._rowcount = 0
|
self._rowcount = 0
|
||||||
while True:
|
while True:
|
||||||
block, num_of_fields = CTaosInterface.fetchBlock(
|
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
|
||||||
self._result, self._fields)
|
errno = CTaosInterface.libtaos.taos_errno(self._result)
|
||||||
|
if errno != 0:
|
||||||
|
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
|
||||||
if num_of_fields == 0:
|
if num_of_fields == 0:
|
||||||
break
|
break
|
||||||
self._rowcount += num_of_fields
|
self._rowcount += num_of_fields
|
||||||
|
|
|
@ -207,8 +207,10 @@ class TDengineCursor(object):
|
||||||
buffer = [[] for i in range(len(self._fields))]
|
buffer = [[] for i in range(len(self._fields))]
|
||||||
self._rowcount = 0
|
self._rowcount = 0
|
||||||
while True:
|
while True:
|
||||||
block, num_of_fields = CTaosInterface.fetchBlock(
|
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
|
||||||
self._result, self._fields)
|
errno = CTaosInterface.libtaos.taos_errno(self._result)
|
||||||
|
if errno != 0:
|
||||||
|
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
|
||||||
if num_of_fields == 0:
|
if num_of_fields == 0:
|
||||||
break
|
break
|
||||||
self._rowcount += num_of_fields
|
self._rowcount += num_of_fields
|
||||||
|
|
|
@ -142,6 +142,9 @@ class TDengineCursor(object):
|
||||||
self._rowcount = 0
|
self._rowcount = 0
|
||||||
while True:
|
while True:
|
||||||
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
|
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
|
||||||
|
errno = CTaosInterface.libtaos.taos_errno(self._result)
|
||||||
|
if errno != 0:
|
||||||
|
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
|
||||||
if num_of_fields == 0: break
|
if num_of_fields == 0: break
|
||||||
self._rowcount += num_of_fields
|
self._rowcount += num_of_fields
|
||||||
for i in range(len(self._fields)):
|
for i in range(len(self._fields)):
|
||||||
|
|
|
@ -142,6 +142,9 @@ class TDengineCursor(object):
|
||||||
self._rowcount = 0
|
self._rowcount = 0
|
||||||
while True:
|
while True:
|
||||||
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
|
block, num_of_fields = CTaosInterface.fetchBlock(self._result, self._fields)
|
||||||
|
errno = CTaosInterface.libtaos.taos_errno(self._result)
|
||||||
|
if errno != 0:
|
||||||
|
raise ProgrammingError(CTaosInterface.errStr(self._result), errno)
|
||||||
if num_of_fields == 0: break
|
if num_of_fields == 0: break
|
||||||
self._rowcount += num_of_fields
|
self._rowcount += num_of_fields
|
||||||
for i in range(len(self._fields)):
|
for i in range(len(self._fields)):
|
||||||
|
|
|
@ -7101,6 +7101,7 @@ void qCleanupQueryMgmt(void* pQMgmt) {
|
||||||
|
|
||||||
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
if (pMgmt == NULL) {
|
if (pMgmt == NULL) {
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7109,6 +7110,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
SQueryMgmt *pQueryMgmt = pMgmt;
|
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||||
if (pQueryMgmt->qinfoPool == NULL) {
|
if (pQueryMgmt->qinfoPool == NULL) {
|
||||||
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
|
qError("QInfo:%p failed to add qhandle into qMgmt, since qMgmt is closed", (void *)qInfo);
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -7116,6 +7118,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
if (pQueryMgmt->closed) {
|
if (pQueryMgmt->closed) {
|
||||||
// pthread_mutex_unlock(&pQueryMgmt->lock);
|
// pthread_mutex_unlock(&pQueryMgmt->lock);
|
||||||
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
|
qError("QInfo:%p failed to add qhandle into cache, since qMgmt is colsing", (void *)qInfo);
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
return NULL;
|
return NULL;
|
||||||
} else {
|
} else {
|
||||||
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
|
TSDB_CACHE_PTR_TYPE handleVal = (TSDB_CACHE_PTR_TYPE) qInfo;
|
||||||
|
|
|
@ -191,10 +191,12 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
|
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t)pQInfo);
|
||||||
if (handle == NULL) { // failed to register qhandle, todo add error test case
|
if (handle == NULL) { // failed to register qhandle, todo add error test case
|
||||||
|
pRsp->code = terrno;
|
||||||
|
terrno = 0;
|
||||||
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
||||||
tstrerror(pRsp->code));
|
tstrerror(pRsp->code));
|
||||||
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
|
||||||
qDestroyQueryInfo(pQInfo); // destroy it directly
|
qDestroyQueryInfo(pQInfo); // destroy it directly
|
||||||
|
return pRsp->code;
|
||||||
} else {
|
} else {
|
||||||
assert(*handle == pQInfo);
|
assert(*handle == pQInfo);
|
||||||
pRsp->qhandle = htobe64((uint64_t)pQInfo);
|
pRsp->qhandle = htobe64((uint64_t)pQInfo);
|
||||||
|
|
|
@ -149,6 +149,7 @@ python3 ./test.py -f query/queryNullValueTest.py
|
||||||
python3 ./test.py -f query/queryInsertValue.py
|
python3 ./test.py -f query/queryInsertValue.py
|
||||||
python3 ./test.py -f query/queryConnection.py
|
python3 ./test.py -f query/queryConnection.py
|
||||||
python3 ./test.py -f query/natualInterval.py
|
python3 ./test.py -f query/natualInterval.py
|
||||||
|
python3 ./test.py -f query/bug1471.py
|
||||||
|
|
||||||
#stream
|
#stream
|
||||||
python3 ./test.py -f stream/metric_1.py
|
python3 ./test.py -f stream/metric_1.py
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
###################################################################
|
||||||
|
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||||
|
# All rights reserved.
|
||||||
|
#
|
||||||
|
# This file is proprietary and confidential to TAOS Technologies.
|
||||||
|
# No part of this file may be reproduced, stored, transmitted,
|
||||||
|
# disclosed or used in any form or by any means other than as
|
||||||
|
# expressly provided by the written permission from Jianhui Tao
|
||||||
|
#
|
||||||
|
###################################################################
|
||||||
|
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
import sys
|
||||||
|
import taos
|
||||||
|
from util.log import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.sql import *
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
|
class myThread(threading.Thread):
|
||||||
|
def __init__(self, conn):
|
||||||
|
threading.Thread.__init__(self)
|
||||||
|
self.event = threading.Event()
|
||||||
|
self.conn = taos.connect(conn._host, port=conn._port, config=conn._config)
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
self.event.wait()
|
||||||
|
cur.execute("drop database db")
|
||||||
|
cur.close()
|
||||||
|
self.conn.close()
|
||||||
|
|
||||||
|
|
||||||
|
class TDTestCase:
|
||||||
|
def init(self, conn, logSql):
|
||||||
|
tdLog.debug("start to execute %s" % __file__)
|
||||||
|
tdSql.init(conn.cursor())
|
||||||
|
|
||||||
|
def run(self):
|
||||||
|
for i in range(50):
|
||||||
|
print("round", i)
|
||||||
|
thread = myThread(tdSql.cursor._connection)
|
||||||
|
thread.start()
|
||||||
|
|
||||||
|
tdSql.execute('reset query cache')
|
||||||
|
tdSql.execute('drop database if exists db')
|
||||||
|
tdSql.execute('create database db')
|
||||||
|
tdSql.execute('use db')
|
||||||
|
tdSql.execute("create table car (ts timestamp, s int)")
|
||||||
|
tdSql.execute("insert into car values('2020-10-19 17:00:00', 123)")
|
||||||
|
|
||||||
|
thread.event.set()
|
||||||
|
try:
|
||||||
|
tdSql.query("select s from car where ts = '2020-10-19 17:00:00'")
|
||||||
|
except Exception as e:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
tdSql.checkData(0, 0, 123)
|
||||||
|
|
||||||
|
thread.join()
|
||||||
|
time.sleep(0.2)
|
||||||
|
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
tdSql.close()
|
||||||
|
tdLog.success("%s successfully executed" % __file__)
|
||||||
|
|
||||||
|
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
Loading…
Reference in New Issue