Merge branch 'develop' into compress_float
This commit is contained in:
commit
c349702396
|
@ -22,7 +22,6 @@ for:
|
||||||
- call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" %ARCH%
|
- call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" %ARCH%
|
||||||
|
|
||||||
before_build:
|
before_build:
|
||||||
- choco install lua
|
|
||||||
- cd c:\dev\TDengine
|
- cd c:\dev\TDengine
|
||||||
- md build
|
- md build
|
||||||
|
|
||||||
|
@ -36,9 +35,6 @@ for:
|
||||||
- image: macos
|
- image: macos
|
||||||
clone_depth: 1
|
clone_depth: 1
|
||||||
|
|
||||||
before_build:
|
|
||||||
- brew install lua
|
|
||||||
|
|
||||||
build_script:
|
build_script:
|
||||||
- mkdir debug
|
- mkdir debug
|
||||||
- cd debug
|
- cd debug
|
||||||
|
|
|
@ -2005,7 +2005,6 @@ static SVgroupsInfo* createVgroupInfoFromMsg(char* pMsg, int32_t* size, uint64_t
|
||||||
|
|
||||||
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||||
vmsg->vgId = htonl(vmsg->vgId);
|
vmsg->vgId = htonl(vmsg->vgId);
|
||||||
vmsg->numOfEps = vmsg->numOfEps;
|
|
||||||
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||||
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||||
}
|
}
|
||||||
|
|
|
@ -634,7 +634,9 @@ static void setResRawPtrImpl(SSqlRes* pRes, SInternalField* pInfo, int32_t i, bo
|
||||||
|
|
||||||
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo) {
|
||||||
assert(pRes->numOfCols > 0);
|
assert(pRes->numOfCols > 0);
|
||||||
|
if (pRes->numOfRows == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) {
|
||||||
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
|
SInternalField* pInfo = (SInternalField*)TARRAY_GET_ELEM(pQueryInfo->fieldsInfo.internalField, i);
|
||||||
|
@ -746,6 +748,37 @@ typedef struct SJoinOperatorInfo {
|
||||||
SRspResultInfo resultInfo; // todo refactor, add this info for each operator
|
SRspResultInfo resultInfo; // todo refactor, add this info for each operator
|
||||||
} SJoinOperatorInfo;
|
} SJoinOperatorInfo;
|
||||||
|
|
||||||
|
static void converNcharFilterColumn(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t rows, bool *gotNchar) {
|
||||||
|
for (int32_t i = 0; i < numOfFilterCols; ++i) {
|
||||||
|
if (pFilterInfo[i].info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
pFilterInfo[i].pData2 = pFilterInfo[i].pData;
|
||||||
|
pFilterInfo[i].pData = malloc(rows * pFilterInfo[i].info.bytes);
|
||||||
|
int32_t bufSize = pFilterInfo[i].info.bytes - VARSTR_HEADER_SIZE;
|
||||||
|
for (int32_t j = 0; j < rows; ++j) {
|
||||||
|
char* dst = (char *)pFilterInfo[i].pData + j * pFilterInfo[i].info.bytes;
|
||||||
|
char* src = (char *)pFilterInfo[i].pData2 + j * pFilterInfo[i].info.bytes;
|
||||||
|
int32_t len = 0;
|
||||||
|
taosMbsToUcs4(varDataVal(src), varDataLen(src), varDataVal(dst), bufSize, &len);
|
||||||
|
varDataLen(dst) = len;
|
||||||
|
}
|
||||||
|
*gotNchar = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void freeNcharFilterColumn(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
|
||||||
|
for (int32_t i = 0; i < numOfFilterCols; ++i) {
|
||||||
|
if (pFilterInfo[i].info.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
|
if (pFilterInfo[i].pData2) {
|
||||||
|
tfree(pFilterInfo[i].pData);
|
||||||
|
pFilterInfo[i].pData = pFilterInfo[i].pData2;
|
||||||
|
pFilterInfo[i].pData2 = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
|
static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols) {
|
||||||
int32_t offset = 0;
|
int32_t offset = 0;
|
||||||
char* pData = pRes->data;
|
char* pData = pRes->data;
|
||||||
|
@ -764,8 +797,13 @@ static void doSetupSDataBlock(SSqlRes* pRes, SSDataBlock* pBlock, SSingleColumnF
|
||||||
// filter data if needed
|
// filter data if needed
|
||||||
if (numOfFilterCols > 0) {
|
if (numOfFilterCols > 0) {
|
||||||
doSetFilterColumnInfo(pFilterInfo, numOfFilterCols, pBlock);
|
doSetFilterColumnInfo(pFilterInfo, numOfFilterCols, pBlock);
|
||||||
|
bool gotNchar = false;
|
||||||
|
converNcharFilterColumn(pFilterInfo, numOfFilterCols, pBlock->info.rows, &gotNchar);
|
||||||
int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t));
|
int8_t* p = calloc(pBlock->info.rows, sizeof(int8_t));
|
||||||
bool all = doFilterDataBlock(pFilterInfo, numOfFilterCols, pBlock->info.rows, p);
|
bool all = doFilterDataBlock(pFilterInfo, numOfFilterCols, pBlock->info.rows, p);
|
||||||
|
if (gotNchar) {
|
||||||
|
freeNcharFilterColumn(pFilterInfo, numOfFilterCols);
|
||||||
|
}
|
||||||
if (!all) {
|
if (!all) {
|
||||||
doCompactSDataBlock(pBlock, pBlock->info.rows, p);
|
doCompactSDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,8 +20,8 @@ IF (TD_LINUX_64)
|
||||||
if (CMAKE_C_COMPILER_ID STREQUAL "GNU" AND CMAKE_C_COMPILER_VERSION VERSION_LESS 5.0.0)
|
if (CMAKE_C_COMPILER_ID STREQUAL "GNU" AND CMAKE_C_COMPILER_VERSION VERSION_LESS 5.0.0)
|
||||||
message(WARNING "gcc 4.8.0 will complain too much about flex-generated code, we just bypass building ODBC driver in such case")
|
message(WARNING "gcc 4.8.0 will complain too much about flex-generated code, we just bypass building ODBC driver in such case")
|
||||||
else ()
|
else ()
|
||||||
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -Wconversion")
|
SET(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} ")
|
||||||
SET(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -Wconversion")
|
SET(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ")
|
||||||
ADD_SUBDIRECTORY(src)
|
ADD_SUBDIRECTORY(src)
|
||||||
ADD_SUBDIRECTORY(tools)
|
ADD_SUBDIRECTORY(tools)
|
||||||
ADD_SUBDIRECTORY(examples)
|
ADD_SUBDIRECTORY(examples)
|
||||||
|
|
|
@ -45,7 +45,7 @@ extern void updateBuffer(Command *cmd);
|
||||||
extern int isReadyGo(Command *cmd);
|
extern int isReadyGo(Command *cmd);
|
||||||
extern void resetCommand(Command *cmd, const char s[]);
|
extern void resetCommand(Command *cmd, const char s[]);
|
||||||
|
|
||||||
int countPrefixOnes(char c);
|
int countPrefixOnes(unsigned char c);
|
||||||
void clearScreen(int ecmd_pos, int cursor_pos);
|
void clearScreen(int ecmd_pos, int cursor_pos);
|
||||||
void printChar(char c, int times);
|
void printChar(char c, int times);
|
||||||
void positionCursor(int step, int direction);
|
void positionCursor(int step, int direction);
|
||||||
|
|
|
@ -26,7 +26,7 @@ typedef struct {
|
||||||
char widthOnScreen;
|
char widthOnScreen;
|
||||||
} UTFCodeInfo;
|
} UTFCodeInfo;
|
||||||
|
|
||||||
int countPrefixOnes(char c) {
|
int countPrefixOnes(unsigned char c) {
|
||||||
unsigned char mask = 127;
|
unsigned char mask = 127;
|
||||||
mask = ~mask;
|
mask = ~mask;
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
@ -48,7 +48,7 @@ void getPrevCharSize(const char *str, int pos, int *size, int *width) {
|
||||||
while (--pos >= 0) {
|
while (--pos >= 0) {
|
||||||
*size += 1;
|
*size += 1;
|
||||||
|
|
||||||
if (str[pos] > 0 || countPrefixOnes(str[pos]) > 1) break;
|
if (str[pos] > 0 || countPrefixOnes((unsigned char )str[pos]) > 1) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rc = mbtowc(&wc, str + pos, MB_CUR_MAX);
|
int rc = mbtowc(&wc, str + pos, MB_CUR_MAX);
|
||||||
|
|
|
@ -73,14 +73,14 @@ typedef struct SResultRowPool {
|
||||||
|
|
||||||
typedef struct SResultRow {
|
typedef struct SResultRow {
|
||||||
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
|
int32_t pageId; // pageId & rowId is the position of current result in disk-based output buffer
|
||||||
int32_t offset:29; // row index in buffer page
|
int32_t offset:29; // row index in buffer page
|
||||||
bool startInterp; // the time window start timestamp has done the interpolation already.
|
bool startInterp; // the time window start timestamp has done the interpolation already.
|
||||||
bool endInterp; // the time window end timestamp has done the interpolation already.
|
bool endInterp; // the time window end timestamp has done the interpolation already.
|
||||||
bool closed; // this result status: closed or opened
|
bool closed; // this result status: closed or opened
|
||||||
uint32_t numOfRows; // number of rows of current time window
|
uint32_t numOfRows; // number of rows of current time window
|
||||||
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
|
SResultRowCellInfo* pCellInfo; // For each result column, there is a resultInfo
|
||||||
STimeWindow win;
|
STimeWindow win;
|
||||||
char* key; // start key of current result row
|
char *key; // start key of current result row
|
||||||
} SResultRow;
|
} SResultRow;
|
||||||
|
|
||||||
typedef struct SGroupResInfo {
|
typedef struct SGroupResInfo {
|
||||||
|
@ -105,7 +105,7 @@ typedef struct SResultRowInfo {
|
||||||
int16_t type:8; // data type for hash key
|
int16_t type:8; // data type for hash key
|
||||||
int32_t size:24; // number of result set
|
int32_t size:24; // number of result set
|
||||||
int32_t capacity; // max capacity
|
int32_t capacity; // max capacity
|
||||||
int32_t curIndex; // current start active index
|
SResultRow* current; // current start active index
|
||||||
int64_t prevSKey; // previous (not completed) sliding window start key
|
int64_t prevSKey; // previous (not completed) sliding window start key
|
||||||
} SResultRowInfo;
|
} SResultRowInfo;
|
||||||
|
|
||||||
|
@ -118,6 +118,7 @@ typedef struct SColumnFilterElem {
|
||||||
|
|
||||||
typedef struct SSingleColumnFilterInfo {
|
typedef struct SSingleColumnFilterInfo {
|
||||||
void* pData;
|
void* pData;
|
||||||
|
void* pData2; //used for nchar column
|
||||||
int32_t numOfFilters;
|
int32_t numOfFilters;
|
||||||
SColumnInfo info;
|
SColumnInfo info;
|
||||||
SColumnFilterElem* pFilters;
|
SColumnFilterElem* pFilters;
|
||||||
|
|
|
@ -410,6 +410,21 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, SQueryRuntim
|
||||||
pResultRowInfo->capacity = (int32_t)newCapacity;
|
pResultRowInfo->capacity = (int32_t)newCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t ascResultRowCompareFn(const void* p1, const void* p2) {
|
||||||
|
SResultRow* pRow1 = *(SResultRow**)p1;
|
||||||
|
SResultRow* pRow2 = *(SResultRow**)p2;
|
||||||
|
|
||||||
|
if (pRow1 == pRow2) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return pRow1->win.skey < pRow2->win.skey? -1:1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t descResultRowCompareFn(const void* p1, const void* p2) {
|
||||||
|
return -ascResultRowCompareFn(p1, p2);
|
||||||
|
}
|
||||||
|
|
||||||
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
|
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
|
||||||
int16_t bytes, bool masterscan, uint64_t uid) {
|
int16_t bytes, bool masterscan, uint64_t uid) {
|
||||||
bool existed = false;
|
bool existed = false;
|
||||||
|
@ -425,11 +440,17 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p1 != NULL) {
|
if (p1 != NULL) {
|
||||||
for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) {
|
pResultRowInfo->current = (*p1);
|
||||||
if (pResultRowInfo->pResult[i] == (*p1)) {
|
|
||||||
pResultRowInfo->curIndex = i;
|
if (pResultRowInfo->size == 0) {
|
||||||
|
existed = false;
|
||||||
|
} else if (pResultRowInfo->size == 1) {
|
||||||
|
existed = (pResultRowInfo->pResult[0] == (*p1));
|
||||||
|
} else {
|
||||||
|
__compar_fn_t fn = QUERY_IS_ASC_QUERY(pRuntimeEnv->pQueryAttr)? ascResultRowCompareFn:descResultRowCompareFn;
|
||||||
|
void* ptr = taosbsearch(p1, pResultRowInfo->pResult, pResultRowInfo->size, POINTER_BYTES, fn, TD_EQ);
|
||||||
|
if (ptr != NULL) {
|
||||||
existed = true;
|
existed = true;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -456,8 +477,8 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
||||||
pResult = *p1;
|
pResult = *p1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultRowInfo->pResult[pResultRowInfo->size] = pResult;
|
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
|
||||||
pResultRowInfo->curIndex = pResultRowInfo->size++;
|
pResultRowInfo->current = pResult;
|
||||||
}
|
}
|
||||||
|
|
||||||
// too many time window in query
|
// too many time window in query
|
||||||
|
@ -465,7 +486,7 @@ static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SRes
|
||||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_TOO_MANY_TIMEWINDOW);
|
||||||
}
|
}
|
||||||
|
|
||||||
return getResultRow(pResultRowInfo, pResultRowInfo->curIndex);
|
return pResultRowInfo->current;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) {
|
static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWindow* w) {
|
||||||
|
@ -496,7 +517,7 @@ static void getInitialStartTimeWindow(SQueryAttr* pQueryAttr, TSKEY ts, STimeWin
|
||||||
static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
|
static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
|
||||||
STimeWindow w = {0};
|
STimeWindow w = {0};
|
||||||
|
|
||||||
if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value
|
if (pResultRowInfo->current == NULL) { // the first window, from the previous stored value
|
||||||
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
||||||
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
||||||
pResultRowInfo->prevSKey = w.skey;
|
pResultRowInfo->prevSKey = w.skey;
|
||||||
|
@ -510,8 +531,9 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
|
||||||
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t slot = curTimeWindowIndex(pResultRowInfo);
|
// int32_t slot = curTimeWindowIndex(pResultRowInfo);
|
||||||
SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
|
// SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
|
||||||
|
SResultRow* pWindowRes = pResultRowInfo->current;
|
||||||
w = pWindowRes->win;
|
w = pWindowRes->win;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -697,7 +719,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
||||||
|
|
||||||
// all result rows are closed, set the last one to be the skey
|
// all result rows are closed, set the last one to be the skey
|
||||||
if (skey == TSKEY_INITIAL_VAL) {
|
if (skey == TSKEY_INITIAL_VAL) {
|
||||||
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
|
if (pResultRowInfo->size == 0) {
|
||||||
|
// assert(pResultRowInfo->current == NULL);
|
||||||
|
pResultRowInfo->current = NULL;
|
||||||
|
} else {
|
||||||
|
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
||||||
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
|
for (i = pResultRowInfo->size - 1; i >= 0; --i) {
|
||||||
|
@ -708,12 +735,12 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
||||||
}
|
}
|
||||||
|
|
||||||
if (i == pResultRowInfo->size - 1) {
|
if (i == pResultRowInfo->size - 1) {
|
||||||
pResultRowInfo->curIndex = i;
|
pResultRowInfo->current = pResultRowInfo->pResult[i];
|
||||||
} else {
|
} else {
|
||||||
pResultRowInfo->curIndex = i + 1; // current not closed result object
|
pResultRowInfo->current = pResultRowInfo->pResult[i + 1]; // current not closed result object
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
|
pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -721,7 +748,7 @@ static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQuer
|
||||||
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||||
if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) {
|
if ((lastKey > pQueryAttr->window.ekey && ascQuery) || (lastKey < pQueryAttr->window.ekey && (!ascQuery))) {
|
||||||
closeAllResultRows(pResultRowInfo);
|
closeAllResultRows(pResultRowInfo);
|
||||||
pResultRowInfo->curIndex = pResultRowInfo->size - 1;
|
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
||||||
} else {
|
} else {
|
||||||
int32_t step = ascQuery ? 1 : -1;
|
int32_t step = ascQuery ? 1 : -1;
|
||||||
doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo);
|
doUpdateResultRowIndex(pResultRowInfo, lastKey - step, ascQuery, pQueryAttr->timeWindowInterpo);
|
||||||
|
@ -1230,7 +1257,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
|
||||||
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
||||||
|
|
||||||
int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
|
SResultRow* prevRow = pResultRowInfo->current;
|
||||||
|
// int32_t prevIndex = curTimeWindowIndex(pResultRowInfo);
|
||||||
|
|
||||||
TSKEY* tsCols = NULL;
|
TSKEY* tsCols = NULL;
|
||||||
if (pSDataBlock->pDataBlock != NULL) {
|
if (pSDataBlock->pDataBlock != NULL) {
|
||||||
|
@ -1259,13 +1287,19 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
|
getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
|
||||||
|
|
||||||
// prev time window not interpolation yet.
|
// prev time window not interpolation yet.
|
||||||
int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
|
// int32_t curIndex = curTimeWindowIndex(pResultRowInfo);
|
||||||
if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) {
|
// if (prevIndex != -1 && prevIndex < curIndex && pQueryAttr->timeWindowInterpo) {
|
||||||
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
|
// for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
|
||||||
|
if (prevRow != NULL && prevRow != pResultRowInfo->current && pQueryAttr->timeWindowInterpo) {
|
||||||
|
int32_t j = 0;
|
||||||
|
while(pResultRowInfo->pResult[j] != prevRow) {
|
||||||
|
j++;
|
||||||
|
}
|
||||||
|
|
||||||
|
for(; pResultRowInfo->pResult[j] != pResultRowInfo->current; ++j) {
|
||||||
SResultRow* pRes = pResultRowInfo->pResult[j];
|
SResultRow* pRes = pResultRowInfo->pResult[j];
|
||||||
if (pRes->closed) {
|
if (pRes->closed) {
|
||||||
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) &&
|
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
|
||||||
resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3146,7 +3180,7 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQueryInfo *pTableQueryInfo) {
|
static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) {
|
||||||
if (pTableQueryInfo == NULL) {
|
if (pTableQueryInfo == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -3158,7 +3192,12 @@ static void updateTableQueryInfoForReverseScan(SQueryAttr *pQueryAttr, STableQue
|
||||||
pTableQueryInfo->cur.vgroupIndex = -1;
|
pTableQueryInfo->cur.vgroupIndex = -1;
|
||||||
|
|
||||||
// set the index to be the end slot of result rows array
|
// set the index to be the end slot of result rows array
|
||||||
pTableQueryInfo->resInfo.curIndex = pTableQueryInfo->resInfo.size - 1;
|
SResultRowInfo* pResRowInfo = &pTableQueryInfo->resInfo;
|
||||||
|
if (pResRowInfo->size > 0) {
|
||||||
|
pResRowInfo->current = pResRowInfo->pResult[pResRowInfo->size - 1];
|
||||||
|
} else {
|
||||||
|
pResRowInfo->current = NULL;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
|
static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
|
@ -3172,7 +3211,7 @@ static void setupQueryRangeForReverseScan(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
size_t t = taosArrayGetSize(group);
|
size_t t = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < t; ++j) {
|
for (int32_t j = 0; j < t; ++j) {
|
||||||
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
STableQueryInfo *pCheckInfo = taosArrayGetP(group, j);
|
||||||
updateTableQueryInfoForReverseScan(pQueryAttr, pCheckInfo);
|
updateTableQueryInfoForReverseScan(pCheckInfo);
|
||||||
|
|
||||||
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
|
// update the last key in tableKeyInfo list, the tableKeyInfo is used to build the tsdbQueryHandle and decide
|
||||||
// the start check timestamp of tsdbQueryHandle
|
// the start check timestamp of tsdbQueryHandle
|
||||||
|
@ -4574,7 +4613,7 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pResultRowInfo->size > 0) {
|
if (pResultRowInfo->size > 0) {
|
||||||
pResultRowInfo->curIndex = 0;
|
pResultRowInfo->current = pResultRowInfo->pResult[0];
|
||||||
pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
|
pResultRowInfo->prevSKey = pResultRowInfo->pResult[0]->win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4600,8 +4639,8 @@ static SSDataBlock* doTableScan(void* param, bool *newgroup) {
|
||||||
pTableScanInfo->order = cond.order;
|
pTableScanInfo->order = cond.order;
|
||||||
|
|
||||||
if (pResultRowInfo->size > 0) {
|
if (pResultRowInfo->size > 0) {
|
||||||
pResultRowInfo->curIndex = pResultRowInfo->size-1;
|
pResultRowInfo->current = pResultRowInfo->pResult[pResultRowInfo->size - 1];
|
||||||
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->size-1]->win.skey;
|
pResultRowInfo->prevSKey = pResultRowInfo->current->win.skey;
|
||||||
}
|
}
|
||||||
|
|
||||||
p = doTableScanImpl(pOperator, newgroup);
|
p = doTableScanImpl(pOperator, newgroup);
|
||||||
|
|
|
@ -938,7 +938,7 @@ void SqlInfoDestroy(SSqlInfo *pInfo) {
|
||||||
taosArrayDestroy(pInfo->pMiscInfo->a);
|
taosArrayDestroy(pInfo->pMiscInfo->a);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pMiscInfo != NULL && pInfo->type == TSDB_SQL_CREATE_DB) {
|
if (pInfo->pMiscInfo != NULL && (pInfo->type == TSDB_SQL_CREATE_DB || pInfo->type == TSDB_SQL_ALTER_DB)) {
|
||||||
taosArrayDestroyEx(pInfo->pMiscInfo->dbOpt.keep, freeVariant);
|
taosArrayDestroyEx(pInfo->pMiscInfo->dbOpt.keep, freeVariant);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,7 +45,7 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size, int16_t
|
||||||
pResultRowInfo->type = type;
|
pResultRowInfo->type = type;
|
||||||
pResultRowInfo->size = 0;
|
pResultRowInfo->size = 0;
|
||||||
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
||||||
pResultRowInfo->curIndex = -1;
|
pResultRowInfo->current = NULL;
|
||||||
pResultRowInfo->capacity = size;
|
pResultRowInfo->capacity = size;
|
||||||
|
|
||||||
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
|
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
|
||||||
|
@ -91,8 +91,8 @@ void resetResultRowInfo(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRo
|
||||||
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex)));
|
taosHashRemove(pRuntimeEnv->pResultRowHashTable, (const char *)pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(sizeof(groupIndex)));
|
||||||
}
|
}
|
||||||
|
|
||||||
pResultRowInfo->curIndex = -1;
|
pResultRowInfo->size = 0;
|
||||||
pResultRowInfo->size = 0;
|
pResultRowInfo->current = NULL;
|
||||||
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
pResultRowInfo->prevSKey = TSKEY_INITIAL_VAL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -126,15 +126,38 @@ _hash_fn_t taosGetDefaultHashFunction(int32_t type) {
|
||||||
_hash_fn_t fn = NULL;
|
_hash_fn_t fn = NULL;
|
||||||
switch(type) {
|
switch(type) {
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
case TSDB_DATA_TYPE_BIGINT: fn = taosIntHash_64;break;
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
case TSDB_DATA_TYPE_BINARY: fn = MurmurHash3_32;break;
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
case TSDB_DATA_TYPE_NCHAR: fn = MurmurHash3_32;break;
|
fn = taosIntHash_64;
|
||||||
case TSDB_DATA_TYPE_INT: fn = taosIntHash_32; break;
|
break;
|
||||||
case TSDB_DATA_TYPE_SMALLINT: fn = taosIntHash_16; break;
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
case TSDB_DATA_TYPE_TINYINT: fn = taosIntHash_8; break;
|
fn = MurmurHash3_32;
|
||||||
case TSDB_DATA_TYPE_FLOAT: fn = taosFloatHash; break;
|
break;
|
||||||
case TSDB_DATA_TYPE_DOUBLE: fn = taosDoubleHash; break;
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
default: fn = taosIntHash_32;break;
|
fn = MurmurHash3_32;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_UINT:
|
||||||
|
case TSDB_DATA_TYPE_INT:
|
||||||
|
fn = taosIntHash_32;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
|
fn = taosIntHash_16;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
|
fn = taosIntHash_8;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
|
fn = taosFloatHash;
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
|
fn = taosDoubleHash;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
fn = taosIntHash_32;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return fn;
|
return fn;
|
||||||
|
|
Loading…
Reference in New Issue