[td-98] fix time range query bug when data kept in buffer
This commit is contained in:
parent
f5a1ac4b48
commit
2589fc9efb
|
@ -160,7 +160,7 @@ typedef struct SQueryRuntimeEnv {
|
||||||
SQueryCostSummary summary;
|
SQueryCostSummary summary;
|
||||||
bool stableQuery; // super table query or not
|
bool stableQuery; // super table query or not
|
||||||
void* pQueryHandle;
|
void* pQueryHandle;
|
||||||
void* pSubQueryHandle; // another thread for
|
void* pSecQueryHandle; // another thread for
|
||||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
} SQueryRuntimeEnv;
|
} SQueryRuntimeEnv;
|
||||||
|
|
||||||
|
|
|
@ -1552,6 +1552,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
destroyResultBuf(pRuntimeEnv->pResultBuf);
|
destroyResultBuf(pRuntimeEnv->pResultBuf);
|
||||||
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
||||||
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
||||||
|
|
||||||
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
|
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2565,7 +2567,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
|
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
|
||||||
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
|
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
|
||||||
|
|
||||||
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSubQueryHandle;
|
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSecQueryHandle;
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||||
|
|
||||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
|
@ -3557,7 +3559,10 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
taosArrayPush(cols, &pQuery->colList[i]);
|
taosArrayPush(cols, &pQuery->colList[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRuntimeEnv->pSubQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols);
|
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
||||||
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(cols);
|
taosArrayDestroy(cols);
|
||||||
|
|
||||||
status = pQuery->status;
|
status = pQuery->status;
|
||||||
|
|
|
@ -218,9 +218,31 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||||
if (pTable->mem == NULL && pTable->imem == NULL) {
|
if (pTable->mem == NULL && pTable->imem == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||||
|
pTable = pCheckInfo->pTableObj;
|
||||||
|
|
||||||
|
if (pCheckInfo->iter == NULL) {
|
||||||
|
pCheckInfo->iter = tSkipListCreateIter(pTable->mem->pData);
|
||||||
|
if (pCheckInfo->iter == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!tSkipListIterNext(pCheckInfo->iter)) { // buffer is empty
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
|
if (node == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
|
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
|
||||||
|
|
||||||
// all data in mem are checked already.
|
// all data in mem are checked already.
|
||||||
if (pTableCheckInfo->lastKey > pTable->mem->keyLast) {
|
if (pTableCheckInfo->lastKey > pHandle->window.ekey) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -522,13 +544,15 @@ static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
int numOfPoints;
|
int numOfPoints;
|
||||||
TSKEY* keyList;
|
TSKEY* keyList;
|
||||||
|
|
||||||
|
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
||||||
|
|
||||||
if (num <= 0) return -1;
|
if (num <= 0) return -1;
|
||||||
|
|
||||||
keyList = (TSKEY*)pValue;
|
keyList = (TSKEY*)pValue;
|
||||||
firstPos = 0;
|
firstPos = 0;
|
||||||
lastPos = num - 1;
|
lastPos = num - 1;
|
||||||
|
|
||||||
if (order == 0) {
|
if (order == TSDB_ORDER_DESC) {
|
||||||
// find the first position which is smaller than the key
|
// find the first position which is smaller than the key
|
||||||
while (1) {
|
while (1) {
|
||||||
if (key >= keyList[lastPos]) return lastPos;
|
if (key >= keyList[lastPos]) return lastPos;
|
||||||
|
@ -596,8 +620,8 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
||||||
pQueryHandle->realNumOfRows = cur->pos + 1;
|
pQueryHandle->realNumOfRows = cur->pos + 1;
|
||||||
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
|
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
|
||||||
} else {
|
} else {
|
||||||
endPos =
|
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||||
vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order);
|
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order);
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
||||||
if (endPos < cur->pos) {
|
if (endPos < cur->pos) {
|
||||||
|
@ -1042,7 +1066,7 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
|
||||||
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
*skey = INT64_MIN;
|
*skey = INT64_MIN;
|
||||||
|
|
||||||
while (tSkipListIterNext(pIter)) {
|
do/* (1) */{
|
||||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||||
if (node == NULL) break;
|
if (node == NULL) break;
|
||||||
|
|
||||||
|
@ -1063,8 +1087,11 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
if (numOfRows >= maxRowsToRead) break;
|
if (numOfRows >= maxRowsToRead) {
|
||||||
};
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
} while(tSkipListIterNext(pIter));
|
||||||
|
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
@ -1107,10 +1134,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
|
||||||
|
|
||||||
if (pTable->mem != NULL) {
|
if (pTable->mem != NULL) {
|
||||||
// create mem table iterator if it is not created yet
|
// create mem table iterator if it is not created yet
|
||||||
if (pCheckInfo->iter == NULL) {
|
assert(pCheckInfo->iter != NULL);
|
||||||
pCheckInfo->iter = tSkipListCreateIter(pTable->mem->pData);
|
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle);
|
||||||
}
|
|
||||||
rows = tsdbReadRowsFromCache(pCheckInfo->iter, INT64_MAX, 2, &skey, &ekey, pHandle);
|
|
||||||
|
|
||||||
// update the last key value
|
// update the last key value
|
||||||
pCheckInfo->lastKey = ekey + 1;
|
pCheckInfo->lastKey = ekey + 1;
|
||||||
|
|
Loading…
Reference in New Issue