commit
dd7e295269
|
@ -3578,121 +3578,6 @@ void spread_function_finalizer(SQLFunctionCtx *pCtx) {
|
||||||
doFinalizer(pCtx);
|
doFinalizer(pCtx);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* Compare two strings
|
|
||||||
* TSDB_MATCH: Match
|
|
||||||
* TSDB_NOMATCH: No match
|
|
||||||
* TSDB_NOWILDCARDMATCH: No match in spite of having * or % wildcards.
|
|
||||||
* Like matching rules:
|
|
||||||
* '%': Matches zero or more characters
|
|
||||||
* '_': Matches one character
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
int patternMatch(const char *patterStr, const char *str, size_t size, const SPatternCompareInfo *pInfo) {
|
|
||||||
char c, c1;
|
|
||||||
|
|
||||||
int32_t i = 0;
|
|
||||||
int32_t j = 0;
|
|
||||||
|
|
||||||
while ((c = patterStr[i++]) != 0) {
|
|
||||||
if (c == pInfo->matchAll) { /* Match "*" */
|
|
||||||
|
|
||||||
while ((c = patterStr[i++]) == pInfo->matchAll || c == pInfo->matchOne) {
|
|
||||||
if (c == pInfo->matchOne && (j > size || str[j++] == 0)) {
|
|
||||||
// empty string, return not match
|
|
||||||
return TSDB_PATTERN_NOWILDCARDMATCH;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (c == 0) {
|
|
||||||
return TSDB_PATTERN_MATCH; /* "*" at the end of the pattern matches */
|
|
||||||
}
|
|
||||||
|
|
||||||
char next[3] = {toupper(c), tolower(c), 0};
|
|
||||||
while (1) {
|
|
||||||
size_t n = strcspn(str, next);
|
|
||||||
str += n;
|
|
||||||
|
|
||||||
if (str[0] == 0 || (n >= size - 1)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ret = patternMatch(&patterStr[i], ++str, size - n - 1, pInfo);
|
|
||||||
if (ret != TSDB_PATTERN_NOMATCH) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return TSDB_PATTERN_NOWILDCARDMATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
c1 = str[j++];
|
|
||||||
|
|
||||||
if (j <= size) {
|
|
||||||
if (c == c1 || tolower(c) == tolower(c1) || (c == pInfo->matchOne && c1 != 0)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_PATTERN_NOMATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo) {
|
|
||||||
wchar_t c, c1;
|
|
||||||
wchar_t matchOne = L'_'; // "_"
|
|
||||||
wchar_t matchAll = L'%'; // "%"
|
|
||||||
|
|
||||||
int32_t i = 0;
|
|
||||||
int32_t j = 0;
|
|
||||||
|
|
||||||
while ((c = patterStr[i++]) != 0) {
|
|
||||||
if (c == matchAll) { /* Match "%" */
|
|
||||||
|
|
||||||
while ((c = patterStr[i++]) == matchAll || c == matchOne) {
|
|
||||||
if (c == matchOne && (j > size || str[j++] == 0)) {
|
|
||||||
return TSDB_PATTERN_NOWILDCARDMATCH;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (c == 0) {
|
|
||||||
return TSDB_PATTERN_MATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
wchar_t accept[3] = {towupper(c), towlower(c), 0};
|
|
||||||
while (1) {
|
|
||||||
size_t n = wcsspn(str, accept);
|
|
||||||
|
|
||||||
str += n;
|
|
||||||
if (str[0] == 0 || (n >= size - 1)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
str++;
|
|
||||||
|
|
||||||
int32_t ret = WCSPatternMatch(&patterStr[i], str, wcslen(str), pInfo);
|
|
||||||
if (ret != TSDB_PATTERN_NOMATCH) {
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_PATTERN_NOWILDCARDMATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
c1 = str[j++];
|
|
||||||
|
|
||||||
if (j <= size) {
|
|
||||||
if (c == c1 || towlower(c) == towlower(c1) || (c == matchOne && c1 != 0)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_PATTERN_NOMATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void getStatics_i8(int64_t *primaryKey, int32_t type, int8_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
|
static void getStatics_i8(int64_t *primaryKey, int32_t type, int8_t *data, int32_t numOfRow, int64_t *min, int64_t *max,
|
||||||
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
|
int64_t *sum, int16_t *minIndex, int16_t *maxIndex, int32_t *numOfNull) {
|
||||||
*min = INT64_MAX;
|
*min = INT64_MAX;
|
||||||
|
|
|
@ -30,6 +30,7 @@
|
||||||
#include "ttokendef.h"
|
#include "ttokendef.h"
|
||||||
|
|
||||||
#include "name.h"
|
#include "name.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
|
||||||
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
|
#define DEFAULT_PRIMARY_TIMESTAMP_COL_NAME "_c0"
|
||||||
|
|
||||||
|
|
|
@ -15,19 +15,13 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tscompression.h"
|
|
||||||
#include "tskiplist.h"
|
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "qast.h"
|
|
||||||
#include "qextbuffer.h"
|
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tscompression.h"
|
#include "tscompression.h"
|
||||||
#include "tskiplist.h"
|
|
||||||
#include "tsqlfunction.h"
|
|
||||||
#include "ttime.h"
|
|
||||||
#include "name.h"
|
#include "name.h"
|
||||||
#include "taccount.h"
|
#include "taccount.h"
|
||||||
#include "mgmtDClient.h"
|
#include "mgmtDClient.h"
|
||||||
|
@ -42,6 +36,7 @@
|
||||||
#include "mgmtTable.h"
|
#include "mgmtTable.h"
|
||||||
#include "mgmtUser.h"
|
#include "mgmtUser.h"
|
||||||
#include "mgmtVgroup.h"
|
#include "mgmtVgroup.h"
|
||||||
|
#include "tcompare.h"
|
||||||
|
|
||||||
void * tsChildTableSdb;
|
void * tsChildTableSdb;
|
||||||
void * tsSuperTableSdb;
|
void * tsSuperTableSdb;
|
||||||
|
|
|
@ -160,7 +160,7 @@ typedef struct SQueryRuntimeEnv {
|
||||||
SQueryCostSummary summary;
|
SQueryCostSummary summary;
|
||||||
bool stableQuery; // super table query or not
|
bool stableQuery; // super table query or not
|
||||||
void* pQueryHandle;
|
void* pQueryHandle;
|
||||||
|
void* pSecQueryHandle; // another thread for
|
||||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||||
} SQueryRuntimeEnv;
|
} SQueryRuntimeEnv;
|
||||||
|
|
||||||
|
@ -172,6 +172,8 @@ typedef struct SQInfo {
|
||||||
int32_t code; // error code to returned to client
|
int32_t code; // error code to returned to client
|
||||||
sem_t dataReady;
|
sem_t dataReady;
|
||||||
SArray* pTableIdList; // table id list
|
SArray* pTableIdList; // table id list
|
||||||
|
void* tsdb;
|
||||||
|
|
||||||
SQueryRuntimeEnv runtimeEnv;
|
SQueryRuntimeEnv runtimeEnv;
|
||||||
int32_t subgroupIdx;
|
int32_t subgroupIdx;
|
||||||
int32_t offset; /* offset in group result set of subgroup */
|
int32_t offset; /* offset in group result set of subgroup */
|
||||||
|
|
|
@ -79,17 +79,10 @@ extern "C" {
|
||||||
#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
|
#define TSDB_BASE_FUNC_SO TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
|
||||||
#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
|
#define TSDB_BASE_FUNC_MO TSDB_FUNCSTATE_MO | TSDB_FUNCSTATE_STREAM | TSDB_FUNCSTATE_METRIC | TSDB_FUNCSTATE_OF
|
||||||
|
|
||||||
#define TSDB_PATTERN_MATCH 0
|
|
||||||
#define TSDB_PATTERN_NOMATCH 1
|
|
||||||
#define TSDB_PATTERN_NOWILDCARDMATCH 2
|
|
||||||
#define TSDB_PATTERN_STRING_MAX_LEN 20
|
|
||||||
|
|
||||||
#define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16
|
#define TSDB_FUNCTIONS_NAME_MAX_LENGTH 16
|
||||||
#define TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE 50
|
#define TSDB_AVG_FUNCTION_INTER_BUFFER_SIZE 50
|
||||||
|
|
||||||
#define PATTERN_COMPARE_INFO_INITIALIZER \
|
|
||||||
{ '%', '_' }
|
|
||||||
|
|
||||||
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
|
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
|
||||||
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
|
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
|
||||||
|
|
||||||
|
@ -222,20 +215,11 @@ typedef struct SQLAggFuncElem {
|
||||||
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
|
int32_t (*dataReqFunc)(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId);
|
||||||
} SQLAggFuncElem;
|
} SQLAggFuncElem;
|
||||||
|
|
||||||
typedef struct SPatternCompareInfo {
|
|
||||||
char matchAll; // symbol for match all wildcard, default: '%'
|
|
||||||
char matchOne; // symbol for match one wildcard, default: '_'
|
|
||||||
} SPatternCompareInfo;
|
|
||||||
|
|
||||||
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
#define GET_RES_INFO(ctx) ((ctx)->resultInfo)
|
||||||
|
|
||||||
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
int32_t getResultDataInfo(int32_t dataType, int32_t dataBytes, int32_t functionId, int32_t param, int16_t *type,
|
||||||
int16_t *len, int16_t *interResBytes, int16_t extLength, bool isSuperTable);
|
int16_t *len, int16_t *interResBytes, int16_t extLength, bool isSuperTable);
|
||||||
|
|
||||||
int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo);
|
|
||||||
|
|
||||||
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
|
|
||||||
|
|
||||||
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
|
#define IS_STREAM_QUERY_VALID(x) (((x)&TSDB_FUNCSTATE_STREAM) != 0)
|
||||||
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
|
#define IS_MULTIOUTPUT(x) (((x)&TSDB_FUNCSTATE_MO) != 0)
|
||||||
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
|
#define IS_SINGLEOUTPUT(x) (((x)&TSDB_FUNCSTATE_SO) != 0)
|
||||||
|
|
|
@ -362,10 +362,10 @@ bool isTSCompQuery(SQuery *pQuery) { return pQuery->pSelectExpr[0].pBase.functio
|
||||||
bool doRevisedResultsByLimit(SQInfo *pQInfo) {
|
bool doRevisedResultsByLimit(SQInfo *pQInfo) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
if ((pQuery->limit.limit > 0) && (pQuery->rec.rows + pQuery->rec.rows > pQuery->limit.limit)) {
|
if ((pQuery->limit.limit > 0) && (pQuery->rec.total + pQuery->rec.rows > pQuery->limit.limit)) {
|
||||||
pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.rows;
|
pQuery->rec.rows = pQuery->limit.limit - pQuery->rec.total;
|
||||||
|
assert(pQuery->rec.rows > 0);
|
||||||
// query completed
|
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1552,6 +1552,8 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
destroyResultBuf(pRuntimeEnv->pResultBuf);
|
destroyResultBuf(pRuntimeEnv->pResultBuf);
|
||||||
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
||||||
|
tsdbCleanupQueryHandle(pRuntimeEnv->pSecQueryHandle);
|
||||||
|
|
||||||
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
|
pRuntimeEnv->pTSBuf = tsBufDestory(pRuntimeEnv->pTSBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2501,17 +2503,20 @@ SArray *loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBl
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
|
int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
|
||||||
int firstPos, lastPos, midPos = -1;
|
int32_t midPos = -1;
|
||||||
int numOfPoints;
|
int32_t numOfPoints;
|
||||||
TSKEY *keyList;
|
|
||||||
|
|
||||||
if (num <= 0) return -1;
|
if (num <= 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
keyList = (TSKEY *)pValue;
|
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
||||||
firstPos = 0;
|
|
||||||
lastPos = num - 1;
|
TSKEY* keyList = (TSKEY *)pValue;
|
||||||
|
int32_t firstPos = 0;
|
||||||
|
int32_t lastPos = num - 1;
|
||||||
|
|
||||||
if (order == 0) {
|
if (order == TSDB_ORDER_DESC) {
|
||||||
// find the first position which is smaller than the key
|
// find the first position which is smaller than the key
|
||||||
while (1) {
|
while (1) {
|
||||||
if (key >= keyList[lastPos]) return lastPos;
|
if (key >= keyList[lastPos]) return lastPos;
|
||||||
|
@ -2565,7 +2570,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
|
dTrace("QInfo:%p query start, qrange:%" PRId64 "-%" PRId64 ", lastkey:%" PRId64 ", order:%d",
|
||||||
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
|
GET_QINFO_ADDR(pRuntimeEnv), pQuery->window.skey, pQuery->window.ekey, pQuery->lastKey, pQuery->order.order);
|
||||||
|
|
||||||
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->pQueryHandle;
|
tsdb_query_handle_t pQueryHandle = pRuntimeEnv->scanFlag == MASTER_SCAN? pRuntimeEnv->pQueryHandle:pRuntimeEnv->pSecQueryHandle;
|
||||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||||
|
|
||||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
|
@ -3520,8 +3525,9 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
|
|
||||||
// store the start query position
|
// store the start query position
|
||||||
void *pos = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
|
// void *pos = tsdbDataBlockTell(pRuntimeEnv->pQueryHandle);
|
||||||
|
SQInfo* pQInfo = (SQInfo*) GET_QINFO_ADDR(pRuntimeEnv);
|
||||||
|
|
||||||
int64_t skey = pQuery->lastKey;
|
int64_t skey = pQuery->lastKey;
|
||||||
int32_t status = pQuery->status;
|
int32_t status = pQuery->status;
|
||||||
int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex;
|
int32_t activeSlot = pRuntimeEnv->windowResInfo.curIndex;
|
||||||
|
@ -3543,15 +3549,32 @@ void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the correct start position, and load the corresponding block in buffer for next round scan all data blocks.
|
// set the correct start position, and load the corresponding block in buffer for next round scan all data blocks.
|
||||||
/*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos);
|
// /*int32_t ret =*/ tsdbDataBlockSeek(pRuntimeEnv->pQueryHandle, pos);
|
||||||
|
|
||||||
|
STsdbQueryCond cond = {
|
||||||
|
.twindow = {pQuery->window.skey, pQuery->lastKey},
|
||||||
|
.order = pQuery->order.order,
|
||||||
|
.colList = pQuery->colList,
|
||||||
|
};
|
||||||
|
|
||||||
|
SArray *cols = taosArrayInit(pQuery->numOfCols, sizeof(pQuery->colList[0]));
|
||||||
|
for (int32_t i = 0; i < pQuery->numOfCols; ++i) {
|
||||||
|
taosArrayPush(cols, &pQuery->colList[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRuntimeEnv->pSecQueryHandle != NULL) {
|
||||||
|
pRuntimeEnv->pSecQueryHandle = tsdbQueryByTableId(pQInfo->tsdb, &cond, pQInfo->pTableIdList, cols);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(cols);
|
||||||
|
|
||||||
status = pQuery->status;
|
status = pQuery->status;
|
||||||
pRuntimeEnv->windowResInfo.curIndex = activeSlot;
|
pRuntimeEnv->windowResInfo.curIndex = activeSlot;
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
pRuntimeEnv->scanFlag = REPEAT_SCAN;
|
pRuntimeEnv->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
/* check if query is killed or not */
|
// check if query is killed or not
|
||||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -4179,6 +4202,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void* tsdb, bool isSTableQuery)
|
||||||
|
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
|
pRuntimeEnv->pQueryHandle = tsdbQueryByTableId(tsdb, &cond, pQInfo->pTableIdList, cols);
|
||||||
taosArrayDestroy(cols);
|
taosArrayDestroy(cols);
|
||||||
|
pQInfo->tsdb = tsdb;
|
||||||
|
|
||||||
pRuntimeEnv->pQuery = pQuery;
|
pRuntimeEnv->pQuery = pQuery;
|
||||||
pRuntimeEnv->pTSBuf = param;
|
pRuntimeEnv->pTSBuf = param;
|
||||||
|
@ -4972,7 +4996,6 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
// initCtxOutputBuf(pRuntimeEnv);
|
|
||||||
scanAllDataBlocks(pRuntimeEnv);
|
scanAllDataBlocks(pRuntimeEnv);
|
||||||
|
|
||||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
|
|
|
@ -16,8 +16,24 @@
|
||||||
#ifndef TDENGINE_TCOMPARE_H
|
#ifndef TDENGINE_TCOMPARE_H
|
||||||
#define TDENGINE_TCOMPARE_H
|
#define TDENGINE_TCOMPARE_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
|
#define TSDB_PATTERN_MATCH 0
|
||||||
|
#define TSDB_PATTERN_NOMATCH 1
|
||||||
|
#define TSDB_PATTERN_NOWILDCARDMATCH 2
|
||||||
|
#define TSDB_PATTERN_STRING_MAX_LEN 20
|
||||||
|
|
||||||
|
#define PATTERN_COMPARE_INFO_INITIALIZER { '%', '_' }
|
||||||
|
|
||||||
|
typedef struct SPatternCompareInfo {
|
||||||
|
char matchAll; // symbol for match all wildcard, default: '%'
|
||||||
|
char matchOne; // symbol for match one wildcard, default: '_'
|
||||||
|
} SPatternCompareInfo;
|
||||||
|
|
||||||
int32_t compareInt32Val(const void *pLeft, const void *pRight);
|
int32_t compareInt32Val(const void *pLeft, const void *pRight);
|
||||||
|
|
||||||
int32_t compareInt64Val(const void *pLeft, const void *pRight);
|
int32_t compareInt64Val(const void *pLeft, const void *pRight);
|
||||||
|
@ -36,8 +52,16 @@ int32_t compareStrVal(const void *pLeft, const void *pRight);
|
||||||
|
|
||||||
int32_t compareWStrVal(const void *pLeft, const void *pRight);
|
int32_t compareWStrVal(const void *pLeft, const void *pRight);
|
||||||
|
|
||||||
|
int patternMatch(const char *zPattern, const char *zString, size_t size, const SPatternCompareInfo *pInfo);
|
||||||
|
|
||||||
|
int WCSPatternMatch(const wchar_t *zPattern, const wchar_t *zString, size_t size, const SPatternCompareInfo *pInfo);
|
||||||
|
|
||||||
__compar_fn_t getKeyComparFunc(int32_t keyType);
|
__compar_fn_t getKeyComparFunc(int32_t keyType);
|
||||||
|
|
||||||
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType);
|
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif // TDENGINE_TCOMPARE_H
|
#endif // TDENGINE_TCOMPARE_H
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
|
|
||||||
|
#include "tutil.h"
|
||||||
|
|
||||||
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
|
int32_t compareInt32Val(const void *pLeft, const void *pRight) {
|
||||||
int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight);
|
int32_t ret = GET_INT32_VAL(pLeft) - GET_INT32_VAL(pRight);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
|
@ -11,7 +13,7 @@ int32_t compareInt32Val(const void *pLeft, const void *pRight) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
|
int32_t compareInt64Val(const void *pLeft, const void *pRight) {
|
||||||
int32_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight);
|
int64_t ret = GET_INT64_VAL(pLeft) - GET_INT64_VAL(pRight);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
|
@ -102,6 +104,143 @@ int32_t compareWStrVal(const void *pLeft, const void *pRight) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Compare two strings
|
||||||
|
* TSDB_MATCH: Match
|
||||||
|
* TSDB_NOMATCH: No match
|
||||||
|
* TSDB_NOWILDCARDMATCH: No match in spite of having * or % wildcards.
|
||||||
|
* Like matching rules:
|
||||||
|
* '%': Matches zero or more characters
|
||||||
|
* '_': Matches one character
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
int patternMatch(const char *patterStr, const char *str, size_t size, const SPatternCompareInfo *pInfo) {
|
||||||
|
char c, c1;
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
int32_t j = 0;
|
||||||
|
|
||||||
|
while ((c = patterStr[i++]) != 0) {
|
||||||
|
if (c == pInfo->matchAll) { /* Match "*" */
|
||||||
|
|
||||||
|
while ((c = patterStr[i++]) == pInfo->matchAll || c == pInfo->matchOne) {
|
||||||
|
if (c == pInfo->matchOne && (j > size || str[j++] == 0)) {
|
||||||
|
// empty string, return not match
|
||||||
|
return TSDB_PATTERN_NOWILDCARDMATCH;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (c == 0) {
|
||||||
|
return TSDB_PATTERN_MATCH; /* "*" at the end of the pattern matches */
|
||||||
|
}
|
||||||
|
|
||||||
|
char next[3] = {toupper(c), tolower(c), 0};
|
||||||
|
while (1) {
|
||||||
|
size_t n = strcspn(str, next);
|
||||||
|
str += n;
|
||||||
|
|
||||||
|
if (str[0] == 0 || (n >= size - 1)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t ret = patternMatch(&patterStr[i], ++str, size - n - 1, pInfo);
|
||||||
|
if (ret != TSDB_PATTERN_NOMATCH) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_PATTERN_NOWILDCARDMATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
c1 = str[j++];
|
||||||
|
|
||||||
|
if (j <= size) {
|
||||||
|
if (c == c1 || tolower(c) == tolower(c1) || (c == pInfo->matchOne && c1 != 0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_PATTERN_NOMATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
int WCSPatternMatch(const wchar_t *patterStr, const wchar_t *str, size_t size, const SPatternCompareInfo *pInfo) {
|
||||||
|
wchar_t c, c1;
|
||||||
|
wchar_t matchOne = L'_'; // "_"
|
||||||
|
wchar_t matchAll = L'%'; // "%"
|
||||||
|
|
||||||
|
int32_t i = 0;
|
||||||
|
int32_t j = 0;
|
||||||
|
|
||||||
|
while ((c = patterStr[i++]) != 0) {
|
||||||
|
if (c == matchAll) { /* Match "%" */
|
||||||
|
|
||||||
|
while ((c = patterStr[i++]) == matchAll || c == matchOne) {
|
||||||
|
if (c == matchOne && (j > size || str[j++] == 0)) {
|
||||||
|
return TSDB_PATTERN_NOWILDCARDMATCH;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (c == 0) {
|
||||||
|
return TSDB_PATTERN_MATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
wchar_t accept[3] = {towupper(c), towlower(c), 0};
|
||||||
|
while (1) {
|
||||||
|
size_t n = wcsspn(str, accept);
|
||||||
|
|
||||||
|
str += n;
|
||||||
|
if (str[0] == 0 || (n >= size - 1)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
str++;
|
||||||
|
|
||||||
|
int32_t ret = WCSPatternMatch(&patterStr[i], str, wcslen(str), pInfo);
|
||||||
|
if (ret != TSDB_PATTERN_NOMATCH) {
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_PATTERN_NOWILDCARDMATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
c1 = str[j++];
|
||||||
|
|
||||||
|
if (j <= size) {
|
||||||
|
if (c == c1 || towlower(c) == towlower(c1) || (c == matchOne && c1 != 0)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_PATTERN_NOMATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (str[j] == 0 || j >= size) ? TSDB_PATTERN_MATCH : TSDB_PATTERN_NOMATCH;
|
||||||
|
}
|
||||||
|
|
||||||
|
static UNUSED_FUNC int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
|
||||||
|
SPatternCompareInfo pInfo = {'%', '_'};
|
||||||
|
|
||||||
|
const char* pattern = pRight;
|
||||||
|
const char* str = pLeft;
|
||||||
|
|
||||||
|
int32_t ret = patternMatch(pattern, str, strlen(str), &pInfo);
|
||||||
|
|
||||||
|
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
|
||||||
|
SPatternCompareInfo pInfo = {'%', '_'};
|
||||||
|
|
||||||
|
const wchar_t* pattern = pRight;
|
||||||
|
const wchar_t* str = pLeft;
|
||||||
|
|
||||||
|
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo);
|
||||||
|
|
||||||
|
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
|
||||||
|
}
|
||||||
|
|
||||||
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
|
__compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
|
||||||
__compar_fn_t comparFn = NULL;
|
__compar_fn_t comparFn = NULL;
|
||||||
|
|
||||||
|
@ -109,8 +248,9 @@ __compar_fn_t getComparFunc(int32_t type, int32_t filterDataType) {
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
case TSDB_DATA_TYPE_BIGINT: {
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
if (filterDataType == TSDB_DATA_TYPE_BIGINT) {
|
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||||
|
if (filterDataType == TSDB_DATA_TYPE_BIGINT || filterDataType == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
comparFn = compareInt64Val;
|
comparFn = compareInt64Val;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,53 +75,6 @@ static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **forward, SSk
|
||||||
static SSkipListNode* tSkipListDoAppend(SSkipList *pSkipList, SSkipListNode *pNode);
|
static SSkipListNode* tSkipListDoAppend(SSkipList *pSkipList, SSkipListNode *pNode);
|
||||||
static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
|
static SSkipListIterator* doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
|
||||||
|
|
||||||
//static __compar_fn_t getComparFunc(SSkipList *pSkipList, int32_t filterDataType) {
|
|
||||||
// __compar_fn_t comparFn = NULL;
|
|
||||||
//
|
|
||||||
// switch (pSkipList->keyInfo.type) {
|
|
||||||
// case TSDB_DATA_TYPE_TINYINT:
|
|
||||||
// case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
// case TSDB_DATA_TYPE_INT:
|
|
||||||
// case TSDB_DATA_TYPE_BIGINT: {
|
|
||||||
// if (filterDataType == TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
// comparFn = compareInt64Val;
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// case TSDB_DATA_TYPE_BOOL: {
|
|
||||||
// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
// comparFn = compareInt32Val;
|
|
||||||
// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
// comparFn = compareIntDoubleVal;
|
|
||||||
// }
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// case TSDB_DATA_TYPE_FLOAT:
|
|
||||||
// case TSDB_DATA_TYPE_DOUBLE: {
|
|
||||||
//// if (filterDataType >= TSDB_DATA_TYPE_BOOL && filterDataType <= TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
//// comparFn = compareDoubleIntVal;
|
|
||||||
//// } else if (filterDataType >= TSDB_DATA_TYPE_FLOAT && filterDataType <= TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
//// comparFn = compareDoubleVal;
|
|
||||||
//// }
|
|
||||||
// if (filterDataType == TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
// comparFn = compareDoubleVal;
|
|
||||||
// }
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// case TSDB_DATA_TYPE_BINARY:
|
|
||||||
// comparFn = compareStrVal;
|
|
||||||
// break;
|
|
||||||
// case TSDB_DATA_TYPE_NCHAR:
|
|
||||||
// comparFn = compareWStrVal;
|
|
||||||
// break;
|
|
||||||
// default:
|
|
||||||
// comparFn = compareInt32Val;
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return comparFn;
|
|
||||||
//}
|
|
||||||
|
|
||||||
static bool initForwardBackwardPtr(SSkipList* pSkipList) {
|
static bool initForwardBackwardPtr(SSkipList* pSkipList) {
|
||||||
uint32_t maxLevel = pSkipList->maxLevel;
|
uint32_t maxLevel = pSkipList->maxLevel;
|
||||||
|
|
||||||
|
@ -445,6 +398,11 @@ SSkipListIterator *tSkipListCreateIterFromVal(SSkipList* pSkipList, const char*
|
||||||
iter->cur = forward[0]; // greater equals than the value
|
iter->cur = forward[0]; // greater equals than the value
|
||||||
} else {
|
} else {
|
||||||
iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0);
|
iter->cur = SL_GET_FORWARD_POINTER(forward[0], 0);
|
||||||
|
|
||||||
|
if (ret == 0) {
|
||||||
|
assert(iter->cur != pSkipList->pTail);
|
||||||
|
iter->cur = SL_GET_FORWARD_POINTER(iter->cur, 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return iter;
|
return iter;
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
|
|
||||||
#define EXTRA_BYTES 2
|
#define EXTRA_BYTES 2
|
||||||
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
|
#define PRIMARY_TSCOL_REQUIRED(c) (((SColumnInfoData*)taosArrayGet(c, 0))->info.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX)
|
||||||
#define QUERY_IS_ASC_QUERY(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_ORDER_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
|
#define QH_GET_NUM_OF_COLS(handle) ((size_t)(taosArrayGetSize((handle)->pColumns)))
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -87,6 +87,13 @@ typedef struct STableBlockInfo {
|
||||||
int32_t groupIdx; /* number of group is less than the total number of tables */
|
int32_t groupIdx; /* number of group is less than the total number of tables */
|
||||||
} STableBlockInfo;
|
} STableBlockInfo;
|
||||||
|
|
||||||
|
typedef struct SBlockOrderSupporter {
|
||||||
|
int32_t numOfTables;
|
||||||
|
STableBlockInfo** pDataBlockInfo;
|
||||||
|
int32_t* blockIndexArray;
|
||||||
|
int32_t* numOfBlocksPerMeter;
|
||||||
|
} SBlockOrderSupporter;
|
||||||
|
|
||||||
typedef struct STsdbQueryHandle {
|
typedef struct STsdbQueryHandle {
|
||||||
STsdbRepo* pTsdb;
|
STsdbRepo* pTsdb;
|
||||||
SQueryFilePos cur; // current position
|
SQueryFilePos cur; // current position
|
||||||
|
@ -147,6 +154,7 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
|
||||||
|
|
||||||
pQueryHandle->loadDataAfterSeek = false;
|
pQueryHandle->loadDataAfterSeek = false;
|
||||||
pQueryHandle->isFirstSlot = true;
|
pQueryHandle->isFirstSlot = true;
|
||||||
|
pQueryHandle->cur.fid = -1;
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(idList);
|
size_t size = taosArrayGetSize(idList);
|
||||||
assert(size >= 1);
|
assert(size >= 1);
|
||||||
|
@ -176,7 +184,7 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
|
||||||
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
|
* For ascending timestamp order query, query starts from data files. In contrast, buffer will be checked in the first place
|
||||||
* in case of descending timestamp order query.
|
* in case of descending timestamp order query.
|
||||||
*/
|
*/
|
||||||
pQueryHandle->checkFiles = QUERY_IS_ASC_QUERY(pQueryHandle->order);
|
pQueryHandle->checkFiles = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order);
|
||||||
pQueryHandle->activeIndex = 0;
|
pQueryHandle->activeIndex = 0;
|
||||||
|
|
||||||
// allocate buffer in order to load data blocks from file
|
// allocate buffer in order to load data blocks from file
|
||||||
|
@ -201,19 +209,44 @@ tsdb_query_handle_t* tsdbQueryByTableId(tsdb_repo_t* tsdb, STsdbQueryCond* pCond
|
||||||
|
|
||||||
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||||
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
|
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
|
||||||
|
pHandle->cur.fid = -1;
|
||||||
|
|
||||||
|
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||||
|
|
||||||
STableCheckInfo* pTableCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
STable* pTable = pCheckInfo->pTableObj;
|
||||||
|
|
||||||
STable* pTable = pTableCheckInfo->pTableObj;
|
|
||||||
assert(pTable != NULL);
|
assert(pTable != NULL);
|
||||||
|
|
||||||
// no data in cache, abort
|
// no data in cache, abort
|
||||||
if (pTable->mem == NULL && pTable->imem == NULL) {
|
if (pTable->mem == NULL && pTable->imem == NULL) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pCheckInfo->iter == NULL) {
|
||||||
|
pCheckInfo->iter = tSkipListCreateIterFromVal(pTable->mem->pData, (const char*) &pCheckInfo->lastKey,
|
||||||
|
TSDB_DATA_TYPE_TIMESTAMP, pHandle->order);
|
||||||
|
|
||||||
|
if (pCheckInfo->iter == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!tSkipListIterNext(pCheckInfo->iter)) { // buffer is empty
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSkipListNode* node = tSkipListIterGet(pCheckInfo->iter);
|
||||||
|
if (node == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
|
pCheckInfo->lastKey = dataRowKey(row); // first timestamp in buffer
|
||||||
|
dTrace("%p uid:%" PRId64", tid:%d check data in buffer from skey:%" PRId64 ", order:%d", pHandle,
|
||||||
|
pCheckInfo->tableId.uid, pCheckInfo->tableId.tid, pCheckInfo->lastKey, pHandle->order);
|
||||||
|
|
||||||
// all data in mem are checked already.
|
// all data in mem are checked already.
|
||||||
if (pTableCheckInfo->lastKey > pTable->mem->keyLast) {
|
if ((pCheckInfo->lastKey > pHandle->window.ekey && ASCENDING_ORDER_TRAVERSE(pHandle->order)) ||
|
||||||
|
(pCheckInfo->lastKey < pHandle->window.ekey && !ASCENDING_ORDER_TRAVERSE(pHandle->order))) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -222,7 +255,7 @@ static bool hasMoreDataInCache(STsdbQueryHandle* pHandle) {
|
||||||
|
|
||||||
// todo dynamic get the daysperfile
|
// todo dynamic get the daysperfile
|
||||||
static int32_t getFileIdFromKey(TSKEY key) {
|
static int32_t getFileIdFromKey(TSKEY key) {
|
||||||
int64_t fid = (int64_t)(key / 10); // set the starting fileId
|
int64_t fid = (int64_t)(key / (10 * tsMsPerDay[0])); // set the starting fileId
|
||||||
if (fid > INT32_MAX) {
|
if (fid > INT32_MAX) {
|
||||||
fid = INT32_MAX;
|
fid = INT32_MAX;
|
||||||
}
|
}
|
||||||
|
@ -230,7 +263,32 @@ static int32_t getFileIdFromKey(TSKEY key) {
|
||||||
return fid;
|
return fid;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order);
|
static int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
|
||||||
|
int32_t firstSlot = 0;
|
||||||
|
int32_t lastSlot = numOfBlocks - 1;
|
||||||
|
|
||||||
|
int32_t midSlot = firstSlot;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
numOfBlocks = lastSlot - firstSlot + 1;
|
||||||
|
midSlot = (firstSlot + (numOfBlocks >> 1));
|
||||||
|
|
||||||
|
if (numOfBlocks == 1) break;
|
||||||
|
|
||||||
|
if (skey > pBlock[midSlot].keyLast) {
|
||||||
|
if (numOfBlocks == 2) break;
|
||||||
|
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
|
||||||
|
firstSlot = midSlot + 1;
|
||||||
|
} else if (skey < pBlock[midSlot].keyFirst) {
|
||||||
|
if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
|
||||||
|
lastSlot = midSlot - 1;
|
||||||
|
} else {
|
||||||
|
break; // got the slot
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return midSlot;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
|
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
|
||||||
// todo check open file failed
|
// todo check open file failed
|
||||||
|
@ -299,33 +357,6 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t binarySearchForBlockImpl(SCompBlock* pBlock, int32_t numOfBlocks, TSKEY skey, int32_t order) {
|
|
||||||
int32_t firstSlot = 0;
|
|
||||||
int32_t lastSlot = numOfBlocks - 1;
|
|
||||||
|
|
||||||
int32_t midSlot = firstSlot;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
numOfBlocks = lastSlot - firstSlot + 1;
|
|
||||||
midSlot = (firstSlot + (numOfBlocks >> 1));
|
|
||||||
|
|
||||||
if (numOfBlocks == 1) break;
|
|
||||||
|
|
||||||
if (skey > pBlock[midSlot].keyLast) {
|
|
||||||
if (numOfBlocks == 2) break;
|
|
||||||
if ((order == TSDB_ORDER_DESC) && (skey < pBlock[midSlot + 1].keyFirst)) break;
|
|
||||||
firstSlot = midSlot + 1;
|
|
||||||
} else if (skey < pBlock[midSlot].keyFirst) {
|
|
||||||
if ((order == TSDB_ORDER_ASC) && (skey > pBlock[midSlot - 1].keyLast)) break;
|
|
||||||
lastSlot = midSlot - 1;
|
|
||||||
} else {
|
|
||||||
break; // got the slot
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return midSlot;
|
|
||||||
}
|
|
||||||
|
|
||||||
static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||||
SDataBlockInfo info = {
|
SDataBlockInfo info = {
|
||||||
.window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast},
|
.window = {.skey = pBlock->keyFirst, .ekey = pBlock->keyLast},
|
||||||
|
@ -338,7 +369,34 @@ static SDataBlockInfo getTrueDataBlockInfo(STableCheckInfo* pCheckInfo, SCompBlo
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS);
|
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
|
||||||
|
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
||||||
|
assert(numOfCols <= TSDB_MAX_COLUMNS);
|
||||||
|
|
||||||
|
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
taosArrayPush(pIdList, &pCol->info.colId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pIdList;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
|
||||||
|
SArray* pLocalIdList = getColumnIdList(pQueryHandle);
|
||||||
|
|
||||||
|
// check if the primary time stamp column needs to load
|
||||||
|
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
|
||||||
|
|
||||||
|
// the primary timestamp column does not be included in the the specified load column list, add it
|
||||||
|
if (loadTS && colId != 0) {
|
||||||
|
int16_t columnId = 0;
|
||||||
|
taosArrayInsert(pLocalIdList, 0, &columnId);
|
||||||
|
}
|
||||||
|
|
||||||
|
return pLocalIdList;
|
||||||
|
}
|
||||||
|
|
||||||
static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
||||||
SArray* sa);
|
SArray* sa);
|
||||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
|
@ -386,9 +444,9 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
||||||
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
SArray* sa = getDefaultLoadColumns(pQueryHandle, true);
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
|
||||||
// query ended in current block
|
// query ended in current block
|
||||||
if (pQueryHandle->window.ekey < pBlock->keyLast) {
|
if (pQueryHandle->window.ekey < pBlock->keyLast || pCheckInfo->lastKey > pBlock->keyFirst) {
|
||||||
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
|
if (!doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -430,73 +488,75 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
||||||
return pQueryHandle->realNumOfRows > 0;
|
return pQueryHandle->realNumOfRows > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool moveToNextBlock(STsdbQueryHandle* pQueryHandle, int32_t step) {
|
//bool moveToNextBlock(STsdbQueryHandle* pQueryHandle, int32_t step) {
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
// SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
//
|
||||||
|
// if (pQueryHandle->cur.fid >= 0) {
|
||||||
|
// /*
|
||||||
|
// * 1. ascending order. The last data block of data file
|
||||||
|
// * 2. descending order. The first block of file
|
||||||
|
// */
|
||||||
|
// STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
|
||||||
|
// int32_t tid = pCheckInfo->tableId.tid;
|
||||||
|
//
|
||||||
|
// if ((step == QUERY_ASC_FORWARD_STEP &&
|
||||||
|
// (pQueryHandle->cur.slot == pQueryHandle->compIndex[tid].numOfSuperBlocks - 1)) ||
|
||||||
|
// (step == QUERY_DESC_FORWARD_STEP && (pQueryHandle->cur.slot == 0))) {
|
||||||
|
// // temporarily keep the position value, in case of no data qualified when move forwards(backwards)
|
||||||
|
// // SQueryFilePos save = pQueryHandle->cur;
|
||||||
|
// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
|
||||||
|
//
|
||||||
|
// int32_t fid = -1;
|
||||||
|
// int32_t numOfBlocks = 0;
|
||||||
|
//
|
||||||
|
// if (pQueryHandle->pFileGroup != NULL) {
|
||||||
|
// if ((fid = getFileCompInfo(pQueryHandle, &numOfBlocks, 1)) < 0) {
|
||||||
|
// } else {
|
||||||
|
// cur->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->numOfBlocks - 1;
|
||||||
|
// cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->pBlock[cur->slot].numOfPoints - 1;
|
||||||
|
//
|
||||||
|
// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
||||||
|
// cur->fid = pQueryHandle->pFileGroup->fileId;
|
||||||
|
// assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
|
||||||
|
//
|
||||||
|
// if (pBlock->keyFirst > pQueryHandle->window.ekey) { // done
|
||||||
|
// return false;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||||
|
// }
|
||||||
|
// } else { // check data in cache
|
||||||
|
// pQueryHandle->cur.fid = -1;
|
||||||
|
// return hasMoreDataInCache(pQueryHandle);
|
||||||
|
// }
|
||||||
|
// } else { // next block in the same file
|
||||||
|
// cur->slot += step;
|
||||||
|
//
|
||||||
|
// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
||||||
|
// cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1;
|
||||||
|
// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||||
|
// }
|
||||||
|
// } else { // data in cache
|
||||||
|
// return hasMoreDataInCache(pQueryHandle);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// return false;
|
||||||
|
//}
|
||||||
|
|
||||||
if (pQueryHandle->cur.fid >= 0) {
|
static int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
/*
|
|
||||||
* 1. ascending order. The last data block of data file
|
|
||||||
* 2. descending order. The first block of file
|
|
||||||
*/
|
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
|
|
||||||
int32_t tid = pCheckInfo->tableId.tid;
|
|
||||||
|
|
||||||
if ((step == QUERY_ASC_FORWARD_STEP &&
|
|
||||||
(pQueryHandle->cur.slot == pQueryHandle->compIndex[tid].numOfSuperBlocks - 1)) ||
|
|
||||||
(step == QUERY_DESC_FORWARD_STEP && (pQueryHandle->cur.slot == 0))) {
|
|
||||||
// temporarily keep the position value, in case of no data qualified when move forwards(backwards)
|
|
||||||
// SQueryFilePos save = pQueryHandle->cur;
|
|
||||||
pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
|
|
||||||
|
|
||||||
int32_t fid = -1;
|
|
||||||
int32_t numOfBlocks = 0;
|
|
||||||
|
|
||||||
if (pQueryHandle->pFileGroup != NULL) {
|
|
||||||
if ((fid = getFileCompInfo(pQueryHandle, &numOfBlocks, 1)) < 0) {
|
|
||||||
} else {
|
|
||||||
cur->slot = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->numOfBlocks - 1;
|
|
||||||
cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pQueryHandle->pBlock[cur->slot].numOfPoints - 1;
|
|
||||||
|
|
||||||
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
|
||||||
cur->fid = pQueryHandle->pFileGroup->fileId;
|
|
||||||
assert(cur->pos >= 0 && cur->fid >= 0 && cur->slot >= 0);
|
|
||||||
|
|
||||||
if (pBlock->keyFirst > pQueryHandle->window.ekey) { // done
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
|
||||||
}
|
|
||||||
} else { // check data in cache
|
|
||||||
pQueryHandle->cur.fid = -1;
|
|
||||||
return hasMoreDataInCache(pQueryHandle);
|
|
||||||
}
|
|
||||||
} else { // next block in the same file
|
|
||||||
cur->slot += step;
|
|
||||||
|
|
||||||
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
|
||||||
cur->pos = (step == QUERY_ASC_FORWARD_STEP) ? 0 : pBlock->numOfPoints - 1;
|
|
||||||
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
|
||||||
}
|
|
||||||
} else { // data in cache
|
|
||||||
return hasMoreDataInCache(pQueryHandle);
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
int vnodeBinarySearchKey(char* pValue, int num, TSKEY key, int order) {
|
|
||||||
int firstPos, lastPos, midPos = -1;
|
int firstPos, lastPos, midPos = -1;
|
||||||
int numOfPoints;
|
int numOfPoints;
|
||||||
TSKEY* keyList;
|
TSKEY* keyList;
|
||||||
|
|
||||||
|
assert(order == TSDB_ORDER_ASC || order == TSDB_ORDER_DESC);
|
||||||
|
|
||||||
if (num <= 0) return -1;
|
if (num <= 0) return -1;
|
||||||
|
|
||||||
keyList = (TSKEY*)pValue;
|
keyList = (TSKEY*)pValue;
|
||||||
firstPos = 0;
|
firstPos = 0;
|
||||||
lastPos = num - 1;
|
lastPos = num - 1;
|
||||||
|
|
||||||
if (order == 0) {
|
if (order == TSDB_ORDER_DESC) {
|
||||||
// find the first position which is smaller than the key
|
// find the first position which is smaller than the key
|
||||||
while (1) {
|
while (1) {
|
||||||
if (key >= keyList[lastPos]) return lastPos;
|
if (key >= keyList[lastPos]) return lastPos;
|
||||||
|
@ -555,24 +615,24 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
||||||
SDataCols* pCols = pCheckInfo->pDataCols;
|
SDataCols* pCols = pCheckInfo->pDataCols;
|
||||||
|
|
||||||
int32_t endPos = cur->pos;
|
int32_t endPos = cur->pos;
|
||||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
|
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey > blockInfo.window.ekey) {
|
||||||
endPos = blockInfo.rows - 1;
|
endPos = blockInfo.rows - 1;
|
||||||
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
|
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
|
||||||
pCheckInfo->lastKey = blockInfo.window.ekey + 1;
|
pCheckInfo->lastKey = blockInfo.window.ekey + 1;
|
||||||
} else if (!QUERY_IS_ASC_QUERY(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
|
} else if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && pQueryHandle->window.ekey < blockInfo.window.skey) {
|
||||||
endPos = 0;
|
endPos = 0;
|
||||||
pQueryHandle->realNumOfRows = cur->pos + 1;
|
pQueryHandle->realNumOfRows = cur->pos + 1;
|
||||||
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
|
pCheckInfo->lastKey = blockInfo.window.ekey - 1;
|
||||||
} else {
|
} else {
|
||||||
endPos =
|
int32_t order = (pQueryHandle->order == TSDB_ORDER_ASC)? TSDB_ORDER_DESC:TSDB_ORDER_ASC;
|
||||||
vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, pQueryHandle->order);
|
endPos = vnodeBinarySearchKey(pCols->cols[0].pData, pCols->numOfPoints, pQueryHandle->window.ekey, order);
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
|
||||||
if (endPos < cur->pos) {
|
if (endPos < cur->pos) {
|
||||||
pQueryHandle->realNumOfRows = 0;
|
pQueryHandle->realNumOfRows = 0;
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
pQueryHandle->realNumOfRows = endPos - cur->pos;
|
pQueryHandle->realNumOfRows = endPos - cur->pos + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
|
pCheckInfo->lastKey = ((int64_t*)(pCols->cols[0].pData))[endPos] + 1;
|
||||||
|
@ -581,10 +641,8 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
||||||
pQueryHandle->realNumOfRows = 0;
|
pQueryHandle->realNumOfRows = 0;
|
||||||
return;
|
return;
|
||||||
} else {
|
} else {
|
||||||
pQueryHandle->realNumOfRows = cur->pos - endPos;
|
pQueryHandle->realNumOfRows = cur->pos - endPos + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(0);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -614,34 +672,6 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
|
||||||
cur->pos = endPos;
|
cur->pos = endPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SArray* getColumnIdList(STsdbQueryHandle* pQueryHandle) {
|
|
||||||
size_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
|
|
||||||
assert(numOfCols <= TSDB_MAX_COLUMNS);
|
|
||||||
|
|
||||||
SArray* pIdList = taosArrayInit(numOfCols, sizeof(int16_t));
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
|
|
||||||
taosArrayPush(pIdList, &pCol->info.colId);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pIdList;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* getDefaultLoadColumns(STsdbQueryHandle* pQueryHandle, bool loadTS) {
|
|
||||||
SArray* pLocalIdList = getColumnIdList(pQueryHandle);
|
|
||||||
|
|
||||||
// check if the primary time stamp column needs to load
|
|
||||||
int16_t colId = *(int16_t*)taosArrayGet(pLocalIdList, 0);
|
|
||||||
|
|
||||||
// the primary timestamp column does not be included in the the specified load column list, add it
|
|
||||||
if (loadTS && colId != 0) {
|
|
||||||
int16_t columnId = 0;
|
|
||||||
taosArrayInsert(pLocalIdList, 0, &columnId);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pLocalIdList;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
int firstPos, lastPos, midPos = -1;
|
int firstPos, lastPos, midPos = -1;
|
||||||
int numOfPoints;
|
int numOfPoints;
|
||||||
|
@ -702,79 +732,72 @@ int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order) {
|
||||||
return midPos;
|
return midPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool getQualifiedDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t type) {
|
//static bool getQualifiedDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, int32_t type) {
|
||||||
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
|
// STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
|
||||||
int32_t fid = getFileIdFromKey(pCheckInfo->lastKey);
|
// int32_t fid = getFileIdFromKey(pCheckInfo->lastKey);
|
||||||
|
//
|
||||||
|
// tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD);
|
||||||
|
// tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
||||||
|
// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
|
||||||
|
//
|
||||||
|
// SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
//
|
||||||
|
// int32_t tid = pCheckInfo->tableId.tid;
|
||||||
|
// int32_t numOfBlocks = 0;
|
||||||
|
//
|
||||||
|
// while (pQueryHandle->pFileGroup != NULL) {
|
||||||
|
// if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) {
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// assert(pCheckInfo->numOfBlocks >= 0);
|
||||||
|
//
|
||||||
|
// // no data block in current file, try next
|
||||||
|
// if (pCheckInfo->numOfBlocks > 0) {
|
||||||
|
// cur->fid = pQueryHandle->pFileGroup->fileId;
|
||||||
|
// break;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// dTrace("%p no data block in file, fid:%d, tid:%d, try next, %p", pQueryHandle, pQueryHandle->pFileGroup->fileId,
|
||||||
|
// tid, pQueryHandle->qinfo);
|
||||||
|
//
|
||||||
|
// pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// if (pCheckInfo->numOfBlocks == 0) {
|
||||||
|
// return false;
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// cur->slot = 0; // always start from the first slot
|
||||||
|
// SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
||||||
|
// return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||||
|
//}
|
||||||
|
|
||||||
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, TSDB_FGROUP_ITER_FORWARD);
|
//static UNUSED_FUNC bool hasMoreDataForSingleTable(STsdbQueryHandle* pHandle) {
|
||||||
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
// assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
|
||||||
pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
|
//
|
||||||
|
// STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
|
||||||
|
// STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||||
|
//
|
||||||
|
// if (!pCheckInfo->checkFirstFileBlock) {
|
||||||
|
// pCheckInfo->checkFirstFileBlock = true;
|
||||||
|
//
|
||||||
|
// if (pFileHandle != NULL) {
|
||||||
|
// bool found = getQualifiedDataBlock(pHandle, pCheckInfo, 1);
|
||||||
|
// if (found) {
|
||||||
|
// return true;
|
||||||
|
// }
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// // no data in file, try cache
|
||||||
|
// pHandle->cur.fid = -1;
|
||||||
|
// return hasMoreDataInCache(pHandle);
|
||||||
|
// } else { // move to next data block in file or in cache
|
||||||
|
// return moveToNextBlock(pHandle, 1);
|
||||||
|
// }
|
||||||
|
//}
|
||||||
|
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
static void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
|
||||||
|
|
||||||
int32_t tid = pCheckInfo->tableId.tid;
|
|
||||||
int32_t numOfBlocks = 0;
|
|
||||||
|
|
||||||
while (pQueryHandle->pFileGroup != NULL) {
|
|
||||||
if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(pCheckInfo->numOfBlocks >= 0);
|
|
||||||
|
|
||||||
// no data block in current file, try next
|
|
||||||
if (pCheckInfo->numOfBlocks > 0) {
|
|
||||||
cur->fid = pQueryHandle->pFileGroup->fileId;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
dTrace("%p no data block in file, fid:%d, tid:%d, try next, %p", pQueryHandle, pQueryHandle->pFileGroup->fileId,
|
|
||||||
tid, pQueryHandle->qinfo);
|
|
||||||
|
|
||||||
pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pCheckInfo->numOfBlocks == 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cur->slot = 0; // always start from the first slot
|
|
||||||
SCompBlock* pBlock = &pCheckInfo->pCompInfo->blocks[cur->slot];
|
|
||||||
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
static UNUSED_FUNC bool hasMoreDataForSingleTable(STsdbQueryHandle* pHandle) {
|
|
||||||
assert(pHandle->activeIndex == 0 && taosArrayGetSize(pHandle->pTableCheckInfo) == 1);
|
|
||||||
|
|
||||||
STsdbFileH* pFileHandle = tsdbGetFile(pHandle->pTsdb);
|
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
|
||||||
|
|
||||||
if (!pCheckInfo->checkFirstFileBlock) {
|
|
||||||
pCheckInfo->checkFirstFileBlock = true;
|
|
||||||
|
|
||||||
if (pFileHandle != NULL) {
|
|
||||||
bool found = getQualifiedDataBlock(pHandle, pCheckInfo, 1);
|
|
||||||
if (found) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// no data in file, try cache
|
|
||||||
pHandle->cur.fid = -1;
|
|
||||||
return hasMoreDataInCache(pHandle);
|
|
||||||
} else { // move to next data block in file or in cache
|
|
||||||
return moveToNextBlock(pHandle, 1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct SBlockOrderSupporter {
|
|
||||||
int32_t numOfTables;
|
|
||||||
STableBlockInfo** pDataBlockInfo;
|
|
||||||
int32_t* blockIndexArray;
|
|
||||||
int32_t* numOfBlocksPerMeter;
|
|
||||||
} SBlockOrderSupporter;
|
|
||||||
|
|
||||||
void cleanBlockOrderSupporter(SBlockOrderSupporter* pSupporter, int32_t numOfTables) {
|
|
||||||
tfree(pSupporter->numOfBlocksPerMeter);
|
tfree(pSupporter->numOfBlocksPerMeter);
|
||||||
tfree(pSupporter->blockIndexArray);
|
tfree(pSupporter->blockIndexArray);
|
||||||
|
|
||||||
|
@ -815,7 +838,7 @@ static int32_t dataBlockOrderCompar(const void* pLeft, const void* pRight, void*
|
||||||
return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1;
|
return pLeftBlockInfoEx->pBlock.compBlock->offset > pRightBlockInfoEx->pBlock.compBlock->offset ? 1 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
|
static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks, int32_t* numOfAllocBlocks) {
|
||||||
char* tmp = realloc(pQueryHandle->pDataBlockInfo, sizeof(STableBlockInfo) * numOfBlocks);
|
char* tmp = realloc(pQueryHandle->pDataBlockInfo, sizeof(STableBlockInfo) * numOfBlocks);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||||
|
@ -912,6 +935,53 @@ int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numOfBlocks
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo opt for only one table case
|
||||||
|
static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
|
||||||
|
pQueryHandle->numOfBlocks = 0;
|
||||||
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
|
||||||
|
int32_t numOfBlocks = 0;
|
||||||
|
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
|
|
||||||
|
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
||||||
|
int32_t type = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
|
||||||
|
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(numOfBlocks >= 0);
|
||||||
|
dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
|
||||||
|
numOfTables, pQueryHandle->pFileGroup->fileId);
|
||||||
|
|
||||||
|
// todo return error code to query engine
|
||||||
|
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(numOfBlocks >= pQueryHandle->numOfBlocks);
|
||||||
|
if (pQueryHandle->numOfBlocks > 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// no data in file anymore
|
||||||
|
if (pQueryHandle->numOfBlocks <= 0) {
|
||||||
|
assert(pQueryHandle->pFileGroup == NULL);
|
||||||
|
cur->fid = -1;
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
cur->slot = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
|
||||||
|
cur->fid = pQueryHandle->pFileGroup->fileId;
|
||||||
|
|
||||||
|
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||||
|
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
|
||||||
|
SCompBlock* pBlock = pBlockInfo->pBlock.compBlock;
|
||||||
|
|
||||||
|
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||||
|
}
|
||||||
|
|
||||||
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
||||||
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
|
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
@ -925,98 +995,18 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
||||||
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
||||||
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
||||||
|
|
||||||
int32_t numOfBlocks = -1;
|
return getDataBlocksInFilesImpl(pQueryHandle);
|
||||||
|
|
||||||
// todo opt for only one table case
|
|
||||||
pQueryHandle->numOfBlocks = 0;
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
|
||||||
|
|
||||||
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
|
||||||
int32_t type = QUERY_IS_ASC_QUERY(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
|
|
||||||
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(numOfBlocks >= 0);
|
|
||||||
dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks,
|
|
||||||
numOfTables, pQueryHandle->pFileGroup->fileId);
|
|
||||||
|
|
||||||
// todo return error code to query engine
|
|
||||||
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(numOfBlocks >= pQueryHandle->numOfBlocks);
|
|
||||||
if (pQueryHandle->numOfBlocks > 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// no data in file anymore
|
|
||||||
if (pQueryHandle->numOfBlocks <= 0) {
|
|
||||||
assert(pQueryHandle->pFileGroup == NULL);
|
|
||||||
cur->fid = -1;
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
|
|
||||||
cur->fid = pQueryHandle->pFileGroup->fileId;
|
|
||||||
|
|
||||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
|
||||||
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
|
|
||||||
SCompBlock* pBlock = pBlockInfo->pBlock.compBlock;
|
|
||||||
|
|
||||||
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
|
||||||
} else {
|
} else {
|
||||||
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && QUERY_IS_ASC_QUERY(pQueryHandle->order)) ||
|
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
|
||||||
(cur->slot == 0 && !QUERY_IS_ASC_QUERY(pQueryHandle->order))) { // all blocks
|
(cur->slot == 0 && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) { // all blocks
|
||||||
int32_t numOfBlocks = -1;
|
|
||||||
|
|
||||||
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
return getDataBlocksInFilesImpl(pQueryHandle);
|
||||||
pQueryHandle->numOfBlocks = 0;
|
|
||||||
|
|
||||||
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
|
||||||
if (getFileCompInfo(pQueryHandle, &numOfBlocks, 1) != TSDB_CODE_SUCCESS) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(numOfBlocks >= 0);
|
|
||||||
dTrace("%p %d blocks found in file for %d table(s), fid:%d", pQueryHandle, numOfBlocks, numOfTables,
|
|
||||||
pQueryHandle->pFileGroup->fileId);
|
|
||||||
|
|
||||||
// todo return error code to query engine
|
|
||||||
if (createDataBlocksInfo(pQueryHandle, numOfBlocks, &pQueryHandle->numOfBlocks) != TSDB_CODE_SUCCESS) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
assert(numOfBlocks >= pQueryHandle->numOfBlocks);
|
|
||||||
if (pQueryHandle->numOfBlocks > 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// no data in file anymore
|
|
||||||
if (pQueryHandle->numOfBlocks <= 0) {
|
|
||||||
assert(pQueryHandle->pFileGroup == NULL);
|
|
||||||
cur->fid = -1;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
cur->slot = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
|
|
||||||
cur->fid = pQueryHandle->pFileGroup->fileId;
|
|
||||||
|
|
||||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
|
||||||
STableCheckInfo* pCheckInfo = pBlockInfo->pTableCheckInfo;
|
|
||||||
SCompBlock* pBlock = pBlockInfo->pBlock.compBlock;
|
|
||||||
|
|
||||||
return loadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
|
||||||
} else { // next block of the same file
|
} else { // next block of the same file
|
||||||
int32_t step = QUERY_IS_ASC_QUERY(pQueryHandle->order)? 1:-1;
|
int32_t step = ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)? 1:-1;
|
||||||
cur->slot += step;
|
cur->slot += step;
|
||||||
|
|
||||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
|
||||||
cur->pos = 0;
|
cur->pos = 0;
|
||||||
} else {
|
} else {
|
||||||
cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1;
|
cur->pos = pBlockInfo->pBlock.compBlock->numOfPoints - 1;
|
||||||
|
@ -1027,11 +1017,9 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
|
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
|
||||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
// todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity
|
// todo add assert, the value of numOfTables should be less than the maximum value for each vnode capacity
|
||||||
assert(numOfTables > 0);
|
|
||||||
|
|
||||||
while (pQueryHandle->activeIndex < numOfTables) {
|
while (pQueryHandle->activeIndex < numOfTables) {
|
||||||
if (hasMoreDataInCache(pQueryHandle)) {
|
if (hasMoreDataInCache(pQueryHandle)) {
|
||||||
|
@ -1051,7 +1039,7 @@ bool tsdbNextDataBlock(tsdb_query_handle_t* pqHandle) {
|
||||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
assert(numOfTables > 0);
|
assert(numOfTables > 0);
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQueryHandle->order)) {
|
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
|
||||||
if (pQueryHandle->checkFiles) {
|
if (pQueryHandle->checkFiles) {
|
||||||
if (getDataBlocksInFiles(pQueryHandle)) {
|
if (getDataBlocksInFiles(pQueryHandle)) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -1082,12 +1070,23 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
|
||||||
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
*skey = INT64_MIN;
|
*skey = INT64_MIN;
|
||||||
|
|
||||||
while (tSkipListIterNext(pIter)) {
|
do {
|
||||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||||
if (node == NULL) break;
|
if (node == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
if (dataRowKey(row) > maxKey) break;
|
TSKEY key = dataRowKey(row);
|
||||||
|
|
||||||
|
if ((key > maxKey && ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) ||
|
||||||
|
(key < maxKey && !ASCENDING_ORDER_TRAVERSE(pQueryHandle->order))) {
|
||||||
|
|
||||||
|
dTrace("%p key:%"PRIu64" beyond qrange:%"PRId64" - %"PRId64", no more data in buffer", pQueryHandle, key, pQueryHandle->window.skey,
|
||||||
|
pQueryHandle->window.ekey);
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (*skey == INT64_MIN) {
|
if (*skey == INT64_MIN) {
|
||||||
*skey = dataRowKey(row);
|
*skey = dataRowKey(row);
|
||||||
|
@ -1096,16 +1095,40 @@ static int tsdbReadRowsFromCache(SSkipListIterator* pIter, TSKEY maxKey, int max
|
||||||
*ekey = dataRowKey(row);
|
*ekey = dataRowKey(row);
|
||||||
|
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
|
char* pData = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
memcpy(pColInfo->pData + numOfRows * pColInfo->info.bytes, dataRowTuple(row) + offset, pColInfo->info.bytes);
|
|
||||||
|
if (ASCENDING_ORDER_TRAVERSE(pQueryHandle->order)) {
|
||||||
|
pData = pColInfo->pData + numOfRows * pColInfo->info.bytes;
|
||||||
|
} else {
|
||||||
|
pData = pColInfo->pData + (maxRowsToRead - numOfRows - 1) * pColInfo->info.bytes;
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(pData, dataRowTuple(row) + offset, pColInfo->info.bytes);
|
||||||
offset += pColInfo->info.bytes;
|
offset += pColInfo->info.bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
if (numOfRows >= maxRowsToRead) break;
|
if (numOfRows >= maxRowsToRead) {
|
||||||
};
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
} while(tSkipListIterNext(pIter));
|
||||||
|
|
||||||
|
assert(numOfRows <= maxRowsToRead);
|
||||||
|
|
||||||
|
// if the buffer is not full in case of descending order query, move the data in the front of the buffer
|
||||||
|
if (!ASCENDING_ORDER_TRAVERSE(pQueryHandle->order) && numOfRows < maxRowsToRead) {
|
||||||
|
int32_t emptySize = maxRowsToRead - numOfRows;
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
|
||||||
|
memmove(pColInfo->pData, pColInfo->pData + emptySize * pColInfo->info.bytes, numOfRows * pColInfo->info.bytes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1118,6 +1141,8 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
|
||||||
TSKEY skey = 0, ekey = 0;
|
TSKEY skey = 0, ekey = 0;
|
||||||
int32_t rows = 0;
|
int32_t rows = 0;
|
||||||
|
|
||||||
|
int32_t step = ASCENDING_ORDER_TRAVERSE(pHandle->order)? 1:-1;
|
||||||
|
|
||||||
// data in file
|
// data in file
|
||||||
if (pHandle->cur.fid >= 0) {
|
if (pHandle->cur.fid >= 0) {
|
||||||
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
|
STableBlockInfo* pBlockInfo = &pHandle->pDataBlockInfo[pHandle->cur.slot];
|
||||||
|
@ -1139,7 +1164,7 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
|
||||||
ekey = *(TSKEY*)((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1));
|
ekey = *(TSKEY*)((char*)pColInfoEx->pData + TSDB_KEYSIZE * (rows - 1));
|
||||||
|
|
||||||
// update the last key value
|
// update the last key value
|
||||||
pBlockInfo->pTableCheckInfo->lastKey = ekey + 1;
|
pBlockInfo->pTableCheckInfo->lastKey = ekey + step;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
STableCheckInfo* pCheckInfo = taosArrayGet(pHandle->pTableCheckInfo, pHandle->activeIndex);
|
||||||
|
@ -1147,18 +1172,20 @@ SDataBlockInfo tsdbRetrieveDataBlockInfo(tsdb_query_handle_t* pQueryHandle) {
|
||||||
|
|
||||||
if (pTable->mem != NULL) {
|
if (pTable->mem != NULL) {
|
||||||
// create mem table iterator if it is not created yet
|
// create mem table iterator if it is not created yet
|
||||||
if (pCheckInfo->iter == NULL) {
|
assert(pCheckInfo->iter != NULL);
|
||||||
pCheckInfo->iter = tSkipListCreateIter(pTable->mem->pData);
|
rows = tsdbReadRowsFromCache(pCheckInfo->iter, pHandle->window.ekey, 2, &skey, &ekey, pHandle);
|
||||||
}
|
|
||||||
rows = tsdbReadRowsFromCache(pCheckInfo->iter, INT64_MAX, 2, &skey, &ekey, pHandle);
|
|
||||||
|
|
||||||
// update the last key value
|
// update the last key value
|
||||||
pCheckInfo->lastKey = ekey + 1;
|
pCheckInfo->lastKey = ekey + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SDataBlockInfo blockInfo = {
|
SDataBlockInfo blockInfo = {
|
||||||
.uid = pTable->tableId.uid, .sid = pTable->tableId.tid, .rows = rows, .window = {.skey = skey, .ekey = ekey}};
|
.uid = pTable->tableId.uid,
|
||||||
|
.sid = pTable->tableId.tid,
|
||||||
|
.rows = rows,
|
||||||
|
.window = {.skey = MIN(skey, ekey), .ekey = MAX(skey, ekey)}
|
||||||
|
};
|
||||||
|
|
||||||
return blockInfo;
|
return blockInfo;
|
||||||
}
|
}
|
||||||
|
@ -1259,7 +1286,7 @@ static void convertQueryResult(SArray* pRes, SArray* pTableList) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyHelper(void* param) {
|
static void destroyHelper(void* param) {
|
||||||
if (param == NULL) {
|
if (param == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1269,86 +1296,6 @@ void destroyHelper(void* param) {
|
||||||
free(param);
|
free(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
static UNUSED_FUNC int32_t compareStrPatternComp(const void* pLeft, const void* pRight) {
|
|
||||||
SPatternCompareInfo pInfo = {'%', '_'};
|
|
||||||
|
|
||||||
const char* pattern = pRight;
|
|
||||||
const char* str = pLeft;
|
|
||||||
|
|
||||||
int32_t ret = patternMatch(pattern, str, strlen(str), &pInfo);
|
|
||||||
|
|
||||||
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static UNUSED_FUNC int32_t compareWStrPatternComp(const void* pLeft, const void* pRight) {
|
|
||||||
SPatternCompareInfo pInfo = {'%', '_'};
|
|
||||||
|
|
||||||
const wchar_t* pattern = pRight;
|
|
||||||
const wchar_t* str = pLeft;
|
|
||||||
|
|
||||||
int32_t ret = WCSPatternMatch(pattern, str, wcslen(str), &pInfo);
|
|
||||||
|
|
||||||
return (ret == TSDB_PATTERN_MATCH) ? 0 : 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// static __compar_fn_t getFilterComparator(int32_t type, int32_t filterType, int32_t optr) {
|
|
||||||
// __compar_fn_t comparator = NULL;
|
|
||||||
//
|
|
||||||
// switch (type) {
|
|
||||||
// case TSDB_DATA_TYPE_TINYINT:
|
|
||||||
// case TSDB_DATA_TYPE_SMALLINT:
|
|
||||||
// case TSDB_DATA_TYPE_INT:
|
|
||||||
// case TSDB_DATA_TYPE_BIGINT:
|
|
||||||
// case TSDB_DATA_TYPE_BOOL: {
|
|
||||||
// if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
// comparator = compareIntVal;
|
|
||||||
// } else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
// comparator = compareIntDoubleVal;
|
|
||||||
// }
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_FLOAT:
|
|
||||||
// case TSDB_DATA_TYPE_DOUBLE: {
|
|
||||||
// if (filterType >= TSDB_DATA_TYPE_BOOL && filterType <= TSDB_DATA_TYPE_BIGINT) {
|
|
||||||
// comparator = compareDoubleIntVal;
|
|
||||||
// } else if (filterType >= TSDB_DATA_TYPE_FLOAT && filterType <= TSDB_DATA_TYPE_DOUBLE) {
|
|
||||||
// comparator = compareDoubleVal;
|
|
||||||
// }
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_BINARY: {
|
|
||||||
// assert(filterType == TSDB_DATA_TYPE_BINARY);
|
|
||||||
//
|
|
||||||
// if (optr == TSDB_RELATION_LIKE) { /* wildcard query using like operator */
|
|
||||||
// comparator = compareStrPatternComp;
|
|
||||||
// } else { /* normal relational comparator */
|
|
||||||
// comparator = compareStrVal;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// case TSDB_DATA_TYPE_NCHAR: {
|
|
||||||
// assert(filterType == TSDB_DATA_TYPE_NCHAR);
|
|
||||||
//
|
|
||||||
// if (optr == TSDB_RELATION_LIKE) {
|
|
||||||
// comparator = compareWStrPatternComp;
|
|
||||||
// } else {
|
|
||||||
// comparator = compareWStrVal;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// default:
|
|
||||||
// comparator = compareIntVal;
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
//
|
|
||||||
// return comparator;
|
|
||||||
//}
|
|
||||||
|
|
||||||
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index, int32_t* offset) {
|
static void getTagColumnInfo(SExprTreeSupporter* pSupporter, SSchema* pSchema, int32_t* index, int32_t* offset) {
|
||||||
*index = 0;
|
*index = 0;
|
||||||
*offset = 0;
|
*offset = 0;
|
||||||
|
@ -1494,7 +1441,10 @@ int32_t tsdbQueryTags(tsdb_repo_t* tsdb, int64_t uid, const char* pTagCond, size
|
||||||
|
|
||||||
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
|
void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
|
||||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
|
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
|
||||||
|
if (pQueryHandle == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
STableCheckInfo* pTableCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
|
||||||
|
|
Loading…
Reference in New Issue