fix some possible query errors
This commit is contained in:
parent
335d42cf49
commit
f192397c0f
|
@ -382,9 +382,9 @@ int vnodeCreateMeterObj(SMeterObj *pNew, SConnSec *pSec);
|
|||
|
||||
int vnodeRemoveMeterObj(int vnode, int sid);
|
||||
|
||||
int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints);
|
||||
int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now);
|
||||
|
||||
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints);
|
||||
int vnodeImportPoints(SMeterObj *pObj, char *cont, int contLen, char source, void *, int sversion, int *numOfPoints, TSKEY now);
|
||||
|
||||
int vnodeInsertBufferedPoints(int vnode);
|
||||
|
||||
|
@ -537,7 +537,7 @@ void vnodeRemoveCommitLog(int vnode);
|
|||
|
||||
int vnodeWriteToCommitLog(SMeterObj *pObj, char action, char *cont, int contLen, int sversion);
|
||||
|
||||
extern int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *);
|
||||
extern int (*vnodeProcessAction[])(SMeterObj *, char *, int, char, void *, int, int *, TSKEY);
|
||||
|
||||
extern int (*pCompFunc[])(const char *const input, int inputSize, const int elements, char *const output,
|
||||
int outputSize, char algorithm, char *const buffer, int bufferSize);
|
||||
|
|
|
@ -366,6 +366,8 @@ int mgmtAlterDb(SAcctObj *pAcct, SAlterDbMsg *pAlter) {
|
|||
if (pAlter->daysToKeep > 0) {
|
||||
mTrace("db:%s daysToKeep:%d change to %d", pDb->name, pDb->cfg.daysToKeep, pAlter->daysToKeep);
|
||||
pDb->cfg.daysToKeep = pAlter->daysToKeep;
|
||||
} else {
|
||||
return TSDB_CODE_INVALID_OPTION;
|
||||
}
|
||||
|
||||
if (sdbUpdateRow(dbSdb, pDb, tsDbUpdateSize, 1) < 0) {
|
||||
|
|
|
@ -1052,7 +1052,8 @@ static void mgmtRetrieveMetersFromIDs(tQueryResultset *pRes, char *queryStr, cha
|
|||
}
|
||||
|
||||
/* queried meter not belongs to this metric, ignore */
|
||||
if (mgmtGetMeter(pMeterObj->pTagData)->uid != pMetric->uid) {
|
||||
if (mgmtGetMeter(pMeterObj->pTagData)->uid != pMetric->uid ||
|
||||
strncmp(pMetric->meterId, pMeterObj->pTagData, TSDB_METER_ID_LEN) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
|
|
@ -642,14 +642,19 @@ static int32_t loadDataBlockIntoMem(SCompBlock *pBlock, SField **pField, SQueryR
|
|||
|
||||
int32_t ret = 0;
|
||||
|
||||
/* the first round always be 1, the secondary round is determined by queried
|
||||
* function */
|
||||
// the first round always be 1, the secondary round is determined by queried function
|
||||
int32_t round = pRuntimeEnv->scanFlag;
|
||||
|
||||
while (j < pBlock->numOfCols && i < pQuery->numOfCols) {
|
||||
if ((*pField)[j].colId < pQuery->colList[i].data.colId) {
|
||||
++j;
|
||||
} else if ((*pField)[j].colId == pQuery->colList[i].data.colId) {
|
||||
// add additional check for data type
|
||||
if ((*pField)[j].type != pQuery->colList[i].data.type) {
|
||||
ret = TSDB_CODE_INVALID_QUERY_MSG;
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* during supplementary scan:
|
||||
* 1. primary ts column (always loaded)
|
||||
|
@ -1919,13 +1924,12 @@ static bool cacheBoundaryCheck(SQuery *pQuery, SMeterObj *pMeterObj) {
|
|||
TSKEY min, max;
|
||||
getQueryRange(pQuery, &min, &max);
|
||||
|
||||
// the query time range is earlier than the first element in cache. abort
|
||||
if (max < keyFirst) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (min > keyLast) {
|
||||
/*
|
||||
* The query time range is earlier than the first element or later than the last elements in cache.
|
||||
* If the query window happens to overlap with the time range of disk files but not data in cache,
|
||||
* the flag needs to be cleared. Otherwise, this flag will cause error in following processing.
|
||||
*/
|
||||
if (max < keyFirst || min > keyLast) {
|
||||
setQueryStatus(pQuery, QUERY_NO_DATA_TO_CHECK);
|
||||
return false;
|
||||
}
|
||||
|
@ -2072,6 +2076,8 @@ void vnodeCheckIfDataExists(SQueryRuntimeEnv *pRuntimeEnv, SMeterObj *pMeterObj,
|
|||
|
||||
*dataInCache = hasDataInCache(pRuntimeEnv, pMeterObj);
|
||||
*dataInDisk = hasDataInDisk(pQuery, pMeterObj);
|
||||
|
||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||
}
|
||||
|
||||
static void doGetAlignedIntervalQueryRangeImpl(SQuery *pQuery, int64_t qualifiedKey, int64_t keyFirst, int64_t keyLast,
|
||||
|
@ -2685,23 +2691,75 @@ static void vnodeOpenAllFiles(SQInfo *pQInfo, int32_t vnodeId) {
|
|||
qsort(pRuntimeEnv->pHeaderFiles, (size_t)pRuntimeEnv->numOfFiles, sizeof(SQueryFileInfo), file_order_comparator);
|
||||
}
|
||||
|
||||
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock) {
|
||||
static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo* pBlockInfo, void *pBlock) {
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
int32_t newPos = pQuery->pos;
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
pQuery->pos += pQuery->limit.offset;
|
||||
if (newPos + pQuery->limit.offset > pBlockInfo->size) {
|
||||
newPos = pBlockInfo->size - 1;
|
||||
} else {
|
||||
pQuery->pos -= pQuery->limit.offset;
|
||||
newPos += pQuery->limit.offset;
|
||||
}
|
||||
} else {
|
||||
if (newPos < pQuery->limit.offset) {
|
||||
newPos = 0;
|
||||
} else {
|
||||
newPos -= pQuery->limit.offset;
|
||||
}
|
||||
}
|
||||
|
||||
TSKEY newKey = 0;
|
||||
if (IS_DISK_DATA_BLOCK(pQuery)) {
|
||||
pQuery->skey = getTimestampInDiskBlock(pRuntimeEnv, pQuery->pos);
|
||||
newKey = getTimestampInDiskBlock(pRuntimeEnv, newPos);
|
||||
} else {
|
||||
pQuery->skey = getTimestampInCacheBlock(pBlock, pQuery->pos);
|
||||
newKey = getTimestampInCacheBlock(pBlock, newPos);
|
||||
}
|
||||
|
||||
/*
|
||||
* The actually qualified points that can be skipped needs to be calculated if query is
|
||||
* done in current data block
|
||||
*/
|
||||
if ((newKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(newKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||
|
||||
// update the pQuery->limit.offset value, and pQuery->pos value
|
||||
TSKEY* keys = NULL;
|
||||
if (IS_DISK_DATA_BLOCK(pQuery)) {
|
||||
keys = (TSKEY *) pRuntimeEnv->primaryColBuffer->data;
|
||||
} else {
|
||||
keys = (TSKEY *) (((SCacheBlock *)pBlock)->offset[0]);
|
||||
}
|
||||
|
||||
int32_t i = 0;
|
||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
for(i = pQuery->pos; i < pBlockInfo->size && pQuery->limit.offset > 0; ++i) {
|
||||
if (keys[i] <= pQuery->ekey) {
|
||||
pQuery->limit.offset -= 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
for(i = pQuery->pos; i >= 0 && pQuery->limit.offset > 0; --i) {
|
||||
if (keys[i] >= pQuery->ekey) {
|
||||
pQuery->limit.offset -= 1;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pQuery->pos = i;
|
||||
} else {
|
||||
pQuery->skey = newKey;
|
||||
pQuery->lastKey = pQuery->skey;
|
||||
pQuery->limit.offset = 0;
|
||||
|
||||
// update the offset value
|
||||
pQuery->limit.offset -= abs(newPos - pQuery->pos);
|
||||
pQuery->pos = newPos;
|
||||
}
|
||||
}
|
||||
|
||||
// todo ignore the avg/sum/min/max/count/stddev/top/bottom functions, of which
|
||||
|
@ -2816,8 +2874,9 @@ static int32_t doSkipDataBlock(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
|
||||
int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1;
|
||||
|
||||
if (pQuery->limit.offset < maxReads) { // start position in current block
|
||||
updateOffsetVal(pRuntimeEnv, pBlock);
|
||||
if (pQuery->limit.offset < maxReads || (pQuery->ekey <= blockInfo.keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(pQuery->ekey >= blockInfo.keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { // start position in current block
|
||||
updateOffsetVal(pRuntimeEnv, &blockInfo, pBlock);
|
||||
break;
|
||||
} else {
|
||||
pQuery->limit.offset -= maxReads;
|
||||
|
@ -2843,8 +2902,9 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
SBlockInfo blockInfo = getBlockBasicInfo(pBlock, blockType);
|
||||
int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1;
|
||||
|
||||
if (pQuery->limit.offset < maxReads) { // start position in current block
|
||||
updateOffsetVal(pRuntimeEnv, pBlock);
|
||||
if (pQuery->limit.offset < maxReads || (pQuery->ekey <= blockInfo.keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||
(pQuery->ekey >= blockInfo.keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { // start position in current block
|
||||
updateOffsetVal(pRuntimeEnv, &blockInfo, pBlock);
|
||||
} else {
|
||||
pQuery->limit.offset -= maxReads;
|
||||
doSkipDataBlock(pRuntimeEnv);
|
||||
|
|
|
@ -404,7 +404,8 @@ static bool multimeterMultioutputHelper(SQInfo *pQInfo, bool *dataInDisk, bool *
|
|||
|
||||
vnodeCheckIfDataExists(pRuntimeEnv, pMeterObj, dataInDisk, dataInCache);
|
||||
|
||||
if (pQuery->lastKey > pMeterObj->lastKey && QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
// data in file or cache is not qualified for the query. abort
|
||||
if (!(dataInCache || dataInDisk)) {
|
||||
dTrace("QInfo:%p vid:%d sid:%d meterId:%s, qrange:%lld-%lld, nores, %p", pQInfo, pMeterObj->vnode, pMeterObj->sid,
|
||||
pMeterObj->meterId, pQuery->skey, pQuery->ekey, pQuery);
|
||||
return false;
|
||||
|
@ -578,7 +579,7 @@ static void vnodeMultiMeterMultiOutputProcessor(SQInfo *pQInfo) {
|
|||
if (pQuery->numOfFilterCols == 0 && pQuery->limit.offset > 0) {
|
||||
forwardQueryStartPosition(pRuntimeEnv);
|
||||
|
||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) {
|
||||
pQuery->skey = pSupporter->rawSKey;
|
||||
pQuery->ekey = pSupporter->rawEKey;
|
||||
continue;
|
||||
|
|
Loading…
Reference in New Issue