[td-225]fix bug found by regression test.
This commit is contained in:
parent
51dc9fa425
commit
25d2821d78
|
@ -4145,8 +4145,8 @@ static SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfOutput, in
|
||||||
return pFillCol;
|
return pFillCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner,
|
int32_t doInitQInfo(SQInfo* pQInfo, STSBuf* pTsBuf, void* tsdb, void* sourceOptr, int32_t tbScanner, SArray* pOperator,
|
||||||
SArray* pOperator, void* param) {
|
void* param) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
|
|
||||||
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
SQueryAttr *pQueryAttr = pQInfo->runtimeEnv.pQueryAttr;
|
||||||
|
|
|
@ -343,6 +343,15 @@ static SArray* createCheckInfoFromCheckInfo(STableCheckInfo* pCheckInfo, TSKEY s
|
||||||
return pNew;
|
return pNew;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool emptyQueryTimewindow(STsdbQueryHandle* pQueryHandle) {
|
||||||
|
assert(pQueryHandle != NULL);
|
||||||
|
|
||||||
|
STimeWindow* w = &pQueryHandle->window;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pQueryHandle->order);
|
||||||
|
|
||||||
|
return ((asc && w->skey > w->ekey) || (!asc && w->ekey > w->skey));
|
||||||
|
}
|
||||||
|
|
||||||
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
|
// Update the query time window according to the data time to live(TTL) information, in order to avoid to return
|
||||||
// the expired data to client, even it is queried already.
|
// the expired data to client, even it is queried already.
|
||||||
static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) {
|
static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) {
|
||||||
|
@ -355,23 +364,27 @@ static int64_t getEarliestValidTimestamp(STsdbRepo* pTsdb) {
|
||||||
static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) {
|
static void setQueryTimewindow(STsdbQueryHandle* pQueryHandle, STsdbQueryCond* pCond) {
|
||||||
pQueryHandle->window = pCond->twindow;
|
pQueryHandle->window = pCond->twindow;
|
||||||
|
|
||||||
|
bool updateTs = false;
|
||||||
int64_t startTs = getEarliestValidTimestamp(pQueryHandle->pTsdb);
|
int64_t startTs = getEarliestValidTimestamp(pQueryHandle->pTsdb);
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
||||||
if (startTs > pQueryHandle->window.skey) {
|
if (startTs > pQueryHandle->window.skey) {
|
||||||
pQueryHandle->window.skey = startTs;
|
pQueryHandle->window.skey = startTs;
|
||||||
|
pCond->twindow.skey = startTs;
|
||||||
|
updateTs = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pQueryHandle->window.skey <= pQueryHandle->window.ekey);
|
|
||||||
} else {
|
} else {
|
||||||
if (startTs > pQueryHandle->window.ekey) {
|
if (startTs > pQueryHandle->window.ekey) {
|
||||||
pQueryHandle->window.ekey = startTs;
|
pQueryHandle->window.ekey = startTs;
|
||||||
|
pCond->twindow.ekey = startTs;
|
||||||
|
updateTs = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(pQueryHandle->window.skey >= pQueryHandle->window.ekey);
|
if (updateTs) {
|
||||||
|
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64
|
||||||
|
", 0x%" PRIx64, pQueryHandle, pCond->twindow.skey, pCond->twindow.ekey, pQueryHandle->window.skey,
|
||||||
|
pQueryHandle->window.ekey, pQueryHandle->qId);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("%p update the query time window, old:%"PRId64" - %"PRId64", new:%"PRId64" - %"PRId64 ", 0x%"PRIx64,
|
|
||||||
pQueryHandle, pCond->twindow.skey, pCond->twindow.ekey, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) {
|
static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pCond, uint64_t qId, SMemRef* pMemRef) {
|
||||||
|
@ -456,6 +469,9 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
|
||||||
|
|
||||||
TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) {
|
TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STableGroupInfo* groupList, uint64_t qId, SMemRef* pRef) {
|
||||||
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef);
|
STsdbQueryHandle* pQueryHandle = tsdbQueryTablesImpl(tsdb, pCond, qId, pRef);
|
||||||
|
if (emptyQueryTimewindow(pQueryHandle)) {
|
||||||
|
return (TsdbQueryHandleT*) pQueryHandle;
|
||||||
|
}
|
||||||
|
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
STsdbMeta* pMeta = tsdbGetMeta(tsdb);
|
||||||
assert(pMeta != NULL);
|
assert(pMeta != NULL);
|
||||||
|
@ -479,6 +495,15 @@ TsdbQueryHandleT* tsdbQueryTables(STsdbRepo* tsdb, STsdbQueryCond* pCond, STable
|
||||||
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
|
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond) {
|
||||||
STsdbQueryHandle* pQueryHandle = queryHandle;
|
STsdbQueryHandle* pQueryHandle = queryHandle;
|
||||||
|
|
||||||
|
if (emptyQueryTimewindow(pQueryHandle)) {
|
||||||
|
if (pCond->order != pQueryHandle->order) {
|
||||||
|
pQueryHandle->order = pCond->order;
|
||||||
|
SWAP(pQueryHandle->window.skey, pQueryHandle->window.ekey, int64_t);
|
||||||
|
}
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
pQueryHandle->order = pCond->order;
|
pQueryHandle->order = pCond->order;
|
||||||
pQueryHandle->window = pCond->twindow;
|
pQueryHandle->window = pCond->twindow;
|
||||||
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||||
|
@ -1204,8 +1229,9 @@ static int32_t handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SBlock* p
|
||||||
static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
|
static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo, bool* exists) {
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pQueryHandle->order);
|
||||||
|
|
||||||
if (ASCENDING_TRAVERSE(pQueryHandle->order)) {
|
if (asc) {
|
||||||
// query ended in/started from current block
|
// query ended in/started from current block
|
||||||
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
|
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
|
||||||
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
|
if ((code = doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo, cur->slot)) != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -1226,7 +1252,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
|
||||||
assert(pCheckInfo->lastKey <= pBlock->keyLast);
|
assert(pCheckInfo->lastKey <= pBlock->keyLast);
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else { // the whole block is loaded in to buffer
|
} else { // the whole block is loaded in to buffer
|
||||||
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows - 1);
|
cur->pos = asc? 0:(pBlock->numOfRows - 1);
|
||||||
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||||
}
|
}
|
||||||
} else { //desc order, query ended in current block
|
} else { //desc order, query ended in current block
|
||||||
|
@ -1246,7 +1272,7 @@ static int32_t loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SBlock* pBlock,
|
||||||
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
|
assert(pCheckInfo->lastKey >= pBlock->keyFirst);
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else {
|
} else {
|
||||||
cur->pos = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:(pBlock->numOfRows-1);
|
cur->pos = asc? 0:(pBlock->numOfRows-1);
|
||||||
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
code = handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2717,6 +2743,11 @@ static bool loadDataBlockFromTableSeq(STsdbQueryHandle* pQueryHandle) {
|
||||||
bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
|
bool tsdbNextDataBlock(TsdbQueryHandleT pHandle) {
|
||||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
|
||||||
|
|
||||||
|
if (emptyQueryTimewindow(pQueryHandle)) {
|
||||||
|
tsdbDebug("%p query window not overlaps with the data set, no result returned, 0x%"PRIx64, pQueryHandle, pQueryHandle->qId);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t stime = taosGetTimestampUs();
|
int64_t stime = taosGetTimestampUs();
|
||||||
int64_t elapsedTime = stime;
|
int64_t elapsedTime = stime;
|
||||||
|
|
||||||
|
@ -3669,15 +3700,21 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo);
|
|
||||||
pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns);
|
pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns);
|
||||||
|
|
||||||
taosArrayDestroy(pQueryHandle->defaultLoadColumn);
|
taosArrayDestroy(pQueryHandle->defaultLoadColumn);
|
||||||
tfree(pQueryHandle->pDataBlockInfo);
|
tfree(pQueryHandle->pDataBlockInfo);
|
||||||
tfree(pQueryHandle->statis);
|
tfree(pQueryHandle->statis);
|
||||||
|
|
||||||
// todo check error
|
if (!emptyQueryTimewindow(pQueryHandle)) {
|
||||||
tsdbMayUnTakeMemSnapshot(pQueryHandle);
|
tsdbMayUnTakeMemSnapshot(pQueryHandle);
|
||||||
|
} else {
|
||||||
|
assert(pQueryHandle->pTableCheckInfo == NULL);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pQueryHandle->pTableCheckInfo != NULL) {
|
||||||
|
pQueryHandle->pTableCheckInfo = destroyTableCheckInfo(pQueryHandle->pTableCheckInfo);
|
||||||
|
}
|
||||||
|
|
||||||
tsdbDestroyReadH(&pQueryHandle->rhelper);
|
tsdbDestroyReadH(&pQueryHandle->rhelper);
|
||||||
|
|
||||||
|
|
|
@ -193,3 +193,7 @@ endi
|
||||||
if $data04 != 1 then
|
if $data04 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
print ===============>safty check TD-4927
|
||||||
|
sql select first(ts, c1) from sr_stb where ts<1 group by t1;
|
||||||
|
sql select first(ts, c1) from sr_stb where ts>0 and ts<1;
|
Loading…
Reference in New Issue