merge from develop
This commit is contained in:
commit
ce232e1f98
10
.travis.yml
10
.travis.yml
|
@ -36,6 +36,8 @@ matrix:
|
||||||
- psmisc
|
- psmisc
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
|
- export TZ=Asia/Harbin
|
||||||
|
- date
|
||||||
- cd ${TRAVIS_BUILD_DIR}
|
- cd ${TRAVIS_BUILD_DIR}
|
||||||
- mkdir debug
|
- mkdir debug
|
||||||
- cd debug
|
- cd debug
|
||||||
|
@ -150,6 +152,8 @@ matrix:
|
||||||
- DESC="trusty/gcc-4.8 build"
|
- DESC="trusty/gcc-4.8 build"
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
|
- export TZ=Asia/Harbin
|
||||||
|
- date
|
||||||
- cd ${TRAVIS_BUILD_DIR}
|
- cd ${TRAVIS_BUILD_DIR}
|
||||||
- mkdir debug
|
- mkdir debug
|
||||||
- cd debug
|
- cd debug
|
||||||
|
@ -173,6 +177,8 @@ matrix:
|
||||||
- cmake
|
- cmake
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
|
- export TZ=Asia/Harbin
|
||||||
|
- date
|
||||||
- cd ${TRAVIS_BUILD_DIR}
|
- cd ${TRAVIS_BUILD_DIR}
|
||||||
- mkdir debug
|
- mkdir debug
|
||||||
- cd debug
|
- cd debug
|
||||||
|
@ -197,6 +203,8 @@ matrix:
|
||||||
- cmake
|
- cmake
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
|
- export TZ=Asia/Harbin
|
||||||
|
- date
|
||||||
- cd ${TRAVIS_BUILD_DIR}
|
- cd ${TRAVIS_BUILD_DIR}
|
||||||
- mkdir debug
|
- mkdir debug
|
||||||
- cd debug
|
- cd debug
|
||||||
|
@ -225,6 +233,8 @@ matrix:
|
||||||
- DESC="trusty/gcc-4.8 build"
|
- DESC="trusty/gcc-4.8 build"
|
||||||
|
|
||||||
before_script:
|
before_script:
|
||||||
|
- export TZ=Asia/Harbin
|
||||||
|
- date
|
||||||
- cd ${TRAVIS_BUILD_DIR}
|
- cd ${TRAVIS_BUILD_DIR}
|
||||||
- mkdir debug
|
- mkdir debug
|
||||||
- cd debug
|
- cd debug
|
||||||
|
|
|
@ -259,7 +259,7 @@ bool isNEleNull(SDataCol *pCol, int nEle) {
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
for (int i = 0; i < nEle; i++) {
|
for (int i = 0; i < nEle; i++) {
|
||||||
if (!isNull(varDataVal(tdGetColDataOfRow(pCol, i)), pCol->type)) return false;
|
if (!isNull(tdGetColDataOfRow(pCol, i), pCol->type)) return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -328,6 +328,7 @@ static int32_t dnodeOpenVnodes() {
|
||||||
}
|
}
|
||||||
|
|
||||||
free(vnodeList);
|
free(vnodeList);
|
||||||
|
free(threads);
|
||||||
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
|
dInfo("there are total vnodes:%d, openned:%d failed:%d", numOfVnodes, openVnodes, failedVnodes);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -790,7 +790,7 @@ int isCommentLine(char *line) {
|
||||||
void source_file(TAOS *con, char *fptr) {
|
void source_file(TAOS *con, char *fptr) {
|
||||||
wordexp_t full_path;
|
wordexp_t full_path;
|
||||||
int read_len = 0;
|
int read_len = 0;
|
||||||
char * cmd = calloc(1, MAX_COMMAND_SIZE);
|
char * cmd = calloc(1, tsMaxSQLStringLen+1);
|
||||||
size_t cmd_len = 0;
|
size_t cmd_len = 0;
|
||||||
char * line = NULL;
|
char * line = NULL;
|
||||||
size_t line_len = 0;
|
size_t line_len = 0;
|
||||||
|
@ -822,7 +822,7 @@ void source_file(TAOS *con, char *fptr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
while ((read_len = getline(&line, &line_len, f)) != -1) {
|
while ((read_len = getline(&line, &line_len, f)) != -1) {
|
||||||
if (read_len >= MAX_COMMAND_SIZE) continue;
|
if (read_len >= tsMaxSQLStringLen) continue;
|
||||||
line[--read_len] = '\0';
|
line[--read_len] = '\0';
|
||||||
|
|
||||||
if (read_len == 0 || isCommentLine(line)) { // line starts with #
|
if (read_len == 0 || isCommentLine(line)) { // line starts with #
|
||||||
|
@ -839,7 +839,7 @@ void source_file(TAOS *con, char *fptr) {
|
||||||
memcpy(cmd + cmd_len, line, read_len);
|
memcpy(cmd + cmd_len, line, read_len);
|
||||||
printf("%s%s\n", PROMPT_HEADER, cmd);
|
printf("%s%s\n", PROMPT_HEADER, cmd);
|
||||||
shellRunCommand(con, cmd);
|
shellRunCommand(con, cmd);
|
||||||
memset(cmd, 0, MAX_COMMAND_SIZE);
|
memset(cmd, 0, tsMaxSQLStringLen);
|
||||||
cmd_len = 0;
|
cmd_len = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -12,12 +12,10 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include <taosmsg.h>
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "qfill.h"
|
#include "qfill.h"
|
||||||
|
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "hashfunc.h"
|
|
||||||
#include "qExecutor.h"
|
#include "qExecutor.h"
|
||||||
#include "qUtil.h"
|
#include "qUtil.h"
|
||||||
#include "qast.h"
|
#include "qast.h"
|
||||||
|
@ -25,7 +23,6 @@
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "queryLog.h"
|
#include "queryLog.h"
|
||||||
#include "taosmsg.h"
|
#include "taosmsg.h"
|
||||||
#include "tdataformat.h"
|
|
||||||
#include "tlosertree.h"
|
#include "tlosertree.h"
|
||||||
#include "tscUtil.h" // todo move the function to common module
|
#include "tscUtil.h" // todo move the function to common module
|
||||||
#include "tscompression.h"
|
#include "tscompression.h"
|
||||||
|
@ -91,6 +88,9 @@ typedef struct {
|
||||||
} SQueryStatusInfo;
|
} SQueryStatusInfo;
|
||||||
|
|
||||||
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
||||||
|
#define GET_NUM_OF_TABLEGROUP(q) taosArrayGetSize((q)->tableqinfoGroupInfo.pGroupList)
|
||||||
|
#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index)))
|
||||||
|
|
||||||
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
static void setQueryStatus(SQuery *pQuery, int8_t status);
|
||||||
|
|
||||||
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
static bool isIntervalQuery(SQuery *pQuery) { return pQuery->intervalTime > 0; }
|
||||||
|
@ -1708,7 +1708,23 @@ static bool onlyFirstQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSD
|
||||||
|
|
||||||
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
|
static bool onlyLastQuery(SQuery *pQuery) { return onlyOneQueryType(pQuery, TSDB_FUNC_LAST, TSDB_FUNC_LAST_DST); }
|
||||||
|
|
||||||
static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
|
// todo refactor, add iterator
|
||||||
|
static void doExchangeTimeWindow(SQInfo* pQInfo) {
|
||||||
|
size_t t = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
for(int32_t i = 0; i < t; ++i) {
|
||||||
|
SArray* p1 = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
|
size_t len = taosArrayGetSize(p1);
|
||||||
|
for(int32_t j = 0; j < len; ++j) {
|
||||||
|
STableQueryInfo* pTableQueryInfo = (STableQueryInfo*) taosArrayGetP(p1, j);
|
||||||
|
SWAP(pTableQueryInfo->win.skey, pTableQueryInfo->win.ekey, TSKEY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void changeExecuteScanOrder(SQInfo *pQInfo, bool stableQuery) {
|
||||||
|
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
// in case of point-interpolation query, use asc order scan
|
// in case of point-interpolation query, use asc order scan
|
||||||
char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64
|
char msg[] = "QInfo:%p scan order changed for %s query, old:%d, new:%d, qrange exchanged, old qrange:%" PRId64
|
||||||
"-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
|
"-%" PRId64 ", new qrange:%" PRId64 "-%" PRId64;
|
||||||
|
@ -1748,6 +1764,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
|
||||||
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
||||||
|
|
||||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||||
|
doExchangeTimeWindow(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->order.order = TSDB_ORDER_ASC;
|
pQuery->order.order = TSDB_ORDER_ASC;
|
||||||
|
@ -1757,6 +1774,7 @@ static void changeExecuteScanOrder(SQuery *pQuery, bool stableQuery) {
|
||||||
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
pQuery->window.ekey, pQuery->window.ekey, pQuery->window.skey);
|
||||||
|
|
||||||
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
SWAP(pQuery->window.skey, pQuery->window.ekey, TSKEY);
|
||||||
|
doExchangeTimeWindow(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->order.order = TSDB_ORDER_DESC;
|
pQuery->order.order = TSDB_ORDER_DESC;
|
||||||
|
@ -2219,7 +2237,7 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
if (isNull(varDataVal(val), type)) {
|
if (isNull(val, type)) {
|
||||||
tag->nType = TSDB_DATA_TYPE_NULL;
|
tag->nType = TSDB_DATA_TYPE_NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2489,10 +2507,10 @@ int32_t mergeIntoGroupResult(SQInfo *pQInfo) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
|
||||||
while (pQInfo->groupIndex < numOfGroups) {
|
while (pQInfo->groupIndex < numOfGroups) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, pQInfo->groupIndex);
|
SArray *group = GET_TABLEGROUP(pQInfo, pQInfo->groupIndex);
|
||||||
ret = mergeIntoGroupResultImpl(pQInfo, group);
|
ret = mergeIntoGroupResultImpl(pQInfo, group);
|
||||||
if (ret < 0) { // not enough disk space to save the data into disk
|
if (ret < 0) { // not enough disk space to save the data into disk
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -2525,7 +2543,7 @@ void copyResToQueryResultBuf(SQInfo *pQInfo, SQuery *pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if all results has been sent to client
|
// check if all results has been sent to client
|
||||||
int32_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
|
if (pQInfo->numOfGroupResultPages == 0 && pQInfo->groupIndex == numOfGroup) {
|
||||||
pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; // set query completed
|
pQInfo->tableIndex = pQInfo->tableqinfoGroupInfo.numOfTables; // set query completed
|
||||||
return;
|
return;
|
||||||
|
@ -2859,10 +2877,10 @@ void disableFuncInReverseScan(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfGroups; ++i) {
|
for(int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
size_t t = taosArrayGetSize(group);
|
size_t t = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < t; ++j) {
|
for (int32_t j = 0; j < t; ++j) {
|
||||||
|
@ -3349,7 +3367,7 @@ void setWindowResOutputBufInitCtx(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *
|
||||||
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
|
||||||
|
|
||||||
pCtx->resultInfo = &pResult->resultInfo[i];
|
pCtx->resultInfo = &pResult->resultInfo[i];
|
||||||
if (pCtx->resultInfo->complete) {
|
if (pCtx->resultInfo->initialized && pCtx->resultInfo->complete) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3479,7 +3497,7 @@ static int32_t getNumOfSubset(SQInfo *pQInfo) {
|
||||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (isIntervalQuery(pQuery))) {
|
||||||
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
|
totalSubset = numOfClosedTimeWindow(&pQInfo->runtimeEnv.windowResInfo);
|
||||||
} else {
|
} else {
|
||||||
totalSubset = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
totalSubset = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return totalSubset;
|
return totalSubset;
|
||||||
|
@ -3619,36 +3637,40 @@ void stableApplyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *
|
||||||
bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
bool queryHasRemainResults(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
|
SFillInfo *pFillInfo = pRuntimeEnv->pFillInfo;
|
||||||
|
|
||||||
// todo refactor
|
if (pQuery->limit.limit > 0 && pQuery->rec.total >= pQuery->limit.limit) {
|
||||||
if (pQuery->fillType == TSDB_FILL_NONE || (pQuery->fillType != TSDB_FILL_NONE && isPointInterpoQuery(pQuery))) {
|
|
||||||
assert(pFillInfo == NULL);
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQuery->limit.limit > 0 && pQuery->rec.rows >= pQuery->limit.limit) {
|
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
|
||||||
return false;
|
// There are results not returned to client yet, so filling operation applied to the remain result is required
|
||||||
}
|
// in the first place.
|
||||||
|
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
||||||
|
if (remain > 0) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
// There are results not returned to client, fill operation applied to the remain result set in the
|
/*
|
||||||
// first place is required.
|
* While the code reaches here, there are no results remains now.
|
||||||
int32_t remain = taosNumOfRemainRows(pFillInfo);
|
* If query is not completed yet, the gaps between two results blocks need to be handled after next data block
|
||||||
if (remain > 0) {
|
* is retrieved from TSDB.
|
||||||
return true;
|
*
|
||||||
}
|
* NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result
|
||||||
|
* set is the FIRST result block, the gap between the start time of query time window and the timestamp of the
|
||||||
/*
|
* first result row in the actual result set will fill nothing.
|
||||||
* While the code reaches here, there are no results returned to client now.
|
*/
|
||||||
* If query is not completed yet, the gaps between two results blocks need to be handled after next data block
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||||
* is retrieved from TSDB.
|
int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity);
|
||||||
*
|
return numOfTotal > 0;
|
||||||
* NOTE: If the result set is not the first block, the gap in front of the result set will be filled. If the result
|
}
|
||||||
* set is the FIRST result block, the gap between the start time of query time window and the timestamp of the
|
|
||||||
* first result row in the actual result set will fill nothing.
|
} else {
|
||||||
*/
|
// there are results waiting for returned to client.
|
||||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED) &&
|
||||||
int32_t numOfTotal = getFilledNumOfRes(pFillInfo, pQuery->window.ekey, pQuery->rec.capacity);
|
(isGroupbyNormalCol(pQuery->pGroupbyExpr) || isIntervalQuery(pQuery)) &&
|
||||||
return numOfTotal > 0;
|
(pRuntimeEnv->windowResInfo.size > 0)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
@ -3690,7 +3712,7 @@ static void doCopyQueryResultToMsg(SQInfo *pQInfo, int32_t numOfRows, char *data
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfInterpo) {
|
int32_t doFillGapsInResults(SQueryRuntimeEnv* pRuntimeEnv, tFilePage **pDst, int32_t *numOfFilled) {
|
||||||
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
|
SFillInfo* pFillInfo = pRuntimeEnv->pFillInfo;
|
||||||
|
@ -3995,7 +4017,7 @@ static void setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery) {
|
||||||
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
|
&& (!isGroupbyNormalCol(pQuery->pGroupbyExpr))
|
||||||
&& (!isFixedOutputQuery(pQuery))
|
&& (!isFixedOutputQuery(pQuery))
|
||||||
) {
|
) {
|
||||||
SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray* pa = GET_TABLEGROUP(pQInfo, 0);
|
||||||
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
|
STableQueryInfo* pCheckInfo = taosArrayGetP(pa, 0);
|
||||||
cond.twindow = pCheckInfo->win;
|
cond.twindow = pCheckInfo->win;
|
||||||
}
|
}
|
||||||
|
@ -4039,7 +4061,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, int32_t vgId, bool
|
||||||
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
||||||
|
|
||||||
setScanLimitationByResultBuffer(pQuery);
|
setScanLimitationByResultBuffer(pQuery);
|
||||||
changeExecuteScanOrder(pQuery, false);
|
changeExecuteScanOrder(pQInfo, false);
|
||||||
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||||
|
|
||||||
pQInfo->tsdb = tsdb;
|
pQInfo->tsdb = tsdb;
|
||||||
|
@ -4142,9 +4164,9 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
STableQueryInfo *pTableQueryInfo = NULL;
|
STableQueryInfo *pTableQueryInfo = NULL;
|
||||||
|
|
||||||
// todo opt performance using hash table
|
// todo opt performance using hash table
|
||||||
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(group);
|
size_t num = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
|
@ -4197,7 +4219,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
setQueryStatus(pQuery, QUERY_NOT_COMPLETED);
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray *group = GET_TABLEGROUP(pQInfo, 0);
|
||||||
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
|
STableQueryInfo* pCheckInfo = taosArrayGetP(group, index);
|
||||||
|
|
||||||
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
|
setTagVal(pRuntimeEnv, pCheckInfo->pTable, pQInfo->tsdb);
|
||||||
|
@ -4261,7 +4283,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
|
||||||
size_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
|
|
||||||
if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) {
|
if (isPointInterpoQuery(pQuery) || isFirstLastRowQuery(pQuery)) {
|
||||||
resetCtxOutputBuf(pRuntimeEnv);
|
resetCtxOutputBuf(pRuntimeEnv);
|
||||||
|
@ -4311,7 +4333,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
taosArrayDestroy(s);
|
taosArrayDestroy(s);
|
||||||
|
|
||||||
// here we simply set the first table as current table
|
// here we simply set the first table as current table
|
||||||
pQuery->current = (STableQueryInfo*) taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
pQuery->current = (STableQueryInfo*) GET_TABLEGROUP(pQInfo, 0);
|
||||||
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
||||||
|
|
||||||
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
||||||
|
@ -4424,7 +4446,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
resetCtxOutputBuf(pRuntimeEnv);
|
resetCtxOutputBuf(pRuntimeEnv);
|
||||||
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
|
resetTimeWindowInfo(pRuntimeEnv, &pRuntimeEnv->windowResInfo);
|
||||||
|
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray *group = GET_TABLEGROUP(pQInfo, 0);
|
||||||
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
|
assert(taosArrayGetSize(group) == pQInfo->tableqinfoGroupInfo.numOfTables &&
|
||||||
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
|
1 == taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList));
|
||||||
|
|
||||||
|
@ -4575,9 +4597,9 @@ static void doCloseAllTimeWindowAfterScan(SQInfo *pQInfo) {
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
if (isIntervalQuery(pQuery)) {
|
if (isIntervalQuery(pQuery)) {
|
||||||
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||||
SArray *group = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *group = GET_TABLEGROUP(pQInfo, i);
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(group);
|
size_t num = taosArrayGetSize(group);
|
||||||
for (int32_t j = 0; j < num; ++j) {
|
for (int32_t j = 0; j < num; ++j) {
|
||||||
|
@ -4794,7 +4816,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
pQuery->current = pTableInfo;
|
pQuery->current = pTableInfo;
|
||||||
|
|
||||||
int32_t numOfInterpo = 0;
|
int32_t numOfFilled = 0;
|
||||||
TSKEY newStartKey = TSKEY_INITIAL_VAL;
|
TSKEY newStartKey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
// skip blocks without load the actual data block from file if no filter condition present
|
// skip blocks without load the actual data block from file if no filter condition present
|
||||||
|
@ -4822,9 +4844,9 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||||
} else {
|
} else {
|
||||||
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey);
|
taosFillSetStartInfo(pRuntimeEnv->pFillInfo, pQuery->rec.rows, pQuery->window.ekey);
|
||||||
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
|
taosFillCopyInputDataFromFilePage(pRuntimeEnv->pFillInfo, (tFilePage**) pQuery->sdata);
|
||||||
numOfInterpo = 0;
|
numOfFilled = 0;
|
||||||
|
|
||||||
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
|
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
|
||||||
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
if (pQuery->rec.rows > 0 || Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
||||||
limitResults(pRuntimeEnv);
|
limitResults(pRuntimeEnv);
|
||||||
break;
|
break;
|
||||||
|
@ -4843,7 +4865,7 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
|
||||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQInfo->pointsInterpo += numOfInterpo;
|
pQInfo->pointsInterpo += numOfFilled;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tableQueryImpl(SQInfo *pQInfo) {
|
static void tableQueryImpl(SQInfo *pQInfo) {
|
||||||
|
@ -4851,45 +4873,41 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
if (queryHasRemainResults(pRuntimeEnv)) {
|
if (queryHasRemainResults(pRuntimeEnv)) {
|
||||||
/*
|
|
||||||
* There are remain results that are not returned due to result interpolation
|
|
||||||
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
|
|
||||||
*/
|
|
||||||
int32_t numOfInterpo = 0;
|
|
||||||
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfInterpo);
|
|
||||||
|
|
||||||
if (pQuery->rec.rows > 0) {
|
|
||||||
limitResults(pRuntimeEnv);
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// here we have scan all qualified data in both data file and cache
|
if (pQuery->fillType != TSDB_FILL_NONE) {
|
||||||
if (Q_STATUS_EQUAL(pQuery->status, QUERY_COMPLETED)) {
|
/*
|
||||||
// continue to get push data from the group result
|
* There are remain results that are not returned due to result interpolation
|
||||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) ||
|
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
|
||||||
((isIntervalQuery(pQuery) && pQuery->rec.total < pQuery->limit.limit))) {
|
*/
|
||||||
// todo limit the output for interval query?
|
int32_t numOfFilled = 0;
|
||||||
|
pQuery->rec.rows = doFillGapsInResults(pRuntimeEnv, (tFilePage **)pQuery->sdata, &numOfFilled);
|
||||||
|
|
||||||
|
if (pQuery->rec.rows > 0) {
|
||||||
|
limitResults(pRuntimeEnv);
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("QInfo:%p current:%" PRId64 " returned, total:%" PRId64, pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
pQuery->rec.rows = 0;
|
pQuery->rec.rows = 0;
|
||||||
pQInfo->groupIndex = 0; // always start from 0
|
pQInfo->groupIndex = 0; // always start from 0
|
||||||
|
|
||||||
if (pRuntimeEnv->windowResInfo.size > 0) {
|
if (pRuntimeEnv->windowResInfo.size > 0) {
|
||||||
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||||
pQuery->rec.rows += pQuery->rec.rows;
|
|
||||||
|
|
||||||
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
clearFirstNTimeWindow(pRuntimeEnv, pQInfo->groupIndex);
|
||||||
|
|
||||||
if (pQuery->rec.rows > 0) {
|
if (pQuery->rec.rows > 0) {
|
||||||
qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
qDebug("QInfo:%p %"PRId64" rows returned from group results, total:%"PRId64"", pQInfo, pQuery->rec.rows, pQuery->rec.total);
|
||||||
|
|
||||||
|
// there are not data remains
|
||||||
|
if (pRuntimeEnv->windowResInfo.size <= 0) {
|
||||||
|
qDebug("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
|
||||||
|
}
|
||||||
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("QInfo:%p query over, %"PRId64" rows are returned", pQInfo, pQuery->rec.total);
|
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// number of points returned during this query
|
// number of points returned during this query
|
||||||
|
@ -4897,7 +4915,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
|
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
|
||||||
SArray* g = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray* g = GET_TABLEGROUP(pQInfo, 0);
|
||||||
STableQueryInfo* item = taosArrayGetP(g, 0);
|
STableQueryInfo* item = taosArrayGetP(g, 0);
|
||||||
|
|
||||||
// group by normal column, sliding window query, interval query are handled by interval query processor
|
// group by normal column, sliding window query, interval query are handled by interval query processor
|
||||||
|
@ -5658,26 +5676,6 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SArray* pTableIdList,
|
||||||
return pQInfo;
|
return pQInfo;
|
||||||
|
|
||||||
_cleanup:
|
_cleanup:
|
||||||
//tfree(pQuery->fillVal);
|
|
||||||
|
|
||||||
//if (pQuery->sdata != NULL) {
|
|
||||||
// for (int16_t col = 0; col < pQuery->numOfOutput; ++col) {
|
|
||||||
// tfree(pQuery->sdata[col]);
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
|
|
||||||
//
|
|
||||||
//tfree(pQuery->sdata);
|
|
||||||
//tfree(pQuery->pFilterInfo);
|
|
||||||
//tfree(pQuery->colList);
|
|
||||||
|
|
||||||
//tfree(pExprs);
|
|
||||||
//tfree(pGroupbyExpr);
|
|
||||||
|
|
||||||
//taosArrayDestroy(pQInfo->arrTableIdInfo);
|
|
||||||
//tsdbDestoryTableGroup(&pQInfo->tableGroupInfo);
|
|
||||||
//
|
|
||||||
//tfree(pQInfo);
|
|
||||||
freeQInfo(pQInfo);
|
freeQInfo(pQInfo);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5712,7 +5710,6 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
|
||||||
UNUSED(ret);
|
UNUSED(ret);
|
||||||
}
|
}
|
||||||
|
|
||||||
// only the successful complete requries the sem_post/over = 1 operations.
|
|
||||||
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
|
if ((QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.skey > pQuery->window.ekey)) ||
|
||||||
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
|
(!QUERY_IS_ASC_QUERY(pQuery) && (pQuery->window.ekey > pQuery->window.skey))) {
|
||||||
qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
|
qDebug("QInfo:%p no result in time range %" PRId64 "-%" PRId64 ", order %d", pQInfo, pQuery->window.skey,
|
||||||
|
@ -5722,7 +5719,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
|
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
|
||||||
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
|
qDebug("QInfo:%p no table qualified for tag filter, abort query", pQInfo);
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
@ -5784,9 +5781,9 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor, extract method to destroytableDataInfo
|
// todo refactor, extract method to destroytableDataInfo
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
int32_t numOfGroups = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
for (int32_t i = 0; i < numOfGroups; ++i) {
|
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray *p = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, i);
|
SArray *p = GET_TABLEGROUP(pQInfo, i);;
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(p);
|
size_t num = taosArrayGetSize(p);
|
||||||
for(int32_t j = 0; j < num; ++j) {
|
for(int32_t j = 0; j < num; ++j) {
|
||||||
|
@ -6053,6 +6050,11 @@ void qTableQuery(qinfo_t qinfo, void (*fp)(void*), void* param) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pQInfo->tableqinfoGroupInfo.numOfTables == 0) {
|
||||||
|
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
qDebug("QInfo:%p query task is launched", pQInfo);
|
qDebug("QInfo:%p query task is launched", pQInfo);
|
||||||
|
|
||||||
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
||||||
|
@ -6175,14 +6177,14 @@ static void buildTagQueryResult(SQInfo* pQInfo) {
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
size_t numOfGroup = taosArrayGetSize(pQInfo->tableqinfoGroupInfo.pGroupList);
|
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pQInfo);
|
||||||
assert(numOfGroup == 0 || numOfGroup == 1);
|
assert(numOfGroup == 0 || numOfGroup == 1);
|
||||||
|
|
||||||
if (numOfGroup == 0) {
|
if (numOfGroup == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pa = taosArrayGetP(pQInfo->tableqinfoGroupInfo.pGroupList, 0);
|
SArray* pa = GET_TABLEGROUP(pQInfo, 0);
|
||||||
|
|
||||||
size_t num = taosArrayGetSize(pa);
|
size_t num = taosArrayGetSize(pa);
|
||||||
assert(num == pQInfo->tableqinfoGroupInfo.numOfTables);
|
assert(num == pQInfo->tableqinfoGroupInfo.numOfTables);
|
||||||
|
|
|
@ -133,7 +133,6 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->size = remain;
|
pWindowResInfo->size = remain;
|
||||||
printf("---------------size:%ld\n", taosHashGetSize(pWindowResInfo->hashList));
|
|
||||||
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
||||||
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
||||||
|
|
|
@ -29,7 +29,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
|
|
||||||
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".h"};
|
const char *tsdbFileSuffix[] = {".head", ".data", ".last", "", ".h", ".l"};
|
||||||
|
|
||||||
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
static int tsdbInitFile(SFile *pFile, STsdbRepo *pRepo, int fid, int type);
|
||||||
static void tsdbDestroyFile(SFile *pFile);
|
static void tsdbDestroyFile(SFile *pFile);
|
||||||
|
|
|
@ -443,12 +443,14 @@ static int tsdbCommitMeta(STsdbRepo *pRepo) {
|
||||||
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
|
if (tdUpdateKVStoreRecord(pMeta->pStore, pAct->uid, (void *)(pCont->cont), pCont->len) < 0) {
|
||||||
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
tsdbError("vgId:%d failed to update meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
|
tdKVStoreEndCommit(pMeta->pStore);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} else if (pAct->act == TSDB_DROP_META) {
|
} else if (pAct->act == TSDB_DROP_META) {
|
||||||
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
|
if (tdDropKVStoreRecord(pMeta->pStore, pAct->uid) < 0) {
|
||||||
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
tsdbError("vgId:%d failed to drop meta with uid %" PRIu64 " since %s", REPO_ID(pRepo), pAct->uid,
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
|
tdKVStoreEndCommit(pMeta->pStore);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -480,13 +480,11 @@ int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) {
|
||||||
bool changed = false;
|
bool changed = false;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if ((pTable->type == TSDB_SUPER_TABLE) && (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema))) {
|
||||||
if (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema)) {
|
if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) {
|
||||||
if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) {
|
tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
tstrerror(terrno));
|
||||||
tstrerror(terrno));
|
return -1;
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
changed = true;
|
changed = true;
|
||||||
}
|
}
|
||||||
|
@ -1215,7 +1213,7 @@ static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable) {
|
||||||
while (tSkipListIterNext(pIter)) {
|
while (tSkipListIterNext(pIter)) {
|
||||||
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
|
STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
|
||||||
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
|
ASSERT(TABLE_TYPE(tTable) == TSDB_CHILD_TABLE);
|
||||||
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, pTable);
|
pBuf = tsdbInsertTableAct(pRepo, TSDB_DROP_META, pBuf, tTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
tSkipListDestroyIter(pIter);
|
tSkipListDestroyIter(pIter);
|
||||||
|
|
|
@ -121,18 +121,19 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) {
|
||||||
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
if (tsendfile(pHelper->files.nHeadF.fd, pHelper->files.headF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
||||||
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
|
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
|
||||||
TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
|
TSDB_FILE_HEAD_SIZE, pHelper->files.headF.fname, pHelper->files.nHeadF.fname, strerror(errno));
|
||||||
errno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create and open .l file if should
|
// Create and open .l file if should
|
||||||
if (tsdbShouldCreateNewLast(pHelper)) {
|
if (tsdbShouldCreateNewLast(pHelper)) {
|
||||||
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
|
if (tsdbOpenFile(&(pHelper->files.nLastF), O_WRONLY | O_CREAT) < 0) goto _err;
|
||||||
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE)
|
if (tsendfile(pHelper->files.nLastF.fd, pHelper->files.lastF.fd, NULL, TSDB_FILE_HEAD_SIZE) < TSDB_FILE_HEAD_SIZE) {
|
||||||
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
|
tsdbError("vgId:%d failed to sendfile %d bytes from file %s to %s since %s", REPO_ID(pHelper->pRepo),
|
||||||
TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
|
TSDB_FILE_HEAD_SIZE, pHelper->files.lastF.fname, pHelper->files.nLastF.fname, strerror(errno));
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
|
if (tsdbOpenFile(&(pHelper->files.dataF), O_RDONLY) < 0) goto _err;
|
||||||
|
|
|
@ -148,7 +148,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
|
||||||
pQueryHandle->cur.fid = -1;
|
pQueryHandle->cur.fid = -1;
|
||||||
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
|
pQueryHandle->cur.win = TSWINDOW_INITIALIZER;
|
||||||
pQueryHandle->checkFiles = true;//ASCENDING_TRAVERSE(pQueryHandle->order);
|
pQueryHandle->checkFiles = true;
|
||||||
pQueryHandle->activeIndex = 0; // current active table index
|
pQueryHandle->activeIndex = 0; // current active table index
|
||||||
pQueryHandle->qinfo = qinfo;
|
pQueryHandle->qinfo = qinfo;
|
||||||
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
pQueryHandle->outputCapacity = ((STsdbRepo*)tsdb)->config.maxRowsPerFileBlock;
|
||||||
|
@ -475,11 +475,15 @@ static int32_t binarySearchForBlock(SCompBlock* pBlock, int32_t numOfBlocks, TSK
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
|
static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlocks, int32_t type) {
|
||||||
// todo check open file failed
|
|
||||||
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
|
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
|
||||||
|
|
||||||
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
|
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
|
||||||
tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
|
|
||||||
|
int32_t code = tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
|
||||||
|
|
||||||
|
//open file failed, return error code to client
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// load all the comp offset value for all tables in this file
|
// load all the comp offset value for all tables in this file
|
||||||
*numOfBlocks = 0;
|
*numOfBlocks = 0;
|
||||||
|
@ -593,8 +597,7 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
|
||||||
if (pCheckInfo->pDataCols == NULL) {
|
if (pCheckInfo->pDataCols == NULL) {
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
|
STsdbMeta* pMeta = tsdbGetMeta(pRepo);
|
||||||
// TODO
|
// TODO
|
||||||
pCheckInfo->pDataCols =
|
pCheckInfo->pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
||||||
tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pRepo->config.maxRowsPerFileBlock);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
|
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(pCheckInfo->pTableObj));
|
||||||
|
@ -1359,16 +1362,18 @@ static int32_t createDataBlocksInfo(STsdbQueryHandle* pQueryHandle, int32_t numO
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo opt for only one table case
|
// todo opt for only one table case
|
||||||
static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
|
static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* exists) {
|
||||||
pQueryHandle->numOfBlocks = 0;
|
pQueryHandle->numOfBlocks = 0;
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
int32_t numOfBlocks = 0;
|
int32_t numOfBlocks = 0;
|
||||||
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
|
|
||||||
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
||||||
int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
|
int32_t type = ASCENDING_TRAVERSE(pQueryHandle->order)? QUERY_RANGE_GREATER_EQUAL:QUERY_RANGE_LESS_EQUAL;
|
||||||
if (getFileCompInfo(pQueryHandle, &numOfBlocks, type) != TSDB_CODE_SUCCESS) {
|
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks, type)) != TSDB_CODE_SUCCESS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1393,20 +1398,25 @@ static bool getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle) {
|
||||||
|
|
||||||
// no data in file anymore
|
// no data in file anymore
|
||||||
if (pQueryHandle->numOfBlocks <= 0) {
|
if (pQueryHandle->numOfBlocks <= 0) {
|
||||||
assert(pQueryHandle->pFileGroup == NULL);
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
assert(pQueryHandle->pFileGroup == NULL);
|
||||||
|
}
|
||||||
|
|
||||||
cur->fid = -1; // denote that there are no data in file anymore
|
cur->fid = -1; // denote that there are no data in file anymore
|
||||||
|
*exists = false;
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
|
cur->slot = ASCENDING_TRAVERSE(pQueryHandle->order)? 0:pQueryHandle->numOfBlocks-1;
|
||||||
cur->fid = pQueryHandle->pFileGroup->fileId;
|
cur->fid = pQueryHandle->pFileGroup->fileId;
|
||||||
|
|
||||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||||
return loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
|
*exists = loadFileDataBlock(pQueryHandle, pBlockInfo->compBlock, pBlockInfo->pTableCheckInfo);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists) {
|
||||||
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
|
STsdbFileH* pFileHandle = tsdbGetFile(pQueryHandle->pTsdb);
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
|
|
||||||
|
@ -1419,7 +1429,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
||||||
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
tsdbInitFileGroupIter(pFileHandle, &pQueryHandle->fileIter, pQueryHandle->order);
|
||||||
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
tsdbSeekFileGroupIter(&pQueryHandle->fileIter, fid);
|
||||||
|
|
||||||
return getDataBlocksInFilesImpl(pQueryHandle);
|
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
||||||
} else {
|
} else {
|
||||||
// check if current file block is all consumed
|
// check if current file block is all consumed
|
||||||
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
STableBlockInfo* pBlockInfo = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||||
|
@ -1430,7 +1440,7 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
||||||
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
if ((cur->slot == pQueryHandle->numOfBlocks - 1 && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
(cur->slot == 0 && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||||
// all data blocks in current file has been checked already, try next file if exists
|
// all data blocks in current file has been checked already, try next file if exists
|
||||||
return getDataBlocksInFilesImpl(pQueryHandle);
|
return getDataBlocksInFilesImpl(pQueryHandle, exists);
|
||||||
} else {
|
} else {
|
||||||
// next block of the same file
|
// next block of the same file
|
||||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order) ? 1 : -1;
|
||||||
|
@ -1440,11 +1450,15 @@ static bool getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle) {
|
||||||
cur->blockCompleted = false;
|
cur->blockCompleted = false;
|
||||||
|
|
||||||
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
|
STableBlockInfo* pNext = &pQueryHandle->pDataBlockInfo[cur->slot];
|
||||||
return loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
|
*exists = loadFileDataBlock(pQueryHandle, pNext->compBlock, pNext->pTableCheckInfo);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
|
handleDataMergeIfNeeded(pQueryHandle, pBlockInfo->compBlock, pCheckInfo);
|
||||||
return pQueryHandle->realNumOfRows > 0;
|
*exists = pQueryHandle->realNumOfRows > 0;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1576,8 +1590,14 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQueryHandle->checkFiles) {
|
if (pQueryHandle->checkFiles) {
|
||||||
if (getDataBlocksInFiles(pQueryHandle)) {
|
bool exists = true;
|
||||||
return true;
|
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (exists) {
|
||||||
|
return exists;
|
||||||
}
|
}
|
||||||
|
|
||||||
pQueryHandle->activeIndex = 0;
|
pQueryHandle->activeIndex = 0;
|
||||||
|
|
|
@ -259,6 +259,7 @@ int tdUpdateKVStoreRecord(SKVStore *pStore, uint64_t uid, void *cont, int contLe
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
|
taosHashPut(pStore->map, (void *)(&uid), sizeof(uid), (void *)(&rInfo), sizeof(rInfo));
|
||||||
|
uDebug("put uid %" PRIu64 " into kvStore %s", uid, pStore->fname);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -292,6 +293,7 @@ int tdDropKVStoreRecord(SKVStore *pStore, uint64_t uid) {
|
||||||
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
|
pStore->info.tombSize += (rInfo.size + sizeof(SKVRecord) * 2);
|
||||||
|
|
||||||
taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid));
|
taosHashRemove(pStore->map, (void *)(&uid), sizeof(uid));
|
||||||
|
uDebug("drop uid %" PRIu64 " from KV store %s", uid, pStore->fname);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,9 +123,8 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
||||||
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||||
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||||
code = tsdbCreateRepo(tsdbDir, &tsdbCfg);
|
if (tsdbCreateRepo(tsdbDir, &tsdbCfg) < 0) {
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
|
||||||
vError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
|
|
||||||
return TSDB_CODE_VND_INIT_FAILED;
|
return TSDB_CODE_VND_INIT_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@
|
||||||
4. pip install ../src/connector/python/linux/python2 ; pip3 install
|
4. pip install ../src/connector/python/linux/python2 ; pip3 install
|
||||||
../src/connector/python/linux/python3
|
../src/connector/python/linux/python3
|
||||||
|
|
||||||
5. pip install numpy; pip3 install numpy
|
5. pip install numpy; pip3 install numpy (numpy is required only if you need to run querySort.py)
|
||||||
|
|
||||||
> Note: Both Python2 and Python3 are currently supported by the Python test
|
> Note: Both Python2 and Python3 are currently supported by the Python test
|
||||||
> framework. Since Python2 is no longer officially supported by Python Software
|
> framework. Since Python2 is no longer officially supported by Python Software
|
||||||
|
|
|
@ -77,11 +77,7 @@ class TDTestCase:
|
||||||
# join queries
|
# join queries
|
||||||
tdSql.query(
|
tdSql.query(
|
||||||
"select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
|
"select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
|
||||||
tdSql.checkRows(6)
|
tdSql.checkRows(6)
|
||||||
|
|
||||||
tdSql.query(
|
|
||||||
"select * from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id order by ts desc")
|
|
||||||
tdSql.checkColumnSorted(0, "desc")
|
|
||||||
|
|
||||||
tdSql.error(
|
tdSql.error(
|
||||||
"select ts, pressure, temperature, id, dscrption from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
|
"select ts, pressure, temperature, id, dscrption from stb_p, stb_t where stb_p.ts=stb_t.ts and stb_p.id = stb_t.id")
|
||||||
|
|
|
@ -16,6 +16,7 @@ import taos
|
||||||
from util.log import *
|
from util.log import *
|
||||||
from util.cases import *
|
from util.cases import *
|
||||||
from util.sql import *
|
from util.sql import *
|
||||||
|
import numpy as np
|
||||||
|
|
||||||
|
|
||||||
class TDTestCase:
|
class TDTestCase:
|
||||||
|
@ -26,6 +27,46 @@ class TDTestCase:
|
||||||
self.rowNum = 10
|
self.rowNum = 10
|
||||||
self.ts = 1537146000000
|
self.ts = 1537146000000
|
||||||
|
|
||||||
|
def checkColumnSorted(self, col, order):
|
||||||
|
frame = inspect.stack()[1]
|
||||||
|
callerModule = inspect.getmodule(frame[0])
|
||||||
|
callerFilename = callerModule.__file__
|
||||||
|
|
||||||
|
if col < 0:
|
||||||
|
tdLog.exit(
|
||||||
|
"%s failed: sql:%s, col:%d is smaller than zero" %
|
||||||
|
(callerFilename, tdSql.sql, col))
|
||||||
|
if col > tdSql.queryCols:
|
||||||
|
tdLog.exit(
|
||||||
|
"%s failed: sql:%s, col:%d is larger than queryCols:%d" %
|
||||||
|
(callerFilename, tdSql.sql, col, tdSql.queryCols))
|
||||||
|
|
||||||
|
matrix = np.array(tdSql.queryResult)
|
||||||
|
list = matrix[:, 0]
|
||||||
|
|
||||||
|
if order == "" or order.upper() == "ASC":
|
||||||
|
if all(sorted(list) == list):
|
||||||
|
tdLog.info(
|
||||||
|
"sql:%s, column :%d is sorted in accending order as expected" %
|
||||||
|
(tdSql.sql, col))
|
||||||
|
else:
|
||||||
|
tdLog.exit(
|
||||||
|
"%s failed: sql:%s, col:%d is not sorted in accesnind order" %
|
||||||
|
(callerFilename, tdSql.sql, col))
|
||||||
|
elif order.upper() == "DESC":
|
||||||
|
if all(sorted(list, reverse=True) == list):
|
||||||
|
tdLog.info(
|
||||||
|
"sql:%s, column :%d is sorted in decending order as expected" %
|
||||||
|
(tdSql.sql, col))
|
||||||
|
else:
|
||||||
|
tdLog.exit(
|
||||||
|
"%s failed: sql:%s, col:%d is not sorted in decending order" %
|
||||||
|
(callerFilename, tdSql.sql, col))
|
||||||
|
else:
|
||||||
|
tdLog.exit(
|
||||||
|
"%s failed: sql:%s, the order provided for col:%d is not correct" %
|
||||||
|
(callerFilename, tdSql.sql, col))
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
tdSql.prepare()
|
tdSql.prepare()
|
||||||
|
|
||||||
|
@ -49,11 +90,11 @@ class TDTestCase:
|
||||||
print("======= step 2: verify order for each column =========")
|
print("======= step 2: verify order for each column =========")
|
||||||
# sort for timestamp in asc order
|
# sort for timestamp in asc order
|
||||||
tdSql.query("select * from st order by ts asc")
|
tdSql.query("select * from st order by ts asc")
|
||||||
tdSql.checkColumnSorted(0, "asc")
|
self.checkColumnSorted(0, "asc")
|
||||||
|
|
||||||
# sort for timestamp in desc order
|
# sort for timestamp in desc order
|
||||||
tdSql.query("select * from st order by ts desc")
|
tdSql.query("select * from st order by ts desc")
|
||||||
tdSql.checkColumnSorted(0, "desc")
|
self.checkColumnSorted(0, "desc")
|
||||||
|
|
||||||
for i in range(1, 10):
|
for i in range(1, 10):
|
||||||
tdSql.error("select * from st order by tbcol%d" % i)
|
tdSql.error("select * from st order by tbcol%d" % i)
|
||||||
|
@ -63,17 +104,17 @@ class TDTestCase:
|
||||||
tdSql.query(
|
tdSql.query(
|
||||||
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d" %
|
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d" %
|
||||||
(i, i))
|
(i, i))
|
||||||
tdSql.checkColumnSorted(1, "")
|
self.checkColumnSorted(1, "")
|
||||||
|
|
||||||
tdSql.query(
|
tdSql.query(
|
||||||
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d asc" %
|
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d asc" %
|
||||||
(i, i))
|
(i, i))
|
||||||
tdSql.checkColumnSorted(1, "asc")
|
self.checkColumnSorted(1, "asc")
|
||||||
|
|
||||||
tdSql.query(
|
tdSql.query(
|
||||||
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d desc" %
|
"select avg(tbcol1) from st group by tagcol%d order by tagcol%d desc" %
|
||||||
(i, i))
|
(i, i))
|
||||||
tdSql.checkColumnSorted(1, "desc")
|
self.checkColumnSorted(1, "desc")
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
|
@ -43,7 +43,7 @@ python3 ./test.py -f tag_lite/commit.py
|
||||||
python3 ./test.py -f tag_lite/create.py
|
python3 ./test.py -f tag_lite/create.py
|
||||||
python3 ./test.py -f tag_lite/datatype.py
|
python3 ./test.py -f tag_lite/datatype.py
|
||||||
python3 ./test.py -f tag_lite/datatype-without-alter.py
|
python3 ./test.py -f tag_lite/datatype-without-alter.py
|
||||||
# python3 ./test.py -f tag_lite/delete.py
|
python3 ./test.py -f tag_lite/delete.py
|
||||||
python3 ./test.py -f tag_lite/double.py
|
python3 ./test.py -f tag_lite/double.py
|
||||||
python3 ./test.py -f tag_lite/float.py
|
python3 ./test.py -f tag_lite/float.py
|
||||||
python3 ./test.py -f tag_lite/int_binary.py
|
python3 ./test.py -f tag_lite/int_binary.py
|
||||||
|
@ -134,9 +134,10 @@ python3 ./test.py -f table/del_stable.py
|
||||||
python3 ./test.py -f query/filter.py
|
python3 ./test.py -f query/filter.py
|
||||||
python3 ./test.py -f query/filterAllIntTypes.py
|
python3 ./test.py -f query/filterAllIntTypes.py
|
||||||
python3 ./test.py -f query/filterFloatAndDouble.py
|
python3 ./test.py -f query/filterFloatAndDouble.py
|
||||||
|
python3 ./test.py -f query/filterOtherTypes.py
|
||||||
|
python3 ./test.py -f query/queryError.py
|
||||||
python3 ./test.py -f query/querySort.py
|
python3 ./test.py -f query/querySort.py
|
||||||
|
|
||||||
|
|
||||||
#stream
|
#stream
|
||||||
python3 ./test.py -f stream/stream1.py
|
python3 ./test.py -f stream/stream1.py
|
||||||
python3 ./test.py -f stream/stream2.py
|
python3 ./test.py -f stream/stream2.py
|
||||||
|
|
|
@ -8,24 +8,8 @@ python3 ./test.py $1 -s && sleep 1
|
||||||
# insert
|
# insert
|
||||||
python3 ./test.py $1 -f insert/basic.py
|
python3 ./test.py $1 -f insert/basic.py
|
||||||
python3 ./test.py $1 -s && sleep 1
|
python3 ./test.py $1 -s && sleep 1
|
||||||
python3 ./test.py $1 -f insert/int.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/float.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/bigint.py
|
python3 ./test.py $1 -f insert/bigint.py
|
||||||
python3 ./test.py $1 -s && sleep 1
|
python3 ./test.py $1 -s && sleep 1
|
||||||
python3 ./test.py $1 -f insert/bool.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/double.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/smallint.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/tinyint.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/binary.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/date.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f insert/nchar.py
|
python3 ./test.py $1 -f insert/nchar.py
|
||||||
python3 ./test.py $1 -s && sleep 1
|
python3 ./test.py $1 -s && sleep 1
|
||||||
python3 ./test.py $1 -f insert/multi.py
|
python3 ./test.py $1 -f insert/multi.py
|
||||||
|
@ -42,18 +26,6 @@ python3 ./test.py $1 -s && sleep 1
|
||||||
# import
|
# import
|
||||||
python3 ./test.py $1 -f import_merge/importDataLastSub.py
|
python3 ./test.py $1 -f import_merge/importDataLastSub.py
|
||||||
python3 ./test.py $1 -s && sleep 1
|
python3 ./test.py $1 -s && sleep 1
|
||||||
python3 ./test.py $1 -f import_merge/importHead.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f import_merge/importLastT.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f import_merge/importSpan.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f import_merge/importTail.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f import_merge/importTRestart.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
python3 ./test.py $1 -f import_merge/importInsertThenImport.py
|
|
||||||
python3 ./test.py $1 -s && sleep 1
|
|
||||||
|
|
||||||
#tag
|
#tag
|
||||||
python3 ./test.py $1 -f tag_lite/filter.py
|
python3 ./test.py $1 -f tag_lite/filter.py
|
||||||
|
|
|
@ -17,7 +17,6 @@ import time
|
||||||
import datetime
|
import datetime
|
||||||
import inspect
|
import inspect
|
||||||
from util.log import *
|
from util.log import *
|
||||||
import numpy as np
|
|
||||||
|
|
||||||
|
|
||||||
class TDSql:
|
class TDSql:
|
||||||
|
@ -199,47 +198,7 @@ class TDSql:
|
||||||
"%s failed: sql:%s, affectedRows:%d != expect:%d" %
|
"%s failed: sql:%s, affectedRows:%d != expect:%d" %
|
||||||
(callerFilename, self.sql, self.affectedRows, expectAffectedRows))
|
(callerFilename, self.sql, self.affectedRows, expectAffectedRows))
|
||||||
tdLog.info("sql:%s, affectedRows:%d == expect:%d" %
|
tdLog.info("sql:%s, affectedRows:%d == expect:%d" %
|
||||||
(self.sql, self.affectedRows, expectAffectedRows))
|
(self.sql, self.affectedRows, expectAffectedRows))
|
||||||
|
|
||||||
def checkColumnSorted(self, col, order):
|
|
||||||
frame = inspect.stack()[1]
|
|
||||||
callerModule = inspect.getmodule(frame[0])
|
|
||||||
callerFilename = callerModule.__file__
|
|
||||||
|
|
||||||
if col < 0:
|
|
||||||
tdLog.exit(
|
|
||||||
"%s failed: sql:%s, col:%d is smaller than zero" %
|
|
||||||
(callerFilename, self.sql, col))
|
|
||||||
if col > self.queryCols:
|
|
||||||
tdLog.exit(
|
|
||||||
"%s failed: sql:%s, col:%d is larger than queryCols:%d" %
|
|
||||||
(callerFilename, self.sql, col, self.queryCols))
|
|
||||||
|
|
||||||
matrix = np.array(self.queryResult)
|
|
||||||
list = matrix[:, 0]
|
|
||||||
|
|
||||||
if order == "" or order.upper() == "ASC":
|
|
||||||
if all(sorted(list) == list):
|
|
||||||
tdLog.info(
|
|
||||||
"sql:%s, column :%d is sorted in accending order as expected" %
|
|
||||||
(self.sql, col))
|
|
||||||
else:
|
|
||||||
tdLog.exit(
|
|
||||||
"%s failed: sql:%s, col:%d is not sorted in accesnind order" %
|
|
||||||
(callerFilename, self.sql, col))
|
|
||||||
elif order.upper() == "DESC":
|
|
||||||
if all(sorted(list, reverse=True) == list):
|
|
||||||
tdLog.info(
|
|
||||||
"sql:%s, column :%d is sorted in decending order as expected" %
|
|
||||||
(self.sql, col))
|
|
||||||
else:
|
|
||||||
tdLog.exit(
|
|
||||||
"%s failed: sql:%s, col:%d is not sorted in decending order" %
|
|
||||||
(callerFilename, self.sql, col))
|
|
||||||
else:
|
|
||||||
tdLog.exit(
|
|
||||||
"%s failed: sql:%s, the order provided for col:%d is not correct" %
|
|
||||||
(callerFilename, self.sql, col))
|
|
||||||
|
|
||||||
|
|
||||||
tdSql = TDSql()
|
tdSql = TDSql()
|
||||||
|
|
|
@ -1,10 +1,10 @@
|
||||||
##unsupport run general/alter/cached_schema_after_alter.sim
|
##unsupport run general/alter/cached_schema_after_alter.sim
|
||||||
unsupport run general/alter/count.sim
|
run general/alter/count.sim
|
||||||
unsupport run general/alter/import.sim
|
run general/alter/import.sim
|
||||||
##unsupport run general/alter/insert1.sim
|
##unsupport run general/alter/insert1.sim
|
||||||
unsupport run general/alter/insert2.sim
|
run general/alter/insert2.sim
|
||||||
unsupport run general/alter/metrics.sim
|
run general/alter/metrics.sim
|
||||||
unsupport run general/alter/table.sim
|
run general/alter/table.sim
|
||||||
run general/cache/new_metrics.sim
|
run general/cache/new_metrics.sim
|
||||||
run general/cache/restart_metrics.sim
|
run general/cache/restart_metrics.sim
|
||||||
run general/cache/restart_table.sim
|
run general/cache/restart_table.sim
|
||||||
|
@ -86,14 +86,14 @@ run general/insert/query_block2_file.sim
|
||||||
run general/insert/query_file_memory.sim
|
run general/insert/query_file_memory.sim
|
||||||
run general/insert/query_multi_file.sim
|
run general/insert/query_multi_file.sim
|
||||||
run general/insert/tcp.sim
|
run general/insert/tcp.sim
|
||||||
##unsupport run general/parser/alter.sim
|
run general/parser/alter.sim
|
||||||
run general/parser/alter1.sim
|
run general/parser/alter1.sim
|
||||||
run general/parser/alter_stable.sim
|
run general/parser/alter_stable.sim
|
||||||
run general/parser/auto_create_tb.sim
|
run general/parser/auto_create_tb.sim
|
||||||
run general/parser/auto_create_tb_drop_tb.sim
|
run general/parser/auto_create_tb_drop_tb.sim
|
||||||
run general/parser/col_arithmetic_operation.sim
|
run general/parser/col_arithmetic_operation.sim
|
||||||
run general/parser/columnValue.sim
|
run general/parser/columnValue.sim
|
||||||
#run general/parser/commit.sim
|
run general/parser/commit.sim
|
||||||
run general/parser/create_db.sim
|
run general/parser/create_db.sim
|
||||||
run general/parser/create_mt.sim
|
run general/parser/create_mt.sim
|
||||||
run general/parser/create_tb.sim
|
run general/parser/create_tb.sim
|
||||||
|
@ -106,7 +106,7 @@ run general/parser/first_last.sim
|
||||||
##unsupport run general/parser/import_file.sim
|
##unsupport run general/parser/import_file.sim
|
||||||
run general/parser/lastrow.sim
|
run general/parser/lastrow.sim
|
||||||
run general/parser/nchar.sim
|
run general/parser/nchar.sim
|
||||||
##unsupport run general/parser/null_char.sim
|
run general/parser/null_char.sim
|
||||||
run general/parser/single_row_in_tb.sim
|
run general/parser/single_row_in_tb.sim
|
||||||
run general/parser/select_from_cache_disk.sim
|
run general/parser/select_from_cache_disk.sim
|
||||||
run general/parser/limit.sim
|
run general/parser/limit.sim
|
||||||
|
@ -132,7 +132,7 @@ run general/parser/groupby.sim
|
||||||
run general/parser/bug.sim
|
run general/parser/bug.sim
|
||||||
run general/parser/tags_dynamically_specifiy.sim
|
run general/parser/tags_dynamically_specifiy.sim
|
||||||
run general/parser/set_tag_vals.sim
|
run general/parser/set_tag_vals.sim
|
||||||
##unsupport run general/parser/repeatAlter.sim
|
run general/parser/repeatAlter.sim
|
||||||
##unsupport run general/parser/slimit_alter_tags.sim
|
##unsupport run general/parser/slimit_alter_tags.sim
|
||||||
##unsupport run general/parser/stream_on_sys.sim
|
##unsupport run general/parser/stream_on_sys.sim
|
||||||
run general/parser/stream.sim
|
run general/parser/stream.sim
|
||||||
|
|
Loading…
Reference in New Issue