[td-1446]
This commit is contained in:
parent
9d1619c3b9
commit
11cc33c575
|
@ -286,11 +286,11 @@ void taos_close(TAOS *taos) {
|
|||
assert(ref >= 0);
|
||||
|
||||
if (ref > 0) {
|
||||
tscDebug("%p %d remain sqlObjs, do not close dnodeConn:%p", pObj, ref, pObj->pDnodeConn);
|
||||
tscDebug("%p %d remain sqlObjs, not free tscObj and dnodeConn:%p", pObj, ref, pObj->pDnodeConn);
|
||||
return;
|
||||
}
|
||||
|
||||
tscDebug("%p all sqlObj are freed, free tscObj, dnodeConn:%p", pObj, pObj->pDnodeConn);
|
||||
tscDebug("%p all sqlObj are freed, free tscObj and close dnodeConn:%p", pObj, pObj->pDnodeConn);
|
||||
tscCloseTscObj(pObj);
|
||||
}
|
||||
|
||||
|
|
|
@ -389,9 +389,9 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
|||
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", %p",
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p",
|
||||
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pMem->keyFirst, pMem->keyLast,
|
||||
pCheckInfo->lastKey, pHandle->qinfo);
|
||||
pCheckInfo->lastKey, pMem->numOfRows, pHandle->qinfo);
|
||||
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
assert(pCheckInfo->lastKey <= key);
|
||||
|
@ -411,9 +411,9 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
|||
SDataRow row = *(SDataRow *)SL_GET_NODE_DATA(node);
|
||||
TSKEY key = dataRowKey(row); // first timestamp in buffer
|
||||
tsdbDebug("%p uid:%" PRId64 ", tid:%d check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", %p",
|
||||
"-%" PRId64 ", lastKey:%" PRId64 ", numOfRows:%"PRId64", %p",
|
||||
pHandle, pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, key, order, pIMem->keyFirst, pIMem->keyLast,
|
||||
pCheckInfo->lastKey, pHandle->qinfo);
|
||||
pCheckInfo->lastKey, pIMem->numOfRows, pHandle->qinfo);
|
||||
|
||||
if (ASCENDING_TRAVERSE(order)) {
|
||||
assert(pCheckInfo->lastKey <= key);
|
||||
|
@ -735,6 +735,7 @@ static int32_t doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* p
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo);
|
||||
static int32_t doCopyRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, int32_t start, int32_t end);
|
||||
static void moveDataToFront(STsdbQueryHandle* pQueryHandle, int32_t numOfRows, int32_t numOfCols);
|
||||
static void doCheckGeneratedBlockRange(STsdbQueryHandle* pQueryHandle);
|
||||
|
@ -791,9 +792,10 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
|
|||
* Here the buffer is not enough, so only part of file block can be loaded into memory buffer
|
||||
*/
|
||||
assert(pQueryHandle->outputCapacity >= binfo.rows);
|
||||
int32_t endPos = getEndPosInDataBlock(pQueryHandle, &binfo);
|
||||
|
||||
if ((cur->pos == 0 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
(cur->pos == (binfo.rows - 1) && (!ASCENDING_TRAVERSE(pQueryHandle->order)))) {
|
||||
if ((cur->pos == 0 && endPos == binfo.rows -1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
(cur->pos == (binfo.rows - 1) && endPos == 0 && (!ASCENDING_TRAVERSE(pQueryHandle->order)))) {
|
||||
pQueryHandle->realNumOfRows = binfo.rows;
|
||||
|
||||
cur->rows = binfo.rows;
|
||||
|
@ -809,7 +811,6 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBloc
|
|||
cur->pos = -1;
|
||||
}
|
||||
} else { // partially copy to dest buffer
|
||||
int32_t endPos = ASCENDING_TRAVERSE(pQueryHandle->order)? (binfo.rows - 1): 0;
|
||||
copyAllRemainRowsFromFileBlock(pQueryHandle, pCheckInfo, &binfo, endPos);
|
||||
cur->mixBlock = true;
|
||||
}
|
||||
|
@ -1204,6 +1205,29 @@ static void copyAllRemainRowsFromFileBlock(STsdbQueryHandle* pQueryHandle, STabl
|
|||
cur->win.ekey, cur->rows, pQueryHandle->qinfo);
|
||||
}
|
||||
|
||||
int32_t getEndPosInDataBlock(STsdbQueryHandle* pQueryHandle, SDataBlockInfo* pBlockInfo) {
|
||||
// NOTE: reverse the order to find the end position in data block
|
||||
int32_t endPos = -1;
|
||||
int32_t order = ASCENDING_TRAVERSE(pQueryHandle->order)? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
|
||||
|
||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||
SDataCols* pCols = pQueryHandle->rhelper.pDataCols[0];
|
||||
|
||||
if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey >= pBlockInfo->window.ekey) {
|
||||
endPos = pBlockInfo->rows - 1;
|
||||
cur->mixBlock = (cur->pos != 0);
|
||||
} else if (!ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey <= pBlockInfo->window.skey) {
|
||||
endPos = 0;
|
||||
cur->mixBlock = (cur->pos != pBlockInfo->rows - 1);
|
||||
} else {
|
||||
assert(pCols->numOfRows > 0);
|
||||
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
|
||||
cur->mixBlock = true;
|
||||
}
|
||||
|
||||
return endPos;
|
||||
}
|
||||
|
||||
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
||||
// be included in the query time window will be discarded
|
||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||
|
@ -1225,19 +1249,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
|
||||
|
||||
STable* pTable = pCheckInfo->pTableObj;
|
||||
int32_t endPos = cur->pos;
|
||||
|
||||
if (ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
|
||||
endPos = blockInfo.rows - 1;
|
||||
cur->mixBlock = (cur->pos != 0);
|
||||
} else if (!ASCENDING_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
|
||||
endPos = 0;
|
||||
cur->mixBlock = (cur->pos != blockInfo.rows - 1);
|
||||
} else {
|
||||
assert(pCols->numOfRows > 0);
|
||||
endPos = doBinarySearchKey(pCols->cols[0].pData, pCols->numOfRows, pQueryHandle->window.ekey, order);
|
||||
cur->mixBlock = true;
|
||||
}
|
||||
int32_t endPos = getEndPosInDataBlock(pQueryHandle, &blockInfo);
|
||||
|
||||
tsdbDebug("%p uid:%" PRIu64",tid:%d start merge data block, file block range:%"PRIu64"-%"PRIu64" rows:%d, start:%d,"
|
||||
"end:%d, %p",
|
||||
|
@ -1339,8 +1351,8 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
|||
}
|
||||
|
||||
cur->blockCompleted =
|
||||
(((pos >= endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
((pos <= endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
|
||||
(((pos > endPos || cur->lastKey > pQueryHandle->window.ekey) && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||
((pos < endPos || cur->lastKey < pQueryHandle->window.ekey) && !ASCENDING_TRAVERSE(pQueryHandle->order)));
|
||||
|
||||
if (!ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||
SWAP(cur->win.skey, cur->win.ekey, TSKEY);
|
||||
|
|
|
@ -105,7 +105,6 @@ run general/parser/timestamp.sim
|
|||
sleep 2000
|
||||
run general/parser/sliding.sim
|
||||
|
||||
|
||||
#sleep 2000
|
||||
#run general/parser/repeatStream.sim
|
||||
#sleep 2000
|
||||
|
|
|
@ -5,7 +5,7 @@ system sh/cfg.sh -n dnode1 -c walLevel -v 0
|
|||
system sh/cfg.sh -n dnode1 -c maxtablespervnode -v 200
|
||||
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sleep 3000
|
||||
sleep 1000
|
||||
sql connect
|
||||
|
||||
$dbPrefix = tb_db
|
||||
|
@ -25,7 +25,7 @@ $stb = $stbPrefix . $i
|
|||
|
||||
sql drop database $db -x step1
|
||||
step1:
|
||||
sql create database $db cache 16
|
||||
sql create database $db cache 16 maxrows 4096 keep 36500
|
||||
print ====== create tables
|
||||
sql use $db
|
||||
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int)
|
||||
|
@ -132,15 +132,15 @@ sleep 5000
|
|||
system sh/exec.sh -n dnode1 -s start
|
||||
print ================== server restart completed
|
||||
sql connect
|
||||
sleep 3000
|
||||
sleep 1000
|
||||
|
||||
sql select count(*) from t1.test where ts>10000 and ts<90000 interval(5000a)
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =========>td-1308
|
||||
sql create database db;
|
||||
print ==============>td-1308
|
||||
sql create database db keep 36500;
|
||||
sql use db;
|
||||
|
||||
sql create table stb (ts timestamp, c1 int, c2 binary(10)) tags(t1 binary(10));
|
||||
|
@ -158,4 +158,45 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
print =======================>td-1446
|
||||
sql create table t(ts timestamp, k int)
|
||||
$ts = 6000
|
||||
while $ts < 7000
|
||||
sql insert into t values ( $ts , $ts )
|
||||
$ts = $ts + 1
|
||||
endw
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
sleep 1000
|
||||
sql use db;
|
||||
|
||||
$ts = 1000
|
||||
while $ts < 5096
|
||||
sql insert into t values ( $ts , $ts )
|
||||
$ts = $ts + 1
|
||||
endw
|
||||
|
||||
sql select * from t where ts < 6500
|
||||
if $rows != 4596 then
|
||||
print expect 4596, actual: $rows
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from t where ts < 7000
|
||||
if $rows != 5096 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from t where ts <= 6000
|
||||
if $rows != 4097 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select * from t where ts <= 6001
|
||||
if $rows != 4098 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue