Merge branch 'develop' into feature/TD-1925_new

This commit is contained in:
Hongze Cheng 2021-01-24 17:59:57 +08:00
commit 63a6fcd74d
46 changed files with 2495 additions and 1854 deletions

View File

@ -133,7 +133,9 @@ cmake .. -G "NMake Makefiles"
nmake
```
If you use the Visual Studio 2019, please open a command window by executing "cmd.exe".
If you use the Visual Studio 2019 or 2017:
please open a command window by executing "cmd.exe".
Please specify "x64" for 64 bits Windows or specify "x86" is for 32 bits Windows when you execute vcvarsall.bat.
```
mkdir debug && cd debug
@ -142,7 +144,7 @@ cmake .. -G "NMake Makefiles"
nmake
```
Or, you can open a command window by clicking Visual Studio 2019 menu "Tools -> Command Line -> Developer Command Prompt" or "Tools -> Command Line -> Developer PowerShell" then execute commands as follows:
Or, you can simply open a command window by clicking Windows Start -> "Visual Studio < 2019 | 2017 >" folder -> "x64 Native Tools Command Prompt for VS < 2019 | 2017 >" or "x86 Native Tools Command Prompt for VS < 2019 | 2017 >" depends what architecture your Windows is, then execute commands as follows:
```
mkdir debug && cd debug
cmake .. -G "NMake Makefiles"

View File

@ -99,14 +99,14 @@ static FORCE_INLINE SQueryInfo* tscGetQueryInfoDetail(SSqlCmd* pCmd, int32_t sub
}
int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOffset, SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock);
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta);
void tscSortRemoveDataBlockDupRows(STableDataBlocks* dataBuf);
SParamInfo* tscAddParamToDataBlock(STableDataBlocks* pDataBlock, char type, uint8_t timePrec, int16_t bytes,
uint32_t offset);
void* tscDestroyBlockArrayList(SArray* pDataBlockList);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable);
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta);
int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock);
int32_t tscMergeTableDataBlocks(SSqlObj* pSql, bool freeBlockMap);

View File

@ -22,15 +22,15 @@ extern "C" {
#include "os.h"
#include "qAggMain.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tglobal.h"
#include "tsqlfunction.h"
#include "tutil.h"
#include "tcache.h"
#include "tglobal.h"
#include "tref.h"
#include "tutil.h"
#include "qExecutor.h"
#include "qSqlparser.h"
@ -223,6 +223,8 @@ typedef struct SQueryInfo {
int32_t udColumnId; // current user-defined constant output field column id, monotonically decreases from TSDB_UD_COLUMN_INDEX
int16_t resColumnId; // result column id
bool distinctTag; // distinct tag or not
} SQueryInfo;
typedef struct {
@ -411,7 +413,7 @@ void tscRestoreSQLFuncForSTableQuery(SQueryInfo *pQueryInfo);
int32_t tscCreateResPointerInfo(SSqlRes *pRes, SQueryInfo *pQueryInfo);
void tscSetResRawPtr(SSqlRes* pRes, SQueryInfo* pQueryInfo);
void tscResetSqlCmdObj(SSqlCmd *pCmd);
void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
/**
* free query result of the sql object

View File

@ -351,7 +351,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pCmd->command == TSDB_SQL_SELECT) {
tscDebug("%p redo parse sql string and proceed", pSql);
pCmd->parseFinished = false;
tscResetSqlCmdObj(pCmd);
tscResetSqlCmd(pCmd, true);
code = tsParseSql(pSql, true);
if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {

View File

@ -16,7 +16,7 @@
#include "tscLocalMerge.h"
#include "tscSubquery.h"
#include "os.h"
#include "qAst.h"
#include "texpr.h"
#include "tlosertree.h"
#include "tscLog.h"
#include "tscUtil.h"
@ -1101,7 +1101,7 @@ static int64_t getNumOfResultLocal(SQueryInfo *pQueryInfo, SQLFunctionCtx *pCtx)
* the number of output result is decided by main output
*/
int32_t functionId = pCtx[j].functionId;
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG) {
continue;
}
@ -1183,7 +1183,7 @@ bool needToMerge(SQueryInfo *pQueryInfo, SLocalMerger *pLocalMerge, tFilePage *t
int16_t functionId = pLocalMerge->pCtx[0].functionId;
// todo opt performance
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0))) { // column projection query
if ((/*functionId == TSDB_FUNC_PRJ || */functionId == TSDB_FUNC_ARITHM) || (tscIsProjectionQueryOnSTable(pQueryInfo, 0) && pQueryInfo->distinctTag == false)) { // column projection query
ret = 1; // disable merge procedure
} else {
tOrderDescriptor *pDesc = pLocalMerge->pDesc;

View File

@ -1036,11 +1036,7 @@ static int32_t validateDataSource(SSqlCmd *pCmd, int8_t type, const char *sql) {
}
/**
* usage: insert into table1 values() () table2 values()()
*
* @param str
* @param acct
* @param db
* parse insert sql
* @param pSql
* @return
*/
@ -1343,10 +1339,11 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
// make a backup as tsParseInsertSql may modify the string
char* sqlstr = strdup(pSql->sqlstr);
ret = tsParseInsertSql(pSql);
if (sqlstr == NULL || pSql->parseRetry >= 1 || ret != TSDB_CODE_TSC_INVALID_SQL) {
if ((sqlstr == NULL) || (pSql->parseRetry >= 1) ||
(ret != TSDB_CODE_TSC_SQL_SYNTAX_ERROR && ret != TSDB_CODE_TSC_INVALID_SQL)) {
free(sqlstr);
} else {
tscResetSqlCmdObj(pCmd);
tscResetSqlCmd(pCmd, true);
free(pSql->sqlstr);
pSql->sqlstr = sqlstr;
pSql->parseRetry++;
@ -1358,7 +1355,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
SSqlInfo SQLInfo = qSQLParse(pSql->sqlstr);
ret = tscToSQLCmd(pSql, &SQLInfo);
if (ret == TSDB_CODE_TSC_INVALID_SQL && pSql->parseRetry == 0 && SQLInfo.type == TSDB_SQL_NULL) {
tscResetSqlCmdObj(pCmd);
tscResetSqlCmd(pCmd, true);
pSql->parseRetry++;
ret = tscToSQLCmd(pSql, &SQLInfo);
}

View File

@ -20,7 +20,7 @@
#include "os.h"
#include "ttype.h"
#include "qAst.h"
#include "texpr.h"
#include "taos.h"
#include "taosmsg.h"
#include "tcompare.h"
@ -41,7 +41,7 @@
#define COLUMN_INDEX_INITIAL_VAL (-3)
#define COLUMN_INDEX_INITIALIZER \
{ COLUMN_INDEX_INITIAL_VAL, COLUMN_INDEX_INITIAL_VAL }
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_TBNAME_COLUMN_INDEX))
#define COLUMN_INDEX_VALIDE(index) (((index).tableIndex >= 0) && ((index).columnIndex >= TSDB_BLOCK_DIST_COLUMN_INDEX))
#define TBNAME_LIST_SEP ","
typedef struct SColumnList { // todo refactor
@ -902,13 +902,17 @@ int32_t parseSlidingClause(SSqlObj* pSql, SQueryInfo* pQueryInfo, SQuerySQL* pQu
int32_t tscSetTableFullName(STableMetaInfo* pTableMetaInfo, SStrToken* pTableName, SSqlObj* pSql) {
const char* msg1 = "name too long";
const char* msg2 = "acctId too long";
SSqlCmd* pCmd = &pSql->cmd;
int32_t code = TSDB_CODE_SUCCESS;
if (hasSpecifyDB(pTableName)) { // db has been specified in sql string so we ignore current db path
tNameSetAcctId(&pTableMetaInfo->name, getAccountId(pSql));
code = tNameSetAcctId(&pTableMetaInfo->name, getAccountId(pSql));
if (code != 0) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
char name[TSDB_TABLE_FNAME_LEN] = {0};
strncpy(name, pTableName->z, pTableName->n);
@ -1354,7 +1358,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
int32_t ret = exprTreeFromSqlExpr(pCmd, &pNode, pItem->pNode, pQueryInfo, colList, NULL);
if (ret != TSDB_CODE_SUCCESS) {
taosArrayDestroy(colList);
tExprTreeDestroy(&pNode, NULL);
tExprTreeDestroy(pNode, NULL);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
@ -1363,9 +1367,9 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
for(int32_t k = 0; k < numOfNode; ++k) {
SColIndex* pIndex = taosArrayGet(colList, k);
if (TSDB_COL_IS_TAG(pIndex->flag)) {
tExprTreeDestroy(&pNode, NULL);
tExprTreeDestroy(pNode, NULL);
taosArrayDestroy(colList);
tExprTreeDestroy(&pNode, NULL);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
}
@ -1392,7 +1396,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
tbufCloseWriter(&bw);
taosArrayDestroy(colList);
tExprTreeDestroy(&pNode, NULL);
tExprTreeDestroy(pNode, NULL);
} else {
columnList.num = 0;
columnList.ids[0] = (SColumnIndex) {0, 0};
@ -1424,7 +1428,7 @@ static int32_t handleArithmeticExpr(SSqlCmd* pCmd, int32_t clauseIndex, int32_t
int32_t ret = exprTreeFromSqlExpr(pCmd, &pArithExprInfo->pExpr, pItem->pNode, pQueryInfo, NULL, &pArithExprInfo->uid);
if (ret != TSDB_CODE_SUCCESS) {
tExprTreeDestroy(&pArithExprInfo->pExpr, NULL);
tExprTreeDestroy(pArithExprInfo->pExpr, NULL);
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), "invalid expression in select clause");
}
@ -1501,23 +1505,39 @@ static void addPrimaryTsColIntoResult(SQueryInfo* pQueryInfo) {
pQueryInfo->type |= TSDB_QUERY_TYPE_PROJECTION_QUERY;
}
bool isValidDistinctSql(SQueryInfo* pQueryInfo) {
if (pQueryInfo == NULL) {
return false;
}
if ((pQueryInfo->type & TSDB_QUERY_TYPE_STABLE_QUERY) != TSDB_QUERY_TYPE_STABLE_QUERY) {
return false;
}
if (tscQueryTags(pQueryInfo) && tscSqlExprNumOfExprs(pQueryInfo) == 1){
return true;
}
return false;
}
int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSelection, bool isSTable, bool joinQuery, bool intervalQuery) {
assert(pSelection != NULL && pCmd != NULL);
const char* msg2 = "functions can not be mixed up";
const char* msg3 = "not support query expression";
const char* msg5 = "invalid function name";
const char* msg6 = "only support distinct one tag";
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(pCmd, clauseIndex);
if (pQueryInfo->colList == NULL) {
pQueryInfo->colList = taosArrayInit(4, POINTER_BYTES);
}
bool hasDistinct = false;
for (int32_t i = 0; i < pSelection->nExpr; ++i) {
int32_t outputIndex = (int32_t)tscSqlExprNumOfExprs(pQueryInfo);
tSqlExprItem* pItem = &pSelection->a[i];
if (hasDistinct == false) {
hasDistinct = (pItem->distinct == true);
}
// project on all fields
int32_t optr = pItem->pNode->nSQLOptr;
@ -1551,6 +1571,13 @@ int32_t parseSelectClause(SSqlCmd* pCmd, int32_t clauseIndex, tSQLExprList* pSel
}
}
if (hasDistinct == true) {
if (!isValidDistinctSql(pQueryInfo)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
pQueryInfo->distinctTag = true;
}
// there is only one user-defined column in the final result field, add the timestamp column.
size_t numOfSrcCols = taosArrayGetSize(pQueryInfo->colList);
if (numOfSrcCols <= 0 && !tscQueryTags(pQueryInfo)) {
@ -1727,6 +1754,9 @@ int32_t addProjectionExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, t
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
SSchema colSchema = tGetTableNameColumnSchema();
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_TAGPRJ, &index, &colSchema, TSDB_COL_TAG);
} else if (index.columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema colSchema = tGetBlockDistColumnSchema();
tscAddSpecialColumnForSelect(pQueryInfo, startPos, TSDB_FUNC_PRJ, &index, &colSchema, TSDB_COL_TAG);
} else {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, index.tableIndex);
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
@ -2194,6 +2224,7 @@ int32_t addExprAndResultField(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, int32_t col
if (getColumnIndexByName(pCmd, &pParamElem->pNode->colInfo, pQueryInfo, &index) != TSDB_CODE_SUCCESS) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg3);
}
if (index.columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg6);
}
@ -2402,6 +2433,14 @@ static bool isTablenameToken(SStrToken* token) {
return (strncasecmp(TSQL_TBNAME_L, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_TBNAME_L));
}
static bool isTableBlockDistToken(SStrToken* token) {
SStrToken tmpToken = *token;
SStrToken tableToken = {0};
extractTableNameFromToken(&tmpToken, &tableToken);
return (strncasecmp(TSQL_BLOCK_DIST, tmpToken.z, tmpToken.n) == 0 && tmpToken.n == strlen(TSQL_BLOCK_DIST_L));
}
static int16_t doGetColumnIndex(SQueryInfo* pQueryInfo, int32_t index, SStrToken* pToken) {
STableMeta* pTableMeta = tscGetMetaInfo(pQueryInfo, index)->pTableMeta;
@ -2431,6 +2470,8 @@ int32_t doGetColumnIndexByName(SSqlCmd* pCmd, SStrToken* pToken, SQueryInfo* pQu
if (isTablenameToken(pToken)) {
pIndex->columnIndex = TSDB_TBNAME_COLUMN_INDEX;
} else if (isTableBlockDistToken(pToken)) {
pIndex->columnIndex = TSDB_BLOCK_DIST_COLUMN_INDEX;
} else if (strncasecmp(pToken->z, DEFAULT_PRIMARY_TIMESTAMP_COL_NAME, pToken->n) == 0) {
pIndex->columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX;
} else {
@ -2671,8 +2712,7 @@ int32_t setShowInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
if (!validateIpAddress(pDnodeIp->z, pDnodeIp->n)) {
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg4);
}
}
}
return TSDB_CODE_SUCCESS;
}
@ -4303,7 +4343,7 @@ static int32_t getTagQueryCondExpr(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SCondE
doCompactQueryExpr(pExpr);
tSqlExprDestroy(p1);
tExprTreeDestroy(&p, NULL);
tExprTreeDestroy(p, NULL);
taosArrayDestroy(colList);
if (pQueryInfo->tagCond.pCond != NULL && taosArrayGetSize(pQueryInfo->tagCond.pCond) > 0 && !UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
@ -4524,10 +4564,10 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg2);
}
size_t size = tscNumOfFields(pQueryInfo);
size_t numOfFields = tscNumOfFields(pQueryInfo);
if (pQueryInfo->fillVal == NULL) {
pQueryInfo->fillVal = calloc(size, sizeof(int64_t));
pQueryInfo->fillVal = calloc(numOfFields, sizeof(int64_t));
if (pQueryInfo->fillVal == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
@ -4537,7 +4577,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
pQueryInfo->fillType = TSDB_FILL_NONE;
} else if (strncasecmp(pItem->pVar.pz, "null", 4) == 0 && pItem->pVar.nLen == 4) {
pQueryInfo->fillType = TSDB_FILL_NULL;
for (int32_t i = START_INTERPO_COL_IDX; i < size; ++i) {
for (int32_t i = START_INTERPO_COL_IDX; i < numOfFields; ++i) {
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
setNull((char*)&pQueryInfo->fillVal[i], pField->type, pField->bytes);
}
@ -4551,7 +4591,7 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
pQueryInfo->fillType = TSDB_FILL_SET_VALUE;
size_t num = taosArrayGetSize(pFillToken);
if (num == 1) {
if (num == 1) { // no actual value, return with error code
return invalidSqlErrMsg(tscGetErrorMsgPayload(pCmd), msg1);
}
@ -4562,11 +4602,11 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
if (tscIsPointInterpQuery(pQueryInfo)) {
startPos = 0;
if (numOfFillVal > size) {
numOfFillVal = (int32_t)size;
if (numOfFillVal > numOfFields) {
numOfFillVal = (int32_t)numOfFields;
}
} else {
numOfFillVal = (int16_t)((num > (int32_t)size) ? (int32_t)size : num);
numOfFillVal = (int16_t)((num > (int32_t)numOfFields) ? (int32_t)numOfFields : num);
}
int32_t j = 1;
@ -4586,10 +4626,10 @@ int32_t parseFillClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQuery
}
}
if ((num < size) || ((num - 1 < size) && (tscIsPointInterpQuery(pQueryInfo)))) {
if ((num < numOfFields) || ((num - 1 < numOfFields) && (tscIsPointInterpQuery(pQueryInfo)))) {
tVariantListItem* lastItem = taosArrayGetLast(pFillToken);
for (int32_t i = numOfFillVal; i < size; ++i) {
for (int32_t i = numOfFillVal; i < numOfFields; ++i) {
TAOS_FIELD* pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i);
if (pField->type == TSDB_DATA_TYPE_BINARY || pField->type == TSDB_DATA_TYPE_NCHAR) {
@ -4640,6 +4680,12 @@ int32_t parseOrderbyClause(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SQuerySQL* pQu
setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
if (pQueryInfo->distinctTag == true) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
pQueryInfo->order.orderColId = 0;
return TSDB_CODE_SUCCESS;
}
if (pQuerySql->pSortOrder == NULL) {
return TSDB_CODE_SUCCESS;
}

View File

@ -2228,6 +2228,8 @@ int tscProcessRetrieveRspFromNode(SSqlObj *pSql) {
SSqlRes *pRes = &pSql->res;
SSqlCmd *pCmd = &pSql->cmd;
assert(pRes->rspLen >= sizeof(SRetrieveTableRsp));
SRetrieveTableRsp *pRetrieve = (SRetrieveTableRsp *)pRes->pRsp;
if (pRetrieve == NULL) {
pRes->code = TSDB_CODE_TSC_OUT_OF_MEMORY;

View File

@ -15,7 +15,7 @@
#include "hash.h"
#include "os.h"
#include "qAst.h"
#include "texpr.h"
#include "tkey.h"
#include "tcache.h"
#include "tnote.h"
@ -110,6 +110,7 @@ static SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pa
rpcClose(pDnodeConn);
free(pObj->tscCorMgmtEpSet);
free(pObj);
return NULL;
}
memcpy(pObj->tscCorMgmtEpSet, &corMgmtEpSet, sizeof(SRpcCorEpSet));
@ -936,7 +937,7 @@ int taos_validate_sql(TAOS *taos, const char *sql) {
static int tscParseTblNameList(SSqlObj *pSql, const char *tblNameList, int32_t tblListLen) {
// must before clean the sqlcmd object
tscResetSqlCmdObj(&pSql->cmd);
tscResetSqlCmd(&pSql->cmd, false);
SSqlCmd *pCmd = &pSql->cmd;

View File

@ -191,9 +191,10 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0, 0);
assert(0);
// char* name = pTableMetaInfo->name;
// taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
@ -292,8 +293,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
pStream->stime += 1;
}
// tscDebug("%p stream:%p, query on:%s, fetch result completed, fetched rows:%" PRId64, pSql, pStream, pTableMetaInfo->name,
// pStream->numOfRes);
tscDebug("%p stream:%p, query on:%s, fetch result completed, fetched rows:%" PRId64, pSql, pStream, tNameGetTableName(&pTableMetaInfo->name),
pStream->numOfRes);
tfree(pTableMetaInfo->pTableMeta);
@ -556,8 +557,8 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
// tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
// pStream, pTableMetaInfo->name, pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
tscDebug("%p stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql,
pStream, tNameGetTableName(&pTableMetaInfo->name), pStream->interval.interval, pStream->interval.sliding, starttime, pSql->sqlstr);
}
void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {

View File

@ -16,7 +16,7 @@
#include "os.h"
#include "qAst.h"
#include "texpr.h"
#include "qTsbuf.h"
#include "tcompare.h"
#include "tscLog.h"
@ -582,13 +582,14 @@ void freeJoinSubqueryObj(SSqlObj* pSql) {
pSql->subState.numOfSub = 0;
}
static void quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
static int32_t quitAllSubquery(SSqlObj* pSqlSub, SSqlObj* pSqlObj, SJoinSupporter* pSupporter) {
if (subAndCheckDone(pSqlSub, pSqlObj, pSupporter->subqueryIndex)) {
tscError("%p all subquery return and query failed, global code:%s", pSqlObj, tstrerror(pSqlObj->res.code));
freeJoinSubqueryObj(pSqlObj);
return;
return 0;
}
return 1;
//tscDestroyJoinSupporter(pSupporter);
}
@ -835,7 +836,9 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
tscAsyncResultOnError(pParentSql);
@ -850,7 +853,9 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows;
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
@ -867,7 +872,9 @@ static void tidTagRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p failed to malloc memory", pSql);
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
@ -985,7 +992,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
}
tscAsyncResultOnError(pParentSql);
@ -999,7 +1008,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
tscError("%p sub query failed, code:%s, index:%d", pSql, tstrerror(numOfRows), pSupporter->subqueryIndex);
pParentSql->res.code = numOfRows;
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
}
tscAsyncResultOnError(pParentSql);
return;
@ -1014,7 +1025,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
tscAsyncResultOnError(pParentSql);
@ -1032,7 +1045,9 @@ static void tsCompRetrieveCallback(void* param, TAOS_RES* tres, int32_t numOfRow
pParentSql->res.code = TAOS_SYSTEM_ERROR(errno);
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)){
return;
}
tscAsyncResultOnError(pParentSql);
@ -1129,8 +1144,10 @@ static void joinRetrieveFinalResCallback(void* param, TAOS_RES* tres, int numOfR
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, numOfRows, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
@ -1472,7 +1489,9 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
// retrieve actual query results from vnode during the second stage join subquery
if (pParentSql->res.code != TSDB_CODE_SUCCESS) {
tscError("%p abort query due to other subquery failure. code:%d, global code:%d", pSql, code, pParentSql->res.code);
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
tscAsyncResultOnError(pParentSql);
@ -1486,7 +1505,10 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
tscError("%p abort query, code:%s, global code:%s", pSql, tstrerror(code), tstrerror(pParentSql->res.code));
pParentSql->res.code = code;
quitAllSubquery(pSql, pParentSql, pSupporter);
if (quitAllSubquery(pSql, pParentSql, pSupporter)) {
return;
}
tscAsyncResultOnError(pParentSql);
return;
@ -2441,7 +2463,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
pParentObj->cmd.parseFinished = false;
tscResetSqlCmdObj(&pParentObj->cmd);
tscResetSqlCmd(&pParentObj->cmd, false);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.

View File

@ -16,7 +16,7 @@
#include "tscUtil.h"
#include "hash.h"
#include "os.h"
#include "qAst.h"
#include "texpr.h"
#include "taosmsg.h"
#include "tkey.h"
#include "tmd5.h"
@ -381,7 +381,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd) {
tfree(pCmd->pQueryInfo);
}
void tscResetSqlCmdObj(SSqlCmd* pCmd) {
void tscResetSqlCmd(SSqlCmd* pCmd, bool removeMeta) {
pCmd->command = 0;
pCmd->numOfCols = 0;
pCmd->count = 0;
@ -399,7 +399,7 @@ void tscResetSqlCmdObj(SSqlCmd* pCmd) {
pCmd->numOfTables = 0;
tfree(pCmd->pTableNameList);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, removeMeta);
pCmd->pDataBlocks = tscDestroyBlockArrayList(pCmd->pDataBlocks);
tscFreeQueryInfo(pCmd);
}
@ -501,7 +501,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->self = 0;
tscFreeSqlResult(pSql);
tscResetSqlCmdObj(pCmd);
tscResetSqlCmd(pCmd, false);
tfree(pCmd->tagData.data);
pCmd->tagData.dataLen = 0;
@ -515,7 +515,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
free(pSql);
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
if (pDataBlock == NULL) {
return;
}
@ -528,6 +528,13 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
tfree(pDataBlock->pTableMeta);
}
if (removeMeta) {
char name[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pDataBlock->tableName, name);
taosHashRemove(tscTableMetaInfo, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
}
tfree(pDataBlock);
}
@ -563,21 +570,21 @@ void* tscDestroyBlockArrayList(SArray* pDataBlockList) {
size_t size = taosArrayGetSize(pDataBlockList);
for (int32_t i = 0; i < size; i++) {
void* d = taosArrayGetP(pDataBlockList, i);
tscDestroyDataBlock(d);
tscDestroyDataBlock(d, false);
}
taosArrayDestroy(pDataBlockList);
return NULL;
}
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable) {
void* tscDestroyBlockHashTable(SHashObj* pBlockHashTable, bool removeMeta) {
if (pBlockHashTable == NULL) {
return NULL;
}
STableDataBlocks** p = taosHashIterate(pBlockHashTable, NULL);
while(p) {
tscDestroyDataBlock(*p);
tscDestroyDataBlock(*p, removeMeta);
p = taosHashIterate(pBlockHashTable, p);
}
@ -791,7 +798,7 @@ static void extractTableNameList(SSqlCmd* pCmd, bool freeBlockMap) {
}
if (freeBlockMap) {
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList);
pCmd->pTableBlockHashList = tscDestroyBlockHashTable(pCmd->pTableBlockHashList, false);
}
}
@ -1047,7 +1054,7 @@ void tscFieldInfoClear(SFieldInfo* pFieldInfo) {
SInternalField* pInfo = taosArrayGet(pFieldInfo->internalField, i);
if (pInfo->pArithExprInfo != NULL) {
tExprTreeDestroy(&pInfo->pArithExprInfo->pExpr, NULL);
tExprTreeDestroy(pInfo->pArithExprInfo->pExpr, NULL);
SSqlFuncMsg* pFuncMsg = &pInfo->pArithExprInfo->base;
for(int32_t j = 0; j < pFuncMsg->numOfParams; ++j) {
@ -1080,6 +1087,8 @@ static SSqlExpr* doBuildSqlExpr(SQueryInfo* pQueryInfo, int16_t functionId, SCol
// set the correct columnIndex index
if (pColIndex->columnIndex == TSDB_TBNAME_COLUMN_INDEX) {
pExpr->colInfo.colId = TSDB_TBNAME_COLUMN_INDEX;
} else if (pColIndex->columnIndex == TSDB_BLOCK_DIST_COLUMN_INDEX) {
pExpr->colInfo.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
} else if (pColIndex->columnIndex <= TSDB_UD_COLUMN_INDEX) {
pExpr->colInfo.colId = pColIndex->columnIndex;
} else {
@ -1496,7 +1505,7 @@ bool tscValidateColumnId(STableMetaInfo* pTableMetaInfo, int32_t colId, int32_t
return false;
}
if (colId == TSDB_TBNAME_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
if (colId == TSDB_TBNAME_COLUMN_INDEX || colId == TSDB_BLOCK_DIST_COLUMN_INDEX || (colId <= TSDB_UD_COLUMN_INDEX && numOfParams == 2)) {
return true;
}

View File

@ -31,6 +31,15 @@ extern "C" {
struct tExprNode;
struct SSchema;
#define QUERY_COND_REL_PREFIX_IN "IN|"
#define QUERY_COND_REL_PREFIX_LIKE "LIKE|"
#define QUERY_COND_REL_PREFIX_IN_LEN 3
#define QUERY_COND_REL_PREFIX_LIKE_LEN 5
typedef bool (*__result_filter_fn_t)(const void *, void *);
typedef void (*__do_filter_suppl_fn_t)(void *, void *);
enum {
TSQL_NODE_DUMMY = 0x0,
TSQL_NODE_EXPR = 0x1,
@ -38,9 +47,6 @@ enum {
TSQL_NODE_VALUE = 0x4,
};
typedef bool (*__result_filter_fn_t)(const void *, void *);
typedef void (*__do_filter_suppl_fn_t)(void *, void *);
/**
* this structure is used to filter data in tags, so the offset of filtered tag column in tagdata string is required
*/
@ -52,12 +58,6 @@ typedef struct tQueryInfo {
bool indexed; // indexed columns
} tQueryInfo;
typedef struct SExprTraverseSupp {
__result_filter_fn_t nodeFilterFn;
__do_filter_suppl_fn_t setupInfoFn;
void * pExtInfo;
} SExprTraverseSupp;
typedef struct tExprNode {
uint8_t nodeType;
union {
@ -65,7 +65,7 @@ typedef struct tExprNode {
uint8_t optr; // filter operator
uint8_t hasPK; // 0: do not contain primary filter, 1: contain
void * info; // support filter operation on this expression only available for leaf node
struct tExprNode *pLeft; // left child pointer
struct tExprNode *pRight; // right child pointer
} _node;
@ -74,19 +74,27 @@ typedef struct tExprNode {
};
} tExprNode;
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
typedef struct SExprTraverseSupp {
__result_filter_fn_t nodeFilterFn;
__do_filter_suppl_fn_t setupInfoFn;
void * pExtInfo;
} SExprTraverseSupp;
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
tExprNode* exprTreeFromBinary(const void* data, size_t size);
tExprNode* exprTreeFromTableName(const char* tbnameCond);
void exprTreeToBinary(SBufferWriter* bw, tExprNode* pExprTree);
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *));
void tExprTreeDestroy(tExprNode **pExprs, void (*fp)(void*));
bool exprTreeApplayFilter(tExprNode *pExpr, const void *pItem, SExprTraverseSupp *param);
typedef void (*_arithmetic_operator_fn_t)(void *left, int32_t numLeft, int32_t leftType, void *right, int32_t numRight,
int32_t rightType, void *output, int32_t order);
void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput, void *param, int32_t order,
char *(*cb)(void *, const char*, int32_t));
#ifdef __cplusplus
}
#endif

View File

@ -1,3 +1,18 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_NAME_H
#define TDENGINE_NAME_H
@ -45,6 +60,8 @@ void extractTableNameFromToken(SStrToken *pToken, SStrToken* pTable);
SSchema tGetTableNameColumnSchema();
SSchema tGetBlockDistColumnSchema();
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name);
bool tscValidateTableNameLength(size_t len);

View File

@ -15,9 +15,9 @@
#include "os.h"
#include "qArithmeticOperator.h"
#include "ttype.h"
#include "tutil.h"
#include "tarithoperator.h"
#define ARRAY_LIST_OP(left, right, _left_type, _right_type, len1, len2, out, op, _res_type, _ord) \
{ \

View File

@ -16,18 +16,15 @@
#include "os.h"
#include "exception.h"
#include "qArithmeticOperator.h"
#include "qAst.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "tname.h"
#include "tschemautil.h"
#include "tsdb.h"
#include "tskiplist.h"
#include "tsqlfunction.h"
#include "texpr.h"
#include "tarithoperator.h"
static uint8_t UNUSED_FUNC isQueryOnPrimaryKey(const char *primaryColumnName, const tExprNode *pLeft, const tExprNode *pRight) {
if (pLeft->nodeType == TSQL_NODE_COL) {
@ -102,13 +99,15 @@ static void reverseCopy(char* dest, const char* src, int16_t type, int32_t numOf
}
}
void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *));
void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *)) {
if (pNode == NULL) {
return;
}
if (pNode->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&pNode, fp);
doExprTreeDestroy(&pNode, fp);
} else if (pNode->nodeType == TSQL_NODE_VALUE) {
tVariantDestroy(pNode->pVal);
} else if (pNode->nodeType == TSQL_NODE_COL) {
@ -118,14 +117,14 @@ void tExprNodeDestroy(tExprNode *pNode, void (*fp)(void *)) {
free(pNode);
}
void tExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
static void doExprTreeDestroy(tExprNode **pExpr, void (*fp)(void *)) {
if (*pExpr == NULL) {
return;
}
if ((*pExpr)->nodeType == TSQL_NODE_EXPR) {
tExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
tExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
doExprTreeDestroy(&(*pExpr)->_node.pLeft, fp);
doExprTreeDestroy(&(*pExpr)->_node.pRight, fp);
if (fp != NULL) {
fp((*pExpr)->_node.info);
@ -270,8 +269,9 @@ void arithmeticTreeTraverse(tExprNode *pExprs, int32_t numOfRows, char *pOutput,
}
}
free(pLeftOutput);
free(pRightOutput);
tfree(pdata);
tfree(pLeftOutput);
tfree(pRightOutput);
}
static void exprTreeToBinaryImpl(SBufferWriter* bw, tExprNode* expr) {
@ -342,7 +342,7 @@ static tExprNode* exprTreeFromBinaryImpl(SBufferReader* br) {
}
tExprNode* pExpr = exception_calloc(1, sizeof(tExprNode));
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, pExpr, NULL);
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, pExpr, NULL);
pExpr->nodeType = tbufReadUint8(br);
if (pExpr->nodeType == TSQL_NODE_VALUE) {
@ -396,7 +396,7 @@ tExprNode* exprTreeFromTableName(const char* tbnameCond) {
int32_t anchor = CLEANUP_GET_ANCHOR();
tExprNode* expr = exception_calloc(1, sizeof(tExprNode));
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL);
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
expr->nodeType = TSQL_NODE_EXPR;

View File

@ -39,6 +39,14 @@ SSchema tGetTableNameColumnSchema() {
tstrncpy(s.name, TSQL_TBNAME_L, TSDB_COL_NAME_LEN);
return s;
}
SSchema tGetBlockDistColumnSchema() {
SSchema s = {0};
s.bytes = TSDB_MAX_BINARY_LEN;;
s.type = TSDB_DATA_TYPE_BINARY;
s.colId = TSDB_BLOCK_DIST_COLUMN_INDEX;
tstrncpy(s.name, TSQL_BLOCK_DIST_L, TSDB_COL_NAME_LEN);
return s;
}
SSchema tGetUserSpecifiedColumnSchema(tVariant* pVal, SStrToken* exprStr, const char* name) {
SSchema s = {0};
@ -367,6 +375,9 @@ int32_t tNameSetAcctId(SName* dst, const char* acct) {
}
tstrncpy(dst->acctId, acct, tListLen(dst->acctId));
assert(strlen(dst->acctId) > 0);
return 0;
}
@ -383,12 +394,14 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
int32_t len = (int32_t)(p - str);
// too long account id or too long db name
if (len >= tListLen(dst->acctId) || len == 0) {
if ((len >= tListLen(dst->acctId)) || (len <= 0)) {
return -1;
}
memcpy (dst->acctId, str, len);
dst->acctId[len] = 0;
assert(strlen(dst->acctId) > 0);
}
if ((type & T_NAME_DB) == T_NAME_DB) {
@ -404,7 +417,7 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
}
// too long account id or too long db name
if (len >= tListLen(dst->dbname) || len == 0) {
if ((len >= tListLen(dst->dbname)) || (len <= 0)) {
return -1;
}
@ -419,7 +432,7 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
int32_t len = (int32_t) strlen(start);
// too long account id or too long db name
if (len >= tListLen(dst->tname) || len == 0) {
if ((len >= tListLen(dst->tname)) || (len <= 0)) {
return -1;
}

View File

@ -235,7 +235,9 @@ do { \
#define TSDB_MAX_REPLICA 5
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_BLOCK_DIST_COLUMN_INDEX (-2)
#define TSDB_UD_COLUMN_INDEX (-100)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta
#define TSDB_MIN_CACHE_BLOCK_SIZE 1

View File

@ -113,7 +113,9 @@ void tsdbClearTableCfg(STableCfg *config);
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type, int16_t bytes);
char *tsdbGetTableName(void *pTable);
#define TSDB_TABLEID(_table) ((STableId *)(_table))
#define TSDB_TABLEID(_table) ((STableId*) (_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
@ -140,7 +142,6 @@ typedef struct {
int64_t tableTotalDataSize; // In bytes
int64_t tableTotalDiskSize; // In bytes
} STableInfo;
STableInfo *tsdbGetTableInfo(STsdbRepo *pRepo, STableId tid);
// -- FOR INSERT DATA
/**
@ -159,9 +160,10 @@ typedef void *TsdbQueryHandleT; // Use void to hide implementation details
// query condition to build vnode iterator
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
} STsdbQueryCond;
typedef struct SMemRef {
@ -233,13 +235,32 @@ SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
void *qinfo, SMemRef *pRef);
/**
* move to next block if exists
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
/**
* move to next block if exists
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlock(TsdbQueryHandleT *pQueryHandle);
/**
* move to next block if exists but not merge data in memtable
*
* @param pQueryHandle
* @return
*/
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT *pQueryHandle);
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type);
/**
* Get current data block information

View File

@ -16,6 +16,7 @@
#ifndef TDENGINE_TTOKENDEF_H
#define TDENGINE_TTOKENDEF_H
#define TK_ID 1
#define TK_BOOL 2
#define TK_TINYINT 3
@ -127,104 +128,104 @@
#define TK_SELECT 109
#define TK_UNION 110
#define TK_ALL 111
#define TK_FROM 112
#define TK_VARIABLE 113
#define TK_INTERVAL 114
#define TK_FILL 115
#define TK_SLIDING 116
#define TK_ORDER 117
#define TK_BY 118
#define TK_ASC 119
#define TK_DESC 120
#define TK_GROUP 121
#define TK_HAVING 122
#define TK_LIMIT 123
#define TK_OFFSET 124
#define TK_SLIMIT 125
#define TK_SOFFSET 126
#define TK_WHERE 127
#define TK_NOW 128
#define TK_RESET 129
#define TK_QUERY 130
#define TK_ADD 131
#define TK_COLUMN 132
#define TK_TAG 133
#define TK_CHANGE 134
#define TK_SET 135
#define TK_KILL 136
#define TK_CONNECTION 137
#define TK_STREAM 138
#define TK_COLON 139
#define TK_ABORT 140
#define TK_AFTER 141
#define TK_ATTACH 142
#define TK_BEFORE 143
#define TK_BEGIN 144
#define TK_CASCADE 145
#define TK_CLUSTER 146
#define TK_CONFLICT 147
#define TK_COPY 148
#define TK_DEFERRED 149
#define TK_DELIMITERS 150
#define TK_DETACH 151
#define TK_EACH 152
#define TK_END 153
#define TK_EXPLAIN 154
#define TK_FAIL 155
#define TK_FOR 156
#define TK_IGNORE 157
#define TK_IMMEDIATE 158
#define TK_INITIALLY 159
#define TK_INSTEAD 160
#define TK_MATCH 161
#define TK_KEY 162
#define TK_OF 163
#define TK_RAISE 164
#define TK_REPLACE 165
#define TK_RESTRICT 166
#define TK_ROW 167
#define TK_STATEMENT 168
#define TK_TRIGGER 169
#define TK_VIEW 170
#define TK_COUNT 171
#define TK_SUM 172
#define TK_AVG 173
#define TK_MIN 174
#define TK_MAX 175
#define TK_FIRST 176
#define TK_LAST 177
#define TK_TOP 178
#define TK_BOTTOM 179
#define TK_STDDEV 180
#define TK_PERCENTILE 181
#define TK_APERCENTILE 182
#define TK_LEASTSQUARES 183
#define TK_HISTOGRAM 184
#define TK_DIFF 185
#define TK_SPREAD 186
#define TK_TWA 187
#define TK_INTERP 188
#define TK_LAST_ROW 189
#define TK_RATE 190
#define TK_IRATE 191
#define TK_SUM_RATE 192
#define TK_SUM_IRATE 193
#define TK_AVG_RATE 194
#define TK_AVG_IRATE 195
#define TK_TBID 196
#define TK_SEMI 197
#define TK_NONE 198
#define TK_PREV 199
#define TK_LINEAR 200
#define TK_IMPORT 201
#define TK_METRIC 202
#define TK_TBNAME 203
#define TK_JOIN 204
#define TK_METRICS 205
#define TK_INSERT 206
#define TK_INTO 207
#define TK_VALUES 208
#define TK_DISTINCT 112
#define TK_FROM 113
#define TK_VARIABLE 114
#define TK_INTERVAL 115
#define TK_FILL 116
#define TK_SLIDING 117
#define TK_ORDER 118
#define TK_BY 119
#define TK_ASC 120
#define TK_DESC 121
#define TK_GROUP 122
#define TK_HAVING 123
#define TK_LIMIT 124
#define TK_OFFSET 125
#define TK_SLIMIT 126
#define TK_SOFFSET 127
#define TK_WHERE 128
#define TK_NOW 129
#define TK_RESET 130
#define TK_QUERY 131
#define TK_ADD 132
#define TK_COLUMN 133
#define TK_TAG 134
#define TK_CHANGE 135
#define TK_SET 136
#define TK_KILL 137
#define TK_CONNECTION 138
#define TK_STREAM 139
#define TK_COLON 140
#define TK_ABORT 141
#define TK_AFTER 142
#define TK_ATTACH 143
#define TK_BEFORE 144
#define TK_BEGIN 145
#define TK_CASCADE 146
#define TK_CLUSTER 147
#define TK_CONFLICT 148
#define TK_COPY 149
#define TK_DEFERRED 150
#define TK_DELIMITERS 151
#define TK_DETACH 152
#define TK_EACH 153
#define TK_END 154
#define TK_EXPLAIN 155
#define TK_FAIL 156
#define TK_FOR 157
#define TK_IGNORE 158
#define TK_IMMEDIATE 159
#define TK_INITIALLY 160
#define TK_INSTEAD 161
#define TK_MATCH 162
#define TK_KEY 163
#define TK_OF 164
#define TK_RAISE 165
#define TK_REPLACE 166
#define TK_RESTRICT 167
#define TK_ROW 168
#define TK_STATEMENT 169
#define TK_TRIGGER 170
#define TK_VIEW 171
#define TK_COUNT 172
#define TK_SUM 173
#define TK_AVG 174
#define TK_MIN 175
#define TK_MAX 176
#define TK_FIRST 177
#define TK_LAST 178
#define TK_TOP 179
#define TK_BOTTOM 180
#define TK_STDDEV 181
#define TK_PERCENTILE 182
#define TK_APERCENTILE 183
#define TK_LEASTSQUARES 184
#define TK_HISTOGRAM 185
#define TK_DIFF 186
#define TK_SPREAD 187
#define TK_TWA 188
#define TK_INTERP 189
#define TK_LAST_ROW 190
#define TK_RATE 191
#define TK_IRATE 192
#define TK_SUM_RATE 193
#define TK_SUM_IRATE 194
#define TK_AVG_RATE 195
#define TK_AVG_IRATE 196
#define TK_TBID 197
#define TK_SEMI 198
#define TK_NONE 199
#define TK_PREV 200
#define TK_LINEAR 201
#define TK_IMPORT 202
#define TK_METRIC 203
#define TK_TBNAME 204
#define TK_JOIN 205
#define TK_METRICS 206
#define TK_INSERT 207
#define TK_INTO 208
#define TK_VALUES 209
#define TK_SPACE 300

View File

@ -45,6 +45,7 @@ typedef struct tstr {
case TSDB_DATA_TYPE_USMALLINT: \
(_v) = (_finalType)GET_UINT16_VAL(_data); \
break; \
case TSDB_DATA_TYPE_TIMESTAMP:\
case TSDB_DATA_TYPE_BIGINT: \
(_v) = (_finalType)(GET_INT64_VAL(_data)); \
break; \
@ -66,6 +67,43 @@ typedef struct tstr {
} \
} while (0)
#define SET_TYPED_DATA(_v, _type, _data) \
do { \
switch (_type) { \
case TSDB_DATA_TYPE_BOOL: \
case TSDB_DATA_TYPE_TINYINT: \
*(int8_t *)(_v) = (int8_t)(_data); \
break; \
case TSDB_DATA_TYPE_UTINYINT: \
*(uint8_t *)(_v) = (uint8_t)(_data); \
break; \
case TSDB_DATA_TYPE_SMALLINT: \
*(int16_t *)(_v) = (int16_t)(_data); \
break; \
case TSDB_DATA_TYPE_USMALLINT: \
*(uint16_t *)(_v) = (uint16_t)(_data); \
break; \
case TSDB_DATA_TYPE_BIGINT: \
*(int64_t *)(_v) = (int64_t)(_data); \
break; \
case TSDB_DATA_TYPE_UBIGINT: \
*(uint64_t *)(_v) = (uint64_t)(_data); \
break; \
case TSDB_DATA_TYPE_FLOAT: \
*(float *)(_v) = (float)(_data); \
break; \
case TSDB_DATA_TYPE_DOUBLE: \
*(double *)(_v) = (double)(_data); \
break; \
case TSDB_DATA_TYPE_UINT: \
*(uint32_t *)(_v) = (uint32_t)(_data); \
break; \
default: \
*(int32_t *)(_v) = (int32_t)(_data); \
break; \
} \
} while (0)
#define IS_SIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_TINYINT && (_t) <= TSDB_DATA_TYPE_BIGINT)
#define IS_UNSIGNED_NUMERIC_TYPE(_t) ((_t) >= TSDB_DATA_TYPE_UTINYINT && (_t) <= TSDB_DATA_TYPE_UBIGINT)
#define IS_FLOAT_TYPE(_t) ((_t) == TSDB_DATA_TYPE_FLOAT || (_t) == TSDB_DATA_TYPE_DOUBLE)

View File

@ -43,7 +43,7 @@
*/
int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
const unsigned int day, const unsigned int hour,
const unsigned int min, const unsigned int sec)
const unsigned int min, const unsigned int sec, int64_t timezone)
{
unsigned int mon = mon0, year = year0;
@ -61,12 +61,6 @@ int64_t user_mktime64(const unsigned int year0, const unsigned int mon0,
res = res*24;
res = ((res + hour) * 60 + min) * 60 + sec;
#ifdef _MSC_VER
#if _MSC_VER >= 1900
int64_t timezone = _timezone;
#endif
#endif
return (res + timezone);
}
@ -219,7 +213,7 @@ int32_t parseTimeWithTz(char* timestr, int64_t* time, int32_t timePrec) {
/* mktime will be affected by TZ, set by using taos_options */
#ifdef WINDOWS
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, 0);
//int64_t seconds = gmtime(&tm);
#else
int64_t seconds = timegm(&tm);
@ -276,7 +270,13 @@ int32_t parseLocaltime(char* timestr, int64_t* time, int32_t timePrec) {
return -1;
}
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
#ifdef _MSC_VER
#if _MSC_VER >= 1900
int64_t timezone = _timezone;
#endif
#endif
int64_t seconds = user_mktime64(tm.tm_year+1900, tm.tm_mon+1, tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, timezone);
int64_t fraction = 0;
@ -574,4 +574,4 @@ const char* fmtts(int64_t ts) {
}
return buf;
}
}

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSQLFUNCTION_H
#define TDENGINE_TSQLFUNCTION_H
#ifndef TDENGINE_QAGGMAIN_H
#define TDENGINE_QAGGMAIN_H
#ifdef __cplusplus
extern "C" {
@ -97,11 +97,7 @@ extern "C" {
#define DATA_SET_FLAG ',' // to denote the output area has data, not null value
#define DATA_SET_FLAG_SIZE sizeof(DATA_SET_FLAG)
#define QUERY_COND_REL_PREFIX_IN "IN|"
#define QUERY_COND_REL_PREFIX_LIKE "LIKE|"
#define QUERY_COND_REL_PREFIX_IN_LEN 3
#define QUERY_COND_REL_PREFIX_LIKE_LEN 5
#define QUERY_ASC_FORWARD_STEP 1
#define QUERY_DESC_FORWARD_STEP -1
@ -153,7 +149,7 @@ typedef struct SResultRowCellInfo {
typedef struct SPoint1 {
int64_t key;
double val;
union{double val; char* ptr;};
} SPoint1;
#define GET_ROWCELL_INTERBUF(_c) ((void*) ((char*)(_c) + sizeof(SResultRowCellInfo)))
@ -279,4 +275,4 @@ static FORCE_INLINE void initResultInfo(SResultRowCellInfo *pResInfo, uint32_t b
}
#endif
#endif // TDENGINE_TSQLFUNCTION_H
#endif // TDENGINE_QAGGMAIN_H

View File

@ -18,6 +18,7 @@
#include "os.h"
#include "hash.h"
#include "qAggMain.h"
#include "qFill.h"
#include "qResultbuf.h"
#include "qSqlparser.h"
@ -27,7 +28,6 @@
#include "tarray.h"
#include "tlockfree.h"
#include "tsdb.h"
#include "tsqlfunction.h"
struct SColumnFilterElem;
typedef bool (*__filter_func_t)(struct SColumnFilterElem* pFilter, const char* val1, const char* val2, int16_t type);
@ -164,13 +164,14 @@ typedef struct SQuery {
SColumnInfo* tagColList;
int32_t numOfFilterCols;
int64_t* fillVal;
uint32_t status; // query status
uint32_t status; // query status
SResultRec rec;
int32_t pos;
tFilePage** sdata;
STableQueryInfo* current;
int32_t numOfCheckedBlocks; // number of check data blocks
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
SSingleColumnFilterInfo* pFilterInfo;
} SQuery;
@ -194,6 +195,7 @@ typedef struct SQueryRuntimeEnv {
bool hasTagResults; // if there are tag values in final result or not
bool timeWindowInterpo;// if the time window start/end required interpolation
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
bool queryBlockDist; // if query data block distribution
int32_t interBufSize; // intermediate buffer sizse
int32_t prevGroupId; // previous executed group id
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file

View File

@ -86,7 +86,7 @@ bool taosFillHasMoreResults(SFillInfo* pFillInfo);
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, int64_t ekey, int32_t maxNumOfRows);
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint *point1, SPoint *point2, SPoint *point);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType);
int64_t taosFillResultDataBlock(SFillInfo* pFillInfo, tFilePage** output, int32_t capacity);

View File

@ -200,6 +200,7 @@ typedef struct tSQLExpr {
typedef struct tSqlExprItem {
tSQLExpr *pNode; // The list of expressions
char * aliasName; // alias name, null-terminated string
bool distinct;
} tSqlExprItem;
// todo refactor by using SArray
@ -232,7 +233,7 @@ tSQLExpr *tSqlExprCreate(tSQLExpr *pLeft, tSQLExpr *pRight, int32_t optrType);
void tSqlExprDestroy(tSQLExpr *pExpr);
tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken *pToken);
tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken *pDistinct, SStrToken *pToken);
void tSqlExprListDestroy(tSQLExprList *pList);

View File

@ -161,7 +161,7 @@ cmd ::= ALTER DNODE ids(X) ids(Y). { setDCLSQLElems(pInfo, TSDB_SQL
cmd ::= ALTER DNODE ids(X) ids(Y) ids(Z). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_DNODE, 3, &X, &Y, &Z); }
cmd ::= ALTER LOCAL ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_LOCAL, 1, &X); }
cmd ::= ALTER LOCAL ids(X) ids(Y). { setDCLSQLElems(pInfo, TSDB_SQL_CFG_LOCAL, 2, &X, &Y); }
cmd ::= ALTER DATABASE ids(X) alter_db_optr(Y). { SStrToken t = {0}; setCreateDBSQL(pInfo, TSDB_SQL_ALTER_DB, &X, &Y, &t);}
cmd ::= ALTER DATABASE ids(X) alter_db_optr(Y). { SStrToken t = {0}; setCreateDbInfo(pInfo, TSDB_SQL_ALTER_DB, &X, &Y, &t);}
cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);}
cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);}
@ -186,7 +186,7 @@ ifnotexists(X) ::= . { X.n = 0;}
cmd ::= CREATE DNODE ids(X). { setDCLSQLElems(pInfo, TSDB_SQL_CREATE_DNODE, 1, &X);}
cmd ::= CREATE ACCOUNT ids(X) PASS ids(Y) acct_optr(Z).
{ setCreateAcctSql(pInfo, TSDB_SQL_CREATE_ACCT, &X, &Y, &Z);}
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDBSQL(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
cmd ::= CREATE DATABASE ifnotexists(Z) ids(X) db_optr(Y). { setCreateDbInfo(pInfo, TSDB_SQL_CREATE_DB, &X, &Y, &Z);}
cmd ::= CREATE USER ids(X) PASS ids(Y). { setCreateUserSql(pInfo, &X, &Y);}
pps(Y) ::= . { Y.n = 0; }
@ -457,13 +457,13 @@ select(A) ::= SELECT(T) selcollist(W). {
%destructor sclp {tSqlExprListDestroy($$);}
sclp(A) ::= selcollist(X) COMMA. {A = X;}
sclp(A) ::= . {A = 0;}
selcollist(A) ::= sclp(P) expr(X) as(Y). {
A = tSqlExprListAppend(P, X, Y.n?&Y:0);
selcollist(A) ::= sclp(P) distinct(Z) expr(X) as(Y). {
A = tSqlExprListAppend(P, X, Z.n? &Z:0, Y.n?&Y:0);
}
selcollist(A) ::= sclp(P) STAR. {
tSQLExpr *pNode = tSqlExprIdValueCreate(NULL, TK_ALL);
A = tSqlExprListAppend(P, pNode, 0);
A = tSqlExprListAppend(P, pNode, 0, 0);
}
// An option "AS <id>" phrase that can follow one of the expressions that
@ -474,6 +474,10 @@ as(X) ::= AS ids(Y). { X = Y; }
as(X) ::= ids(Y). { X = Y; }
as(X) ::= . { X.n = 0; }
%type distinct {SStrToken}
distinct(X) ::= DISTINCT(Y). { X = Y; }
distinct(X) ::= . { X.n = 0;}
// A complete FROM clause.
%type from {SArray*}
// current not support query from no-table
@ -681,8 +685,8 @@ expr(A) ::= expr(X) IN LP exprlist(Y) RP. {A = tSqlExprCreate(X, (tSQLExpr*)Y,
%type expritem {tSQLExpr*}
%destructor expritem {tSqlExprDestroy($$);}
exprlist(A) ::= exprlist(X) COMMA expritem(Y). {A = tSqlExprListAppend(X,Y,0);}
exprlist(A) ::= expritem(X). {A = tSqlExprListAppend(0,X,0);}
exprlist(A) ::= exprlist(X) COMMA expritem(Y). {A = tSqlExprListAppend(X,Y,0, 0);}
exprlist(A) ::= expritem(X). {A = tSqlExprListAppend(0,X,0, 0);}
expritem(A) ::= expr(X). {A = X;}
expritem(A) ::= . {A = 0;}

View File

@ -14,17 +14,17 @@
*/
#include "os.h"
#include "qAst.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "texpr.h"
#include "ttype.h"
#include "qAggMain.h"
#include "qFill.h"
#include "qHistogram.h"
#include "qPercentile.h"
#include "qTsbuf.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "queryLog.h"
#include "tscSubquery.h"
#include "tsqlfunction.h"
#include "ttype.h"
#define GET_INPUT_DATA_LIST(x) (((char *)((x)->aInputElemBuf)) + ((x)->startOffset) * ((x)->inputBytes))
#define GET_INPUT_DATA(x, y) (GET_INPUT_DATA_LIST(x) + (y) * (x)->inputBytes)
@ -3776,89 +3776,67 @@ void twa_function_finalizer(SQLFunctionCtx *pCtx) {
*
* @param pCtx
*/
static void interp_function(SQLFunctionCtx *pCtx) {
// at this point, the value is existed, return directly
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
SInterpInfoDetail* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
assert(pCtx->startOffset == 0);
static void interp_function_impl(SQLFunctionCtx *pCtx) {
int32_t type = (int32_t) pCtx->param[2].i64;
if (type == TSDB_FILL_NONE) {
return;
}
if (pCtx->size == 1) {
char *pData = GET_INPUT_DATA_LIST(pCtx);
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
if (pCtx->inputType == TSDB_DATA_TYPE_TIMESTAMP) {
*(TSKEY *) pCtx->aOutputBuf = pCtx->nStartQueryTimestamp;
} else {
/*
* use interpolation to generate the result.
* Note: the result of primary timestamp column uses the timestamp specified by user in the query sql
*/
assert(pCtx->size == 2);
if (pInfo->type == TSDB_FILL_NONE) { // set no output result
if (pCtx->start.key == INT64_MIN) {
assert(pCtx->end.key == INT64_MIN);
return;
}
if (pInfo->primaryCol == 1) {
*(TSKEY *) pCtx->aOutputBuf = pInfo->ts;
} else {
if (pInfo->type == TSDB_FILL_NULL) {
if (pCtx->outputType == TSDB_DATA_TYPE_BINARY || pCtx->outputType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->outputType);
if (type == TSDB_FILL_NULL) {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
} else if (type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType, true);
} else if (type == TSDB_FILL_PREV) {
if (IS_NUMERIC_TYPE(pCtx->inputType) || pCtx->inputType == TSDB_DATA_TYPE_BOOL) {
SET_TYPED_DATA(pCtx->aOutputBuf, pCtx->inputType, pCtx->start.val);
} else {
assignVal(pCtx->aOutputBuf, pCtx->start.ptr, pCtx->outputBytes, pCtx->inputType);
}
} else if (type == TSDB_FILL_LINEAR) {
SPoint point1 = {.key = pCtx->start.key, .val = &pCtx->start.val};
SPoint point2 = {.key = pCtx->end.key, .val = &pCtx->end.val};
SPoint point = {.key = pCtx->nStartQueryTimestamp, .val = pCtx->aOutputBuf};
int32_t srcType = pCtx->inputType;
if (IS_NUMERIC_TYPE(srcType)) { // TODO should find the not null data?
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
setNull(pCtx->aOutputBuf, pCtx->outputType, pCtx->outputBytes);
}
SET_VAL(pCtx, pCtx->size, 1);
} else if (pInfo->type == TSDB_FILL_SET_VALUE) {
tVariantDump(&pCtx->param[1], pCtx->aOutputBuf, pCtx->inputType, true);
} else if (pInfo->type == TSDB_FILL_PREV) {
char *data = GET_INPUT_DATA(pCtx, 0);
assignVal(pCtx->aOutputBuf, data, pCtx->outputBytes, pCtx->outputType);
SET_VAL(pCtx, pCtx->size, 1);
} else if (pInfo->type == TSDB_FILL_LINEAR) {
char *data1 = GET_INPUT_DATA(pCtx, 0);
char *data2 = GET_INPUT_DATA(pCtx, 1);
TSKEY key1 = pCtx->ptsList[0];
TSKEY key2 = pCtx->ptsList[1];
SPoint point1 = {.key = key1, .val = data1};
SPoint point2 = {.key = key2, .val = data2};
SPoint point = {.key = pInfo->ts, .val = pCtx->aOutputBuf};
int32_t srcType = pCtx->inputType;
if ((srcType >= TSDB_DATA_TYPE_TINYINT && srcType <= TSDB_DATA_TYPE_BIGINT) ||
srcType == TSDB_DATA_TYPE_TIMESTAMP || srcType == TSDB_DATA_TYPE_DOUBLE) {
point1.val = data1;
point2.val = data2;
if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
}
} else if (srcType == TSDB_DATA_TYPE_FLOAT) {
point1.val = data1;
point2.val = data2;
if (isNull(data1, srcType) || isNull(data2, srcType)) {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
} else {
taosGetLinearInterpolationVal(pCtx->outputType, &point1, &point2, &point);
}
} else {
if (srcType == TSDB_DATA_TYPE_BINARY || srcType == TSDB_DATA_TYPE_NCHAR) {
setVardataNull(pCtx->aOutputBuf, pCtx->inputType);
} else {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
}
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
}
} else {
setNull(pCtx->aOutputBuf, srcType, pCtx->inputBytes);
}
}
}
SET_VAL(pCtx, pCtx->size, 1);
SET_VAL(pCtx, 1, 1);
}
static void interp_function(SQLFunctionCtx *pCtx) {
// at this point, the value is existed, return directly
if (pCtx->size > 0) {
// impose the timestamp check
TSKEY key = GET_TS_DATA(pCtx, 0);
if (key == pCtx->nStartQueryTimestamp) {
char *pData = GET_INPUT_DATA(pCtx, 0);
assignVal(pCtx->aOutputBuf, pData, pCtx->inputBytes, pCtx->inputType);
SET_VAL(pCtx, 1, 1);
} else {
interp_function_impl(pCtx);
}
} else { //no qualified data rows and interpolation is required
interp_function_impl(pCtx);
}
}
static bool ts_comp_function_setup(SQLFunctionCtx *pCtx) {

View File

@ -20,7 +20,7 @@
#include "exception.h"
#include "hash.h"
#include "qAst.h"
#include "texpr.h"
#include "qExecutor.h"
#include "qResultbuf.h"
#include "qUtil.h"
@ -28,6 +28,7 @@
#include "queryLog.h"
#include "tlosertree.h"
#include "ttype.h"
#include "tcompare.h"
#define MAX_ROWS_PER_RESBUF_PAGE ((1u<<12) - 1)
@ -90,6 +91,13 @@ typedef struct {
STSCursor cur;
} SQueryStatusInfo;
typedef struct {
SArray *dataBlockInfos;
int64_t firstSeekTimeUs;
int64_t numOfRowsInMemTable;
char *result;
} STableBlockDist;
#if 0
static UNUSED_FUNC void *u_malloc (size_t __size) {
uint32_t v = rand();
@ -408,7 +416,7 @@ static bool isTopBottomQuery(SQuery *pQuery) {
static bool timeWindowInterpoRequired(SQuery *pQuery) {
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
int32_t functionId = pQuery->pExpr1[i].base.functionId;
if (functionId == TSDB_FUNC_TWA) {
if (functionId == TSDB_FUNC_TWA || functionId == TSDB_FUNC_INTERP) {
return true;
}
}
@ -818,6 +826,7 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SDataBlockInfo *pDataBlo
return num;
}
// TODO decouple the data block and the SQLFunctionCtx
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pWin, int32_t offset, int32_t forwardStep, TSKEY *tsCol, int32_t numOfTotal) {
SQuery *pQuery = pRuntimeEnv->pQuery;
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
@ -825,8 +834,8 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
bool hasPrev = pCtx[0].preAggVals.isSet;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].size = forwardStep;
pCtx[k].nStartQueryTimestamp = pWin->skey;
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? offset : offset - (forwardStep - 1);
int32_t functionId = pQuery->pExpr1[k].base.functionId;
@ -1029,7 +1038,8 @@ static void setNotInterpoWindowKey(SQLFunctionCtx* pCtx, int32_t numOfOutput, in
}
// window start key interpolation
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows, SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win) {
static bool setTimeWindowInterpolationStartTs(SQueryRuntimeEnv* pRuntimeEnv, int32_t pos, int32_t numOfRows,
SArray* pDataBlock, TSKEY* tsCols, STimeWindow* win, int16_t type) {
SQuery* pQuery = pRuntimeEnv->pQuery;
TSKEY curTs = tsCols[pos];
@ -1118,6 +1128,8 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
assert(pDataBlock != NULL);
SQuery* pQuery = pRuntimeEnv->pQuery;
int32_t fillType = pQuery->fillType;
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, 0);
@ -1126,7 +1138,7 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
bool done = resultRowInterpolated(pResult, RESULT_ROW_START_INTERP);
if (!done) {
int32_t startRowIndex = startPos;
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, win);
bool interp = setTimeWindowInterpolationStartTs(pRuntimeEnv, startRowIndex, pDataBlockInfo->rows, pDataBlock, tsCols, win, fillType);
if (interp) {
setResultRowInterpo(pResult, RESULT_ROW_START_INTERP);
}
@ -1134,6 +1146,12 @@ static void doWindowBorderInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SDataBloc
setNotInterpoWindowKey(pRuntimeEnv->pCtx, pQuery->numOfOutput, RESULT_ROW_START_INTERP);
}
// point interpolation does not require the end key time window interpolation.
if (isPointInterpoQuery(pQuery)) {
return;
}
// interpolation query does not generate the time window end interpolation
done = resultRowInterpolated(pResult, RESULT_ROW_END_INTERP);
if (!done) {
int32_t endRowIndex = startPos + (forwardStep - 1) * step;
@ -1259,7 +1277,7 @@ static void blockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
pCtx[k].nStartQueryTimestamp = pDataBlockInfo->window.skey;
pCtx[k].nStartQueryTimestamp = pQuery->window.skey;
aAggs[functionId].xFunction(&pCtx[k]);
}
}
@ -1423,18 +1441,20 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
return true;
}
void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDataBlock, TSKEY prevTs, int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey, int32_t type) {
SQuery* pQuery = pRuntimeEnv->pQuery;
void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv *pRuntimeEnv, SArray *pDataBlock, TSKEY prevTs,
int32_t prevRowIndex, TSKEY curTs, int32_t curRowIndex, TSKEY windowKey,
int32_t type) {
SQuery *pQuery = pRuntimeEnv->pQuery;
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
int32_t functionId = pQuery->pExpr1[k].base.functionId;
if (functionId != TSDB_FUNC_TWA) {
if (functionId != TSDB_FUNC_TWA && functionId != TSDB_FUNC_INTERP) {
pRuntimeEnv->pCtx[k].start.key = INT64_MIN;
continue;
}
SColIndex* pColIndex = &pQuery->pExpr1[k].base.colInfo;
int16_t index = pColIndex->colIndex;
SColumnInfoData* pColInfo = taosArrayGet(pDataBlock, index);
SColIndex * pColIndex = &pQuery->pExpr1[k].base.colInfo;
int16_t index = pColIndex->colIndex;
SColumnInfoData *pColInfo = taosArrayGet(pDataBlock, index);
assert(pColInfo->info.colId == pColIndex->colId && curTs != windowKey);
double v1 = 0, v2 = 0, v = 0;
@ -1450,14 +1470,25 @@ void doRowwiseTimeWindowInterpolation(SQueryRuntimeEnv* pRuntimeEnv, SArray* pDa
SPoint point1 = (SPoint){.key = prevTs, .val = &v1};
SPoint point2 = (SPoint){.key = curTs, .val = &v2};
SPoint point = (SPoint){.key = windowKey, .val = &v};
taosGetLinearInterpolationVal(TSDB_DATA_TYPE_DOUBLE, &point1, &point2, &point);
if (type == RESULT_ROW_START_INTERP) {
pRuntimeEnv->pCtx[k].start.key = point.key;
pRuntimeEnv->pCtx[k].start.val = v;
if (functionId == TSDB_FUNC_TWA) {
taosGetLinearInterpolationVal(&point, TSDB_DATA_TYPE_DOUBLE, &point1, &point2, TSDB_DATA_TYPE_DOUBLE);
if (type == RESULT_ROW_START_INTERP) {
pRuntimeEnv->pCtx[k].start.key = point.key;
pRuntimeEnv->pCtx[k].start.val = v;
} else {
pRuntimeEnv->pCtx[k].end.key = point.key;
pRuntimeEnv->pCtx[k].end.val = v;
}
} else {
pRuntimeEnv->pCtx[k].end.key = point.key;
pRuntimeEnv->pCtx[k].end.val = v;
if (type == RESULT_ROW_START_INTERP) {
pRuntimeEnv->pCtx[k].start.key = prevTs;
pRuntimeEnv->pCtx[k].start.val = v1;
pRuntimeEnv->pCtx[k].end.key = curTs;
pRuntimeEnv->pCtx[k].end.val = v2;
}
}
}
}
@ -1796,13 +1827,7 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
pCtx->preAggVals.statis.max = pBlockInfo->window.ekey;
}
} else if (functionId == TSDB_FUNC_INTERP) {
SResultRowCellInfo* pInfo = GET_RES_INFO(pCtx);
SInterpInfoDetail *pInterpInfo = (SInterpInfoDetail *)GET_ROWCELL_INTERBUF(pInfo);
pInterpInfo->type = (int8_t)pQuery->fillType;
pInterpInfo->ts = pQuery->window.skey;
pInterpInfo->primaryCol = (colId == PRIMARYKEY_TIMESTAMP_COL_INDEX);
pCtx->param[2].i64 = (int8_t) pQuery->fillType;
if (pQuery->fillVal != NULL) {
if (isNull((const char*) &pQuery->fillVal[colIndex], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
@ -1908,6 +1933,10 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int16_t order
if (pIndex->colId == TSDB_TBNAME_COLUMN_INDEX) { // todo refactor
SSchema s = tGetTableNameColumnSchema();
pCtx->inputBytes = s.bytes;
pCtx->inputType = s.type;
} else if (pIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
pCtx->inputBytes = s.bytes;
pCtx->inputType = s.type;
} else {
@ -2579,7 +2608,6 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo * pW
if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTsBuf > 0) {
*status = BLK_DATA_ALL_NEEDED;
} else { // check if this data block is required to load
// Calculate all time windows that are overlapping or contain current data block.
// If current data block is contained by all possible time window, do not load current data block.
if (QUERY_IS_INTERVAL_QUERY(pQuery) && overlapWithTimeWindow(pQuery, pBlockInfo)) {
@ -2818,6 +2846,10 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
while (tsdbNextDataBlock(pQueryHandle)) {
summary->totalBlocks += 1;
if (IS_MASTER_SCAN(pRuntimeEnv)) {
pQuery->numOfCheckedBlocks += 1;
}
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
@ -3557,7 +3589,7 @@ void setQueryStatus(SQuery *pQuery, int8_t status) {
}
}
bool needScanDataBlocksAgain(SQueryRuntimeEnv *pRuntimeEnv) {
bool needRepeatScan(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
bool toContinue = false;
@ -3684,6 +3716,67 @@ static void restoreTimeWindow(STableGroupInfo* pTableGroupInfo, STsdbQueryCond*
pKeyInfo->lastKey = pCond->twindow.skey;
}
static void handleInterpolationQuery(SQInfo* pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
if (pQuery->numOfCheckedBlocks > 0 || !isPointInterpoQuery(pQuery)) {
return;
}
SArray *prev = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_PREV_ROW);
SArray *next = tsdbGetExternalRow(pRuntimeEnv->pQueryHandle, &pQInfo->memRef, TSDB_NEXT_ROW);
if (prev == NULL || next == NULL) {
return;
}
// setup the pCtx->start/end info and calculate the interpolation value
SColumnInfoData *startTs = taosArrayGet(prev, 0);
SColumnInfoData *endTs = taosArrayGet(next, 0);
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
SQLFunctionCtx *pCtx = &pRuntimeEnv->pCtx[i];
int32_t functionId = pQuery->pExpr1[i].base.functionId;
SColIndex *pColIndex = &pQuery->pExpr1[i].base.colInfo;
if (!TSDB_COL_IS_NORMAL_COL(pColIndex->flag)) {
aAggs[functionId].xFunction(pCtx);
continue;
}
SColumnInfoData *p = taosArrayGet(prev, pColIndex->colIndex);
SColumnInfoData *n = taosArrayGet(next, pColIndex->colIndex);
assert(p->info.colId == pColIndex->colId);
pCtx->start.key = *(TSKEY *)startTs->pData;
pCtx->end.key = *(TSKEY *)endTs->pData;
if (p->info.type != TSDB_DATA_TYPE_BINARY && p->info.type != TSDB_DATA_TYPE_NCHAR) {
GET_TYPED_DATA(pCtx->start.val, double, p->info.type, p->pData);
GET_TYPED_DATA(pCtx->end.val, double, n->info.type, n->pData);
} else { // string pointer
pCtx->start.ptr = p->pData;
pCtx->end.ptr = n->pData;
}
pCtx->param[2].i64 = (int8_t)pQuery->fillType;
pCtx->nStartQueryTimestamp = pQuery->window.skey;
if (pQuery->fillVal != NULL) {
if (isNull((const char *)&pQuery->fillVal[i], pCtx->inputType)) {
pCtx->param[1].nType = TSDB_DATA_TYPE_NULL;
} else { // todo refactor, tVariantCreateFromBinary should handle the NULL value
if (pCtx->inputType != TSDB_DATA_TYPE_BINARY && pCtx->inputType != TSDB_DATA_TYPE_NCHAR) {
tVariantCreateFromBinary(&pCtx->param[1], (char *)&pQuery->fillVal[i], pCtx->inputBytes, pCtx->inputType);
}
}
}
aAggs[functionId].xFunction(pCtx);
}
}
void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
SQuery *pQuery = pRuntimeEnv->pQuery;
@ -3711,7 +3804,7 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
}
}
if (!needScanDataBlocksAgain(pRuntimeEnv)) {
if (!needRepeatScan(pRuntimeEnv)) {
// restore the status code and jump out of loop
if (pRuntimeEnv->scanFlag == REPEAT_SCAN) {
pQuery->status = qstatus.status;
@ -3737,24 +3830,19 @@ void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
qDebug("QInfo:%p start to repeat scan data blocks due to query func required, qrange:%"PRId64"-%"PRId64, pQInfo,
cond.twindow.skey, cond.twindow.ekey);
// check if query is killed or not
if (isQueryKilled(pQInfo)) {
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
}
if (!needReverseScan(pQuery)) {
return;
if (needReverseScan(pQuery)) {
setEnvBeforeReverseScan(pRuntimeEnv, &qstatus);
// reverse scan from current position
qDebug("QInfo:%p start to reverse scan", pQInfo);
doScanAllDataBlocks(pRuntimeEnv);
clearEnvAfterReverseScan(pRuntimeEnv, &qstatus);
}
setEnvBeforeReverseScan(pRuntimeEnv, &qstatus);
// reverse scan from current position
qDebug("QInfo:%p start to reverse scan", pQInfo);
doScanAllDataBlocks(pRuntimeEnv);
clearEnvAfterReverseScan(pRuntimeEnv, &qstatus);
handleInterpolationQuery(pQInfo);
}
void finalizeQueryResult(SQueryRuntimeEnv *pRuntimeEnv) {
@ -4377,7 +4465,59 @@ static void updateOffsetVal(SQueryRuntimeEnv *pRuntimeEnv, SDataBlockInfo *pBloc
qDebug("QInfo:%p check data block, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, numOfRes:%d, lastKey:%"PRId64, GET_QINFO_ADDR(pRuntimeEnv),
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
}
static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
if (pTableBlockDist != NULL) {
taosArrayDestroy(pTableBlockDist->dataBlockInfos);
free(pTableBlockDist->result);
free(pTableBlockDist);
}
}
static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
int32_t len = (int32_t)taosArrayGetSize(pArray);
if (len <= 0) {
return 0;
}
assert(rate >= 0 && rate <= 1.0);
int idx = (int32_t)((len - 1) * rate);
return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows;
}
static int compareBlockInfo(const void *pLeft, const void *pRight) {
int32_t left = ((SDataBlockInfo *)pLeft)->rows;
int32_t right = ((SDataBlockInfo *)pRight)->rows;
if (left > right) return 1;
if (left < right) return -1;
return 0;
}
static void generateBlockDistResult(STableBlockDist *pTableBlockDist) {
if (pTableBlockDist == NULL) {
return;
}
int64_t min = INT64_MAX, max = INT64_MIN, avg = 0;
SArray* blockInfos= pTableBlockDist->dataBlockInfos;
int64_t totalRows = 0, totalBlocks = taosArrayGetSize(blockInfos);
for (size_t i = 0; i < taosArrayGetSize(blockInfos); i++) {
SDataBlockInfo *blockInfo = taosArrayGet(blockInfos, i);
int64_t rows = blockInfo->rows;
min = MIN(min, rows);
max = MAX(max, rows);
totalRows += rows;
}
avg = totalBlocks > 0 ? (int64_t)(totalRows/totalBlocks) : 0;
taosArraySort(blockInfos, compareBlockInfo);
int sz = sprintf(pTableBlockDist->result,
"summery: \n\t 5th=[%d], 25th=[%d], 50th=[%d],75th=[%d], 95th=[%d], 99th=[%d] \n\t min=[%"PRId64"], max=[%"PRId64"], avg = [%"PRId64"] \n\t totalRows=[%"PRId64"], totalBlocks=[%"PRId64"] \n\t seekHeaderTimeCost=[%"PRId64"(us)] \n\t rowsInMem=[%"PRId64"]",
getPercentileFromSortedArray(blockInfos, 0.05), getPercentileFromSortedArray(blockInfos, 0.25), getPercentileFromSortedArray(blockInfos, 0.50),
getPercentileFromSortedArray(blockInfos, 0.75), getPercentileFromSortedArray(blockInfos, 0.95), getPercentileFromSortedArray(blockInfos, 0.99),
min, max, avg,
totalRows, totalBlocks,
pTableBlockDist->firstSeekTimeUs,
pTableBlockDist->numOfRowsInMemTable);
UNUSED(sz);
return;
}
void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
SQuery *pQuery = pRuntimeEnv->pQuery;
@ -4891,6 +5031,7 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
.order = pQuery->order.order,
.colList = pQuery->colList,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = false,
};
// todo refactor
@ -4985,6 +5126,7 @@ STsdbQueryCond createTsdbQueryCond(SQuery* pQuery, STimeWindow* win) {
.colList = pQuery->colList,
.order = pQuery->order.order,
.numOfCols = pQuery->numOfCols,
.loadExternalRows = false,
};
TIME_WINDOW_COPY(cond.twindow, *win);
@ -5728,15 +5870,13 @@ static void tableIntervalProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo) {
}
scanOneTableDataBlocks(pRuntimeEnv, newStartKey);
assert(!Q_STATUS_EQUAL(pQuery->status, QUERY_NOT_COMPLETED));
finalizeQueryResult(pRuntimeEnv);
// skip offset result rows
pQuery->rec.rows = 0;
// not fill or no result generated during this query
if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->windowResInfo.size == 0) {
if (pQuery->fillType == TSDB_FILL_NONE || pRuntimeEnv->windowResInfo.size == 0 || isPointInterpoQuery(pQuery)) {
// all data scanned, the group by normal column can return
int32_t numOfClosed = numOfClosedResultRows(&pRuntimeEnv->windowResInfo);
if (pQuery->limit.offset > numOfClosed) {
@ -5771,7 +5911,7 @@ static void tableQueryImpl(SQInfo *pQInfo) {
SQuery * pQuery = pRuntimeEnv->pQuery;
if (hasNotReturnedResults(pRuntimeEnv)) {
if (pQuery->fillType != TSDB_FILL_NONE) {
if (pQuery->fillType != TSDB_FILL_NONE && !isPointInterpoQuery(pQuery)) {
/*
* There are remain results that are not returned due to result interpolation
* So, we do keep in this procedure instead of launching retrieve procedure for next results.
@ -5830,6 +5970,58 @@ static void tableQueryImpl(SQInfo *pQInfo) {
pRuntimeEnv->summary.elapsedTime += (taosGetTimestampUs() - st);
assert(pQInfo->tableqinfoGroupInfo.numOfTables == 1);
}
static void buildTableBlockDistResult(SQInfo *pQInfo) {
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
SQuery *pQuery = pRuntimeEnv->pQuery;
pQuery->pos = 0;
STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist));
pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SDataBlockInfo));
pTableBlockDist->result = (char *)malloc(512);
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
SDataBlockInfo blockInfo = SDATA_BLOCK_INITIALIZER;
SSchema blockDistSchema = tGetBlockDistColumnSchema();
int64_t startTime = taosGetTimestampUs();
while (tsdbNextDataBlockWithoutMerge(pQueryHandle)) {
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
freeTableBlockDist(pTableBlockDist);
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
if (pTableBlockDist->firstSeekTimeUs == 0) {
pTableBlockDist->firstSeekTimeUs = taosGetTimestampUs() - startTime;
}
tsdbRetrieveDataBlockInfo(pQueryHandle, &blockInfo);
taosArrayPush(pTableBlockDist->dataBlockInfos, &blockInfo);
}
if (terrno != TSDB_CODE_SUCCESS) {
freeTableBlockDist(pTableBlockDist);
longjmp(pRuntimeEnv->env, terrno);
}
pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle);
generateBlockDistResult(pTableBlockDist);
int type = -1;
assert(pQuery->numOfOutput == 1);
SExprInfo* pExprInfo = pQuery->pExpr1;
for (int32_t j = 0; j < pQuery->numOfOutput; j++) {
if (pExprInfo[j].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
type = blockDistSchema.type;
}
assert(type == TSDB_DATA_TYPE_BINARY);
STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result));
}
freeTableBlockDist(pTableBlockDist);
pQuery->rec.rows = 1;
setQueryStatus(pQuery, QUERY_COMPLETED);
return;
}
static void stableQueryImpl(SQInfo *pQInfo) {
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
@ -5857,7 +6049,10 @@ static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pE
if (TSDB_COL_IS_TAG(pExprMsg->colInfo.flag)) {
if (pExprMsg->colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
return TSDB_TBNAME_COLUMN_INDEX;
} else if (pExprMsg->colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
return TSDB_BLOCK_DIST_COLUMN_INDEX;
}
while(j < pQueryMsg->numOfTags) {
if (pExprMsg->colInfo.colId == pTagCols[j].colId) {
@ -6314,6 +6509,10 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
SSchema s = tGetTableNameColumnSchema();
type = s.type;
bytes = s.bytes;
} else if (pExprs[i].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
SSchema s = tGetBlockDistColumnSchema();
type = s.type;
bytes = s.bytes;
} else if (pExprs[i].base.colInfo.colId <= TSDB_UD_COLUMN_INDEX) {
// it is a user-defined constant value column
assert(pExprs[i].base.functionId == TSDB_FUNC_PRJ);
@ -6327,7 +6526,7 @@ static int32_t createQueryFuncExprFromMsg(SQueryTableMsg *pQueryMsg, int32_t num
} else {
int32_t j = getColumnIndexInSource(pQueryMsg, &pExprs[i].base, pTagCols);
if (TSDB_COL_IS_TAG(pExprs[i].base.colInfo.flag)) {
if (j < TSDB_TBNAME_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
if (j < TSDB_BLOCK_DIST_COLUMN_INDEX || j >= pQueryMsg->numOfTags) {
return TSDB_CODE_QRY_INVALID_MSG;
}
} else {
@ -6497,7 +6696,7 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
}
}
assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX);
assert(f < pQuery->numOfTags || pColIndex->colId == TSDB_TBNAME_COLUMN_INDEX || pColIndex->colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
}
}
}
@ -6700,9 +6899,10 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
index += 1;
}
}
colIdCheck(pQuery);
pQInfo->runtimeEnv.queryBlockDist = (numOfOutput == 1 && pExprs[0].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX);
qDebug("qmsg:%p QInfo:%p created", pQueryMsg, pQInfo);
return pQInfo;
@ -6719,7 +6919,8 @@ _cleanup_query:
for (int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExprInfo = &pExprs[i];
if (pExprInfo->pExpr != NULL) {
tExprTreeDestroy(&pExprInfo->pExpr, NULL);
tExprTreeDestroy(pExprInfo->pExpr, NULL);
pExprInfo->pExpr = NULL;
}
}
@ -6835,7 +7036,7 @@ static void* destroyQueryFuncExpr(SExprInfo* pExprInfo, int32_t numOfExpr) {
for (int32_t i = 0; i < numOfExpr; ++i) {
if (pExprInfo[i].pExpr != NULL) {
tExprNodeDestroy(pExprInfo[i].pExpr, NULL);
tExprTreeDestroy(pExprInfo[i].pExpr, NULL);
}
}
@ -7212,6 +7413,8 @@ bool qTableQuery(qinfo_t qinfo) {
buildTagQueryResult(pQInfo);
} else if (pQInfo->runtimeEnv.stableQuery) {
stableQueryImpl(pQInfo);
} else if (pQInfo->runtimeEnv.queryBlockDist){
buildTableBlockDistResult(pQInfo);
} else {
tableQueryImpl(pQInfo);
}

View File

@ -12,13 +12,13 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "qExtbuffer.h"
#include "os.h"
#include "qAggMain.h"
#include "queryLog.h"
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
#include "tulog.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \

View File

@ -15,9 +15,9 @@
#include "os.h"
#include "qAggMain.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tsqlfunction.h"
#include "ttype.h"
#include "qFill.h"
@ -120,7 +120,7 @@ static void doFillOneRowResult(SFillInfo* pFillInfo, tFilePage** data, char** sr
point1 = (SPoint){.key = *(TSKEY*)(prev), .val = prev + pCol->col.offset};
point2 = (SPoint){.key = ts, .val = srcData[i] + pFillInfo->index * bytes};
point = (SPoint){.key = pFillInfo->currentKey, .val = val1};
taosGetLinearInterpolationVal(type, &point1, &point2, &point);
taosGetLinearInterpolationVal(&point, type, &point1, &point2, type);
}
} else {
setNullValueForRow(pFillInfo, data, pFillInfo->numOfCols, index);
@ -479,25 +479,13 @@ int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t ma
return (numOfRes > maxNumOfRows) ? maxNumOfRows : numOfRes;
}
int32_t taosGetLinearInterpolationVal(int32_t type, SPoint* point1, SPoint* point2, SPoint* point) {
double v1 = -1;
double v2 = -1;
GET_TYPED_DATA(v1, double, type, point1->val);
GET_TYPED_DATA(v2, double, type, point2->val);
int32_t taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType) {
double v1 = -1, v2 = -1;
GET_TYPED_DATA(v1, double, inputType, point1->val);
GET_TYPED_DATA(v2, double, inputType, point2->val);
double r = DO_INTERPOLATION(v1, v2, point1->key, point2->key, point->key);
switch(type) {
case TSDB_DATA_TYPE_TINYINT: *(int8_t*) point->val = (int8_t) r;break;
case TSDB_DATA_TYPE_SMALLINT: *(int16_t*) point->val = (int16_t) r;break;
case TSDB_DATA_TYPE_INT: *(int32_t*) point->val = (int32_t) r;break;
case TSDB_DATA_TYPE_BIGINT: *(int64_t*) point->val = (int64_t) r;break;
case TSDB_DATA_TYPE_DOUBLE: *(double*) point->val = (double) r;break;
case TSDB_DATA_TYPE_FLOAT: *(float*) point->val = (float) r;break;
default:
assert(0);
}
SET_TYPED_DATA(point->val, outputType, r);
return TSDB_CODE_SUCCESS;
}

View File

@ -71,7 +71,7 @@ abort_parse:
return sqlInfo;
}
tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken *pToken) {
tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken *pDistinct, SStrToken *pToken) {
if (pList == NULL) {
pList = calloc(1, sizeof(tSQLExprList));
}
@ -97,6 +97,7 @@ tSQLExprList *tSqlExprListAppend(tSQLExprList *pList, tSQLExpr *pNode, SStrToken
strdequote(pItem->aliasName);
}
pItem->distinct = (pDistinct != NULL);
}
return pList;
}

View File

@ -115,6 +115,11 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
GET_TYPED_DATA(v, int64_t, pBucket->type, value);
int32_t index = -1;
if (v > pBucket->range.i64MaxVal || v < pBucket->range.i64MinVal) {
return index;
}
// divide the value range into 1024 buckets
uint64_t span = pBucket->range.i64MaxVal - pBucket->range.i64MinVal;
if (span < pBucket->numOfSlots) {
@ -128,7 +133,7 @@ int32_t tBucketIntHash(tMemBucket *pBucket, const void *value) {
}
}
assert(v >= pBucket->range.i64MinVal && v <= pBucket->range.i64MaxVal && index >= 0 && index < pBucket->numOfSlots);
assert(index >= 0 && index < pBucket->numOfSlots);
return index;
}
@ -137,6 +142,11 @@ int32_t tBucketUintHash(tMemBucket *pBucket, const void *value) {
GET_TYPED_DATA(v, uint64_t, pBucket->type, value);
int32_t index = -1;
if (v > pBucket->range.u64MaxVal || v < pBucket->range.u64MinVal) {
return index;
}
// divide the value range into 1024 buckets
uint64_t span = pBucket->range.u64MaxVal - pBucket->range.u64MinVal;
if (span < pBucket->numOfSlots) {
@ -150,7 +160,7 @@ int32_t tBucketUintHash(tMemBucket *pBucket, const void *value) {
}
}
assert(v >= pBucket->range.u64MinVal && v <= pBucket->range.i64MaxVal && index >= 0 && index < pBucket->numOfSlots);
assert(index >= 0 && index < pBucket->numOfSlots);
return index;
}
@ -164,6 +174,10 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
int32_t index = -1;
if (v > pBucket->range.dMaxVal || v < pBucket->range.dMinVal) {
return index;
}
// divide a range of [dMinVal, dMaxVal] into 1024 buckets
double span = pBucket->range.dMaxVal - pBucket->range.dMinVal;
if (span < pBucket->numOfSlots) {
@ -177,7 +191,7 @@ int32_t tBucketDoubleHash(tMemBucket *pBucket, const void *value) {
}
}
assert(v >= pBucket->range.dMinVal && v <= pBucket->range.dMaxVal && index >= 0 && index < pBucket->numOfSlots);
assert(index >= 0 && index < pBucket->numOfSlots);
return index;
}
@ -309,9 +323,13 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
int32_t bytes = pBucket->bytes;
for (int32_t i = 0; i < size; ++i) {
char *d = (char *) data + i * bytes;
count += 1;
int32_t index = (pBucket->hashFunc)(pBucket, d);
if (index < 0) {
continue;
}
count += 1;
tMemBucketSlot *pSlot = &pBucket->pSlots[index];
tMemBucketUpdateBoundingBox(&pSlot->range, d, pBucket->type);

View File

@ -240,6 +240,7 @@ static SKeyword keywordTable[] = {
{"AVG_RATE", TK_AVG_RATE},
{"AVG_IRATE", TK_AVG_IRATE},
{"CACHELAST", TK_CACHELAST},
{"DISTINCT", TK_DISTINCT},
};
static const char isIdChar[] = {

File diff suppressed because it is too large Load Diff

View File

@ -3,7 +3,7 @@
#include <cassert>
#include <iostream>
#include "qAst.h"
#include "texpr.h"
#include "taosmsg.h"
#include "tsdb.h"
#include "tskiplist.h"

View File

@ -3,7 +3,7 @@
#include <cassert>
#include <iostream>
#include "tsqlfunction.h"
#include "qAggMain.h"
#include "tcompare.h"
TEST(testCase, patternMatchTest) {

View File

@ -19,10 +19,9 @@
#include "tcompare.h"
#include "exception.h"
#include "../../query/inc/qAst.h" // todo move to common module
#include "tlosertree.h"
#include "tsdb.h"
#include "tsdbint.h"
#include "texpr.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
@ -111,6 +110,7 @@ typedef struct STsdbQueryHandle {
int32_t activeIndex;
bool checkFiles; // check file stage
bool cachelastrow; // check if last row cached
bool loadExternalRow; // load time window external data rows
void* qinfo; // query info handle, for debug purpose
int32_t type; // query type: retrieve all data blocks, 2. retrieve only last row, 3. retrieve direct prev|next rows
SDFileSet* pFileGroup;
@ -125,6 +125,8 @@ typedef struct STsdbQueryHandle {
SDataBlockLoadInfo dataBlockLoadInfo; /* record current block load information */
SLoadCompBlockInfo compBlockLoadInfo; /* record current compblock information in SQuery */
SArray *prev; // previous row which is before than time window
SArray *next; // next row which is after the query time window
SIOCostSummary cost;
} STsdbQueryHandle;
@ -141,10 +143,10 @@ static int32_t tsdbGetCachedLastRow(STable* pTable, SDataRow* pRes, TSKEY* lastK
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
STsdbQueryHandle* pQueryHandle);
static int tsdbCheckInfoCompar(const void* key1, const void* key2);
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbQueryHandle* pQueryHandle);
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SMemRef* pMemRef);
static void* doFreeColumnInfoData(SArray* pColumnInfoData);
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
pBlockLoadInfo->slot = -1;
@ -210,6 +212,36 @@ static void tsdbMayUnTakeMemSnapshot(STsdbQueryHandle* pQueryHandle) {
pQueryHandle->pMemRef = NULL;
}
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
size_t size = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(pQueryHandle->activeIndex < size && pQueryHandle->activeIndex >= 0 && size >= 1);
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int64_t rows = 0;
SMemRef* pMemRef = pQueryHandle->pMemRef;
if (pMemRef == NULL) { return rows; }
STableData* pMem = NULL;
STableData* pIMem = NULL;
SMemTable *pMemT = (SMemTable *)(pMemRef->mem);
SMemTable *pIMemT = (SMemTable *)(pMemRef->imem);
if (pMemT && pCheckInfo->tableId.tid < pMemT->maxTables) {
pMem = pMemT->tData[pCheckInfo->tableId.tid];
rows += (pMem && pMem->uid == pCheckInfo->tableId.uid) ? pMem->numOfRows: 0;
}
if (pIMemT && pCheckInfo->tableId.tid < pIMemT->maxTables) {
pIMem = pIMemT->tData[pCheckInfo->tableId.tid];
rows += (pIMem && pIMem->uid == pCheckInfo->tableId.uid) ? pIMem->numOfRows: 0;
}
return rows;
}
static SArray* createCheckInfoFromTableGroup(STsdbQueryHandle* pQueryHandle, STableGroupInfo* pGroupList, STsdbMeta* pMeta) {
size_t sizeOfGroup = taosArrayGetSize(pGroupList->pGroupList);
assert(sizeOfGroup >= 1 && pMeta != NULL);
@ -294,6 +326,7 @@ static STsdbQueryHandle* tsdbQueryTablesImpl(STsdbRepo* tsdb, STsdbQueryCond* pC
pQueryHandle->allocSize = 0;
pQueryHandle->locateStart = false;
pQueryHandle->pMemRef = pMemRef;
pQueryHandle->loadExternalRow = pCond->loadExternalRows;
if (tsdbInitReadH(&pQueryHandle->rhelper, (STsdbRepo*)tsdb) != 0) {
goto out_of_memory;
@ -410,10 +443,11 @@ SArray* tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle) {
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond* pCond, STableGroupInfo *groupList, void* qinfo, SMemRef* pRef) {
STsdbQueryHandle *pQueryHandle = (STsdbQueryHandle*) tsdbQueryTables(tsdb, pCond, groupList, qinfo, pRef);
pQueryHandle->loadExternalRow = true;
if (pQueryHandle != NULL) {
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
changeQueryHandleForInterpQuery(pQueryHandle);
}
return pQueryHandle;
}
@ -1900,17 +1934,21 @@ static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
pQueryHandle->activeIndex += 1;
}
if (pQueryHandle->loadExternalRow && pQueryHandle->window.skey == pQueryHandle->window.ekey) {
SMemRef* pMemRef = pQueryHandle->pMemRef;
doGetExternalRow(pQueryHandle, TSDB_PREV_ROW, pMemRef);
doGetExternalRow(pQueryHandle, TSDB_NEXT_ROW, pMemRef);
}
// no data in memtable or imemtable, decrease the memory reference.
tsdbMayUnTakeMemSnapshot(pQueryHandle);
return false;
}
//todo not unref yet, since it is not support multi-group interpolation query
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
// filter the queried time stamp in the first place
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
pQueryHandle->order = TSDB_ORDER_DESC;
assert(pQueryHandle->window.skey == pQueryHandle->window.ekey);
// starts from the buffer in case of descending timestamp order check data blocks
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
@ -1920,8 +1958,8 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
// the first qualified table for interpolation query
if (pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey &&
pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL) {
if ((pQueryHandle->window.skey <= pCheckInfo->pTableObj->lastKey) &&
(pCheckInfo->pTableObj->lastKey != TSKEY_INITIAL_VAL)) {
break;
}
@ -1938,9 +1976,6 @@ static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle) {
info.lastKey = pQueryHandle->window.skey;
taosArrayPush(pQueryHandle->pTableCheckInfo, &info);
// update the query time window according to the chosen last timestamp
pQueryHandle->window = (STimeWindow) {info.lastKey, TSKEY_INITIAL_VAL};
}
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
@ -2020,7 +2055,6 @@ static void destroyHelper(void* param) {
return;
}
tQueryInfo* pInfo = (tQueryInfo*)param;
if (pInfo->optr != TSDB_RELATION_IN) {
tfree(pInfo->q);
@ -2029,111 +2063,6 @@ static void destroyHelper(void* param) {
free(param);
}
static bool getNeighborRows(STsdbQueryHandle* pQueryHandle) {
assert (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL);
SDataBlockInfo blockInfo = {{0}, 0};
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
pQueryHandle->order = TSDB_ORDER_DESC;
if (!tsdbNextDataBlock((void*) pQueryHandle)) {
return false;
}
tsdbRetrieveDataBlockInfo((void*) pQueryHandle, &blockInfo);
/*SArray *pDataBlock = */tsdbRetrieveDataBlock((void*) pQueryHandle, pQueryHandle->defaultLoadColumn);
if (terrno != TSDB_CODE_SUCCESS) {
return false;
}
if (pQueryHandle->cur.win.ekey == pQueryHandle->window.skey) {
// data already retrieve, discard other data rows and return
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
}
pQueryHandle->cur.win = (STimeWindow){pQueryHandle->window.skey, pQueryHandle->window.skey};
pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.rows = 1;
pQueryHandle->type = TSDB_QUERY_TYPE_ALL;
return true;
} else {
STimeWindow win = (STimeWindow) {pQueryHandle->window.skey, INT64_MAX};
STsdbQueryCond cond = {
.order = TSDB_ORDER_ASC,
.numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle))
};
cond.twindow = win;
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return false;
}
for(int32_t i = 0; i < cond.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
}
STsdbQueryHandle* pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pQueryHandle->pMemRef);
tfree(cond.colList);
pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey);
if (pSecQueryHandle->pTableCheckInfo == NULL) {
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
if (!tsdbNextDataBlock((void*) pSecQueryHandle)) {
tsdbCleanupQueryHandle(pSecQueryHandle);
return false;
}
tsdbRetrieveDataBlockInfo((void*) pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*) pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pSecQueryHandle));
size_t si = taosArrayGetSize(pSecQueryHandle->pTableCheckInfo);
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
memcpy((char*)pCol->pData, (char*)pCol->pData + pCol->info.bytes * (pQueryHandle->cur.rows - 1), pCol->info.bytes);
SColumnInfoData* pCol1 = taosArrayGet(pSecQueryHandle->pColumns, i);
assert(pCol->info.colId == pCol1->info.colId);
memcpy((char*)pCol->pData + pCol->info.bytes, pCol1->pData, pCol1->info.bytes);
}
SColumnInfoData* pTSCol = taosArrayGet(pQueryHandle->pColumns, 0);
// it is ascending order
pQueryHandle->order = TSDB_ORDER_DESC;
pQueryHandle->window = pQueryHandle->cur.win;
pQueryHandle->cur.win = (STimeWindow){((TSKEY*)pTSCol->pData)[0], ((TSKEY*)pTSCol->pData)[1]};
pQueryHandle->cur.rows = 2;
pQueryHandle->cur.mixBlock = true;
int32_t step = -1;// one step for ascending order traverse
for (int32_t j = 0; j < si; ++j) {
STableCheckInfo* pCheckInfo = (STableCheckInfo*) taosArrayGet(pQueryHandle->pTableCheckInfo, j);
pCheckInfo->lastKey = pQueryHandle->cur.win.ekey + step;
}
tsdbCleanupQueryHandle(pSecQueryHandle);
}
//disable it after retrieve data
pQueryHandle->type = TSDB_QUERY_TYPE_EXTERNAL;
pQueryHandle->checkFiles = false;
return true;
}
// handle data in cache situation
bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
@ -2144,16 +2073,7 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
if (pQueryHandle->type == TSDB_QUERY_TYPE_EXTERNAL) {
SMemRef* pMemRef = pQueryHandle->pMemRef;
tsdbMayTakeMemSnapshot(pQueryHandle);
bool ret = getNeighborRows(pQueryHandle);
tsdbMayUnTakeMemSnapshot(pQueryHandle);
// restore the pMemRef
pQueryHandle->pMemRef = pMemRef;
return ret;
} else if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
@ -2218,6 +2138,185 @@ bool tsdbNextDataBlock(TsdbQueryHandleT* pHandle) {
return ret;
}
bool tsdbNextDataBlockWithoutMerge(TsdbQueryHandleT* pHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
int64_t stime = taosGetTimestampUs();
int64_t elapsedTime = stime;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
assert(numOfTables > 0);
if (pQueryHandle->type == TSDB_QUERY_TYPE_LAST && pQueryHandle->cachelastrow) {
// the last row is cached in buffer, return it directly.
// here note that the pQueryHandle->window must be the TS_INITIALIZER
int32_t numOfCols = (int32_t)(QH_GET_NUM_OF_COLS(pQueryHandle));
SQueryFilePos* cur = &pQueryHandle->cur;
SDataRow pRow = NULL;
TSKEY key = TSKEY_INITIAL_VAL;
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
if (++pQueryHandle->activeIndex < numOfTables) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, pQueryHandle->activeIndex);
int32_t ret = tsdbGetCachedLastRow(pCheckInfo->pTableObj, &pRow, &key);
if (ret != TSDB_CODE_SUCCESS) {
return false;
}
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, 0, pRow, numOfCols, pCheckInfo->pTableObj);
tfree(pRow);
// update the last key value
pCheckInfo->lastKey = key + step;
cur->rows = 1; // only one row
cur->lastKey = key + step;
cur->mixBlock = true;
cur->win.skey = key;
cur->win.ekey = key;
return true;
}
return false;
}
if (pQueryHandle->checkFiles) {
// check if the query range overlaps with the file data block
bool exists = true;
int32_t code = getDataBlocksInFiles(pQueryHandle, &exists);
if (code != TSDB_CODE_SUCCESS) {
pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false;
return false;
}
if (exists) {
pQueryHandle->cost.checkForNextTime += (taosGetTimestampUs() - stime);
return exists;
}
pQueryHandle->activeIndex = 0;
pQueryHandle->checkFiles = false;
}
elapsedTime = taosGetTimestampUs() - stime;
pQueryHandle->cost.checkForNextTime += elapsedTime;
return false;
}
static int32_t doGetExternalRow(STsdbQueryHandle* pQueryHandle, int16_t type, SMemRef* pMemRef) {
STsdbQueryHandle* pSecQueryHandle = NULL;
if (type == TSDB_PREV_ROW && pQueryHandle->prev) {
return TSDB_CODE_SUCCESS;
}
if (type == TSDB_NEXT_ROW && pQueryHandle->next) {
return TSDB_CODE_SUCCESS;
}
// prepare the structure
int32_t numOfCols = (int32_t) QH_GET_NUM_OF_COLS(pQueryHandle);
if (type == TSDB_PREV_ROW) {
pQueryHandle->prev = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
if (pQueryHandle->prev == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto out_of_memory;
}
} else {
pQueryHandle->next = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
if (pQueryHandle->next == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto out_of_memory;
}
}
SArray* row = (type == TSDB_PREV_ROW)? pQueryHandle->prev:pQueryHandle->next;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, i);
SColumnInfoData colInfo = {{0}, 0};
colInfo.info = pCol->info;
colInfo.pData = calloc(1, pCol->info.bytes);
if (colInfo.pData == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto out_of_memory;
}
taosArrayPush(row, &colInfo);
}
// load the previous row
STsdbQueryCond cond = {.numOfCols = numOfCols, .loadExternalRows = false,};
if (type == TSDB_PREV_ROW) {
cond.order = TSDB_ORDER_DESC;
cond.twindow = (STimeWindow){pQueryHandle->window.skey, INT64_MIN};
} else {
cond.order = TSDB_ORDER_ASC;
cond.twindow = (STimeWindow){pQueryHandle->window.skey, INT64_MAX};
}
cond.colList = calloc(cond.numOfCols, sizeof(SColumnInfo));
if (cond.colList == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto out_of_memory;
}
for (int32_t i = 0; i < cond.numOfCols; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pQueryHandle->pColumns, i);
memcpy(&cond.colList[i], &pColInfoData->info, sizeof(SColumnInfo));
}
pSecQueryHandle = tsdbQueryTablesImpl(pQueryHandle->pTsdb, &cond, pQueryHandle->qinfo, pMemRef);
tfree(cond.colList);
pSecQueryHandle->pTableCheckInfo = createCheckInfoFromCheckInfo(pQueryHandle->pTableCheckInfo, pSecQueryHandle->window.skey);
if (pSecQueryHandle->pTableCheckInfo == NULL) {
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
goto out_of_memory;
}
if (!tsdbNextDataBlock((void*)pSecQueryHandle)) {
// no result in current query, free the corresponding result rows structure
if (type == TSDB_PREV_ROW) {
pQueryHandle->prev = doFreeColumnInfoData(pQueryHandle->prev);
} else {
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
}
goto out_of_memory;
}
SDataBlockInfo blockInfo = {{0}, 0};
tsdbRetrieveDataBlockInfo((void*)pSecQueryHandle, &blockInfo);
tsdbRetrieveDataBlock((void*)pSecQueryHandle, pSecQueryHandle->defaultLoadColumn);
row = (type == TSDB_PREV_ROW)? pQueryHandle->prev:pQueryHandle->next;
int32_t pos = (type == TSDB_PREV_ROW)?pSecQueryHandle->cur.rows - 1:0;
for (int32_t i = 0; i < numOfCols; ++i) {
SColumnInfoData* pCol = taosArrayGet(row, i);
SColumnInfoData* s = taosArrayGet(pSecQueryHandle->pColumns, i);
memcpy((char*)pCol->pData, (char*)s->pData + s->info.bytes * pos, pCol->info.bytes);
}
out_of_memory:
tsdbCleanupQueryHandle(pSecQueryHandle);
return terrno;
}
SArray* tsdbGetExternalRow(TsdbQueryHandleT *pHandle, SMemRef* pMemRef, int16_t type) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*) pHandle;
assert(type == TSDB_PREV_ROW || type == TSDB_NEXT_ROW);
return (type == TSDB_PREV_ROW)? pQueryHandle->prev:pQueryHandle->next;
}
/*
* 1. no data at all (pTable->lastKey = TSKEY_INITIAL_VAL), just return TSKEY_INITIAL_VAL
* 2. has data but not loaded, just return lastKey but not set pRes
@ -2716,7 +2815,7 @@ static int32_t doQueryTableList(STable* pSTable, SArray* pRes, tExprNode* pExpr)
};
getTableListfromSkipList(pExpr, pSTable->pIndex, pRes, &supp);
tExprTreeDestroy(&pExpr, destroyHelper);
tExprTreeDestroy(pExpr, destroyHelper);
return TSDB_CODE_SUCCESS;
}
@ -2773,10 +2872,10 @@ int32_t tsdbQuerySTableByTagCond(STsdbRepo* tsdb, uint64_t uid, TSKEY skey, cons
if (expr == NULL) {
expr = exprTreeFromBinary(pTagCond, len);
} else {
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, expr, NULL);
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, expr, NULL);
tExprNode* tagExpr = exprTreeFromBinary(pTagCond, len);
if (tagExpr != NULL) {
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprNodeDestroy, tagExpr, NULL);
CLEANUP_PUSH_VOID_PTR_PTR(true, tExprTreeDestroy, tagExpr, NULL);
tExprNode* tbnameExpr = expr;
expr = calloc(1, sizeof(tExprNode));
if (expr == NULL) {
@ -2890,6 +2989,21 @@ int32_t tsdbGetTableGroupFromIdList(STsdbRepo* tsdb, SArray* pTableIdList, STabl
return TSDB_CODE_SUCCESS;
}
static void* doFreeColumnInfoData(SArray* pColumnInfoData) {
if (pColumnInfoData == NULL) {
return NULL;
}
size_t cols = taosArrayGetSize(pColumnInfoData);
for (int32_t i = 0; i < cols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pColumnInfoData, i);
tfree(pColInfo->pData);
}
taosArrayDestroy(pColumnInfoData);
return NULL;
}
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
if (pQueryHandle == NULL) {
@ -2907,14 +3021,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
}
if (pQueryHandle->pColumns != NULL) {
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for (int32_t i = 0; i < cols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData);
}
taosArrayDestroy(pQueryHandle->pColumns);
}
pQueryHandle->pColumns = doFreeColumnInfoData(pQueryHandle->pColumns);
taosArrayDestroy(pQueryHandle->defaultLoadColumn);
tfree(pQueryHandle->pDataBlockInfo);
@ -2928,6 +3035,9 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
tdFreeDataCols(pQueryHandle->pDataCols);
pQueryHandle->pDataCols = NULL;
pQueryHandle->prev = doFreeColumnInfoData(pQueryHandle->prev);
pQueryHandle->next = doFreeColumnInfoData(pQueryHandle->next);
SIOCostSummary* pCost = &pQueryHandle->cost;
tsdbDebug("%p :io-cost summary: statis-info:%"PRId64" us, datablock:%" PRId64" us, check data:%"PRId64" us, %p",
pQueryHandle, pCost->statisInfoLoadTime, pCost->blockLoadTime, pCost->checkForNextTime, pQueryHandle->qinfo);

View File

@ -27,6 +27,9 @@ extern "C" {
#define TSQL_TBNAME "TBNAME"
#define TSQL_TBNAME_L "tbname"
#define TSQL_BLOCK_DIST "_BLOCK_DIST"
#define TSQL_BLOCK_DIST_L "_block_dist"
// used to denote the minimum unite in sql parsing
typedef struct SStrToken {
uint32_t n;

View File

@ -274,7 +274,7 @@ void *taosIterateRef(int rsetId, int64_t rid) {
return NULL;
}
if (rid <= 0) {
if (rid < 0) {
uTrace("rsetId:%d rid:%" PRId64 " failed to iterate, rid not valid", rsetId, rid);
terrno = TSDB_CODE_REF_NOT_EXIST;
return NULL;

View File

@ -638,209 +638,293 @@ if $data24 != NULL then
return -1
endi
## interp(*) from stb + group by + fill(prev)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(prev) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0.00000 then
return -1
endi
if $data04 != 0.000000000 then
return -1
endi
if $data05 != 0 then
return -1
endi
if $data06 != 0 then
return -1
endi
if $data07 != 1 then
return -1
endi
if $data08 != binary0 then
return -1
endi
if $data09 != nchar0 then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != 0 then
return -1
endi
if $data22 != NULL then
return -1
endi
if $data23 != 0.00000 then
return -1
endi
if $data24 != NULL then
return -1
endi
if $data25 != 0 then
return -1
endi
if $data26 != 0 then
return -1
endi
if $data27 != 1 then
return -1
endi
if $data28 != binary0 then
return -1
endi
if $data29 != nchar0 then
return -1
endi
## interp(*) from stb + group by + fill(prev)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(prev) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0.00000 then
return -1
endi
if $data04 != 0.000000000 then
return -1
endi
if $data05 != 0 then
return -1
endi
if $data06 != 0 then
return -1
endi
if $data07 != 1 then
return -1
endi
if $data08 != binary0 then
return -1
endi
if $data09 != nchar0 then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != 0 then
return -1
endi
if $data22 != NULL then
return -1
endi
if $data23 != 0.00000 then
return -1
endi
if $data24 != NULL then
return -1
endi
if $data25 != 0 then
return -1
endi
if $data26 != 0 then
return -1
endi
if $data27 != 1 then
return -1
endi
if $data28 != binary0 then
return -1
endi
if $data29 != nchar0 then
return -1
endi
## interp(*) from stb + group by + fill(linear)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(linear) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0.00167 then
return -1
endi
if $data04 != 0.001666667 then
return -1
endi
if $data05 != 0 then
return -1
endi
if $data06 != 0 then
return -1
endi
if $data07 != NULL then
return -1
endi
if $data08 != NULL then
return -1
endi
if $data09 != NULL then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != 0 then
return -1
endi
if $data22 != NULL then
return -1
endi
if $data23 != 0.00167 then
return -1
endi
if $data24 != NULL then
return -1
endi
if $data25 != 0 then
return -1
endi
if $data26 != 0 then
return -1
endi
if $data27 != NULL then
return -1
endi
if $data28 != NULL then
return -1
endi
if $data29 != NULL then
return -1
endi
## interp(*) from stb + group by + fill(linear)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(linear) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data03 != 0.00167 then
return -1
endi
if $data04 != 0.001666667 then
return -1
endi
if $data05 != 0 then
return -1
endi
if $data06 != 0 then
return -1
endi
if $data07 != NULL then
return -1
endi
if $data08 != NULL then
return -1
endi
if $data09 != NULL then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != 0 then
return -1
endi
if $data22 != NULL then
return -1
endi
if $data23 != 0.00167 then
return -1
endi
if $data24 != NULL then
return -1
endi
if $data25 != 0 then
return -1
endi
if $data26 != 0 then
return -1
endi
if $data27 != NULL then
return -1
endi
if $data28 != NULL then
return -1
endi
if $data29 != NULL then
return -1
endi
## interp(*) from stb + group by + fill(value)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(value, -1, -2) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != -2 then
return -1
endi
if $data02 != -2 then
return -1
endi
if $data03 != -2.00000 then
return -1
endi
if $data04 != -2.000000000 then
return -1
endi
if $data05 != -2 then
return -1
endi
if $data06 != -2 then
return -1
endi
if $data07 != 1 then
return -1
endi
if $data08 != NULL then
return -1
endi
if $data09 != NULL then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != -2 then
return -1
endi
if $data22 != -2 then
return -1
endi
if $data23 != -2.00000 then
return -1
endi
if $data24 != -2.000000000 then
return -1
endi
if $data25 != -2 then
return -1
endi
if $data26 != -2 then
return -1
endi
if $data27 != 1 then
return -1
endi
if $data28 != NULL then
return -1
endi
if $data29 != NULL then
return -1
endi
## interp(*) from stb + group by + fill(value)
$t = $ts0 + 1000
sql select interp(*) from $stb where ts = $t fill(value, -1, -2) group by tbname
print ====== 0:$data00, 1:$data01, 2:$data02, 3:$data03, 4:$data04, 5:$data05, 6:$data06, 7:$data07, 8:$data08, 9:$data09
print ====== 0:$data20, 1:$data21, 2:$data22, 3:$data23, 4:$data24, 5:$data25, 6:$data26, 7:$data27, 8:$data28, 9:$data29
if $rows != $tbNum then
return -1
endi
if $data00 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data01 != -2 then
return -1
endi
if $data02 != -2 then
return -1
endi
if $data03 != -2.00000 then
return -1
endi
if $data04 != -2.000000000 then
return -1
endi
if $data05 != -2 then
return -1
endi
if $data06 != -2 then
return -1
endi
if $data07 != 1 then
return -1
endi
if $data08 != NULL then
return -1
endi
if $data09 != NULL then
return -1
endi
if $data20 != @18-09-17 09:00:01.000@ then
return -1
endi
if $data21 != -2 then
return -1
endi
if $data22 != -2 then
return -1
endi
if $data23 != -2.00000 then
return -1
endi
if $data24 != -2.000000000 then
return -1
endi
if $data25 != -2 then
return -1
endi
if $data26 != -2 then
return -1
endi
if $data27 != 1 then
return -1
endi
if $data28 != NULL then
return -1
endi
if $data29 != NULL then
return -1
endi
sql_error select interp(ts,c1) from intp_tb0 where ts>'2018-11-25 19:19:00' and ts<'2018-11-25 19:19:12';
sql select interp(ts,c1) from intp_tb0 where ts>'2018-11-25 19:19:00' and ts<'2018-11-25 19:19:12' interval(1s) fill(linear);
if $rows != 0 then
return -1
endi
sql select interp(c1) from intp_tb0 where ts>'2018-11-25 18:09:00' and ts<'2018-11-25 19:20:12' interval(18m);
if $rows != 1 then
return -1
endi
if $data00 != @18-11-25 18:30:00.000@ then
return -1
endi
if $data01 != 3 then
return -1
endi
sql select interp(c1,c3,c4,ts) from intp_tb0 where ts>'2018-11-25 18:09:00' and ts<'2018-11-25 19:20:12' interval(18m) fill(linear)
if $rows != 5 then
return -1
endi
if $data00 != @18-11-25 17:54:00.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data02 != 0.00000 then
return -1
endi
if $data03 != 0.000000000 then
return -1
endi
if $data04 != @18-11-25 17:54:00.000@ then
return -1
endi
if $data10 != @18-11-25 18:12:00.000@ then
return -1
endi
if $data11 != 1 then
return -1
endi
if $data12 != 1.20000 then
return -1
endi
if $data13 != 1.200000000 then
return -1
endi
if $data14 != @18-11-25 18:12:00.000@ then
return -1
endi
if $data40 != @18-11-25 19:06:00.000@ then
return -1
endi
if $data41 != 6 then
return -1
endi
if $data42 != 6.60000 then
return -1
endi
if $data43 != 6.600000000 then
return -1
endi
if $data44 != @18-11-25 19:06:00.000@ then
return -1
endi

View File

@ -0,0 +1,61 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/cfg.sh -n dnode1 -c maxtablesPerVnode -v 5
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
$dbPrefix = sav_db
$tbPrefix = sav_tb
$stbPrefix = sav_stb
$tbNum = 20
$rowNum = 10
$totalNum = $tbNum * $rowNum
$ts0 = 1537146000000
$delta = 600000
print ========== alter.sim
$i = 0
$db = $dbPrefix
$stb = $stbPrefix
sql drop database if exists $db
sql create database $db
sql use $db
print ====== create tables
sql create table $stb (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 smallint, c6 tinyint, c7 bool, c8 binary(10), c9 nchar(10)) tags(t1 int, t2 int)
$i = 0
$ts = $ts0
while $i < $tbNum
$tb = $tbPrefix . $i
sql create table $tb using $stb tags( $i , 0 )
$i = $i + 1
endw
print ====== table created
#### select distinct tag
sql select distinct t1 from $stb
if $rows != $tbNum then
return -1
endi
#### select distinct tag
sql select distinct t2 from $stb
if $rows != 1 then
print $rows
return -1
endi
#### unsupport sql
sql_error select distinct t1, t2 from &stb
sql drop database $db
sql show databases
if $rows != 0 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -1,6 +1,5 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 0
system sh/exec.sh -n dnode1 -s start
@ -78,7 +77,6 @@ if $rows != 0 then
return -1
endi
sql DROP STABLE $stb
sql show stables

View File

@ -105,3 +105,5 @@ sleep 100
run general/parser/sliding.sim
sleep 100
run general/parser/function.sim
sleep 100
run general/parse/stableOp.sim

View File

@ -159,6 +159,7 @@ cd ../../../debug; make
./test.sh -f general/parser/union.sim
./test.sh -f general/parser/topbot.sim
./test.sh -f general/parser/function.sim
./test.sh -f general/parser/select_distinct_tag.sim
./test.sh -f general/stable/disk.sim
./test.sh -f general/stable/dnode3.sim