diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h
index f2d25c1e84..8c1037127a 100644
--- a/src/client/inc/tscUtil.h
+++ b/src/client/inc/tscUtil.h
@@ -29,15 +29,16 @@ extern "C" {
#include "tsched.h"
#include "tsclient.h"
-#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
+#define UTIL_TABLE_IS_SUPER_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_SUPER_TABLE))
+
#define UTIL_TABLE_IS_CHILD_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_CHILD_TABLE))
-
-#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo)\
- (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo)))
-#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
+#define UTIL_TABLE_IS_NORMAL_TABLE(metaInfo) \
+ (!(UTIL_TABLE_IS_SUPER_TABLE(metaInfo) || UTIL_TABLE_IS_CHILD_TABLE(metaInfo) || UTIL_TABLE_IS_TMP_TABLE(metaInfo)))
+
+#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#pragma pack(push,1)
@@ -221,7 +222,7 @@ void tscExprDestroy(SArray* pExprInfo);
int32_t createProjectionExpr(SQueryInfo* pQueryInfo, STableMetaInfo* pTableMetaInfo, SExprInfo*** pExpr, int32_t* num);
-void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta);
+void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id);
SColumn* tscColumnClone(const SColumn* src);
void tscColumnCopy(SColumn* pDest, const SColumn* pSrc);
@@ -320,7 +321,7 @@ void tscPrintSelNodeList(SSqlObj* pSql, int32_t subClauseIndex);
bool hasMoreVnodesToTry(SSqlObj *pSql);
bool hasMoreClauseToTry(SSqlObj* pSql);
-void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta);
+void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id);
void tscTryQueryNextVnode(SSqlObj *pSql, __async_cb_func_t fp);
void tscTryQueryNextClause(SSqlObj* pSql, __async_cb_func_t fp);
@@ -359,7 +360,7 @@ bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src);
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg);
STblCond* tsGetTableFilter(SArray* filters, uint64_t uid, int16_t idx);
-void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id);
+void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id);
#ifdef __cplusplus
}
diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h
index 8d579b375a..b6821de87a 100644
--- a/src/client/inc/tsclient.h
+++ b/src/client/inc/tsclient.h
@@ -447,7 +447,7 @@ void tscSetResRawPtrRv(SSqlRes* pRes, SQueryInfo* pQueryInfo, SSDataBlock* pBloc
void handleDownstreamOperator(SSqlObj** pSqlList, int32_t numOfUpstream, SQueryInfo* px, SSqlObj* pParent);
void destroyTableNameList(SInsertStatementParam* pInsertParam);
-void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta);
+void tscResetSqlCmd(SSqlCmd *pCmd, bool removeMeta, uint64_t id);
/**
* free query result of the sql object
diff --git a/src/client/src/tscAsync.c b/src/client/src/tscAsync.c
index 174610ec79..eaad783331 100644
--- a/src/client/src/tscAsync.c
+++ b/src/client/src/tscAsync.c
@@ -351,7 +351,7 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
if (pSql->pStream == NULL) {
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
- if (TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
+ if (pQueryInfo != NULL && TSDB_QUERY_HAS_TYPE(pQueryInfo->type, TSDB_QUERY_TYPE_INSERT)) {
tscDebug("0x%" PRIx64 " continue parse sql after get table-meta", pSql->self);
code = tsParseSql(pSql, false);
@@ -381,7 +381,6 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
} else {
if (pSql->retryReason != TSDB_CODE_SUCCESS) {
tscDebug("0x%" PRIx64 " update cached table-meta, re-validate sql statement and send query again", pSql->self);
- tscResetSqlCmd(pCmd, false);
pSql->retryReason = TSDB_CODE_SUCCESS;
} else {
tscDebug("0x%" PRIx64 " cached table-meta, continue validate sql statement and send query", pSql->self);
diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c
index 89e3832007..f1ca6b7754 100644
--- a/src/client/src/tscParseInsert.c
+++ b/src/client/src/tscParseInsert.c
@@ -1595,7 +1595,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if (pSql->parseRetry < 1 && (ret == TSDB_CODE_TSC_SQL_SYNTAX_ERROR || ret == TSDB_CODE_TSC_INVALID_OPERATION)) {
tscDebug("0x%"PRIx64 " parse insert sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
- tscResetSqlCmd(pCmd, true);
+ tscResetSqlCmd(pCmd, true, pSql->self);
pSql->parseRetry++;
if ((ret = tsInsertInitialCheck(pSql)) == TSDB_CODE_SUCCESS) {
@@ -1612,7 +1612,7 @@ int tsParseSql(SSqlObj *pSql, bool initial) {
if (ret == TSDB_CODE_TSC_INVALID_OPERATION && pSql->parseRetry < 1 && sqlInfo.type == TSDB_SQL_SELECT) {
tscDebug("0x%"PRIx64 " parse query sql statement failed, code:%s, clear meta cache and retry ", pSql->self, tstrerror(ret));
- tscResetSqlCmd(pCmd, true);
+ tscResetSqlCmd(pCmd, true, pSql->self);
pSql->parseRetry++;
ret = tscValidateSqlInfo(pSql, &sqlInfo);
diff --git a/src/client/src/tscPrepare.c b/src/client/src/tscPrepare.c
index 40664241c1..7f7966559b 100644
--- a/src/client/src/tscPrepare.c
+++ b/src/client/src/tscPrepare.c
@@ -1694,7 +1694,7 @@ int taos_stmt_set_tbname_tags(TAOS_STMT* stmt, const char* name, TAOS_BIND* tags
if (taosHashGetSize(pCmd->insertParam.pTableBlockHashList) > 0) {
SHashObj* hashList = pCmd->insertParam.pTableBlockHashList;
pCmd->insertParam.pTableBlockHashList = NULL;
- tscResetSqlCmd(pCmd, false);
+ tscResetSqlCmd(pCmd, false, pSql->self);
pCmd->insertParam.pTableBlockHashList = hashList;
}
diff --git a/src/client/src/tscProfile.c b/src/client/src/tscProfile.c
index 92ad9b7924..b00138b4c4 100644
--- a/src/client/src/tscProfile.c
+++ b/src/client/src/tscProfile.c
@@ -18,11 +18,11 @@
#include "tsclient.h"
#include "tsocket.h"
#include "ttimer.h"
-#include "tutil.h"
#include "taosmsg.h"
#include "tcq.h"
#include "taos.h"
+#include "tscUtil.h"
void tscSaveSlowQueryFp(void *handle, void *tmrId);
TAOS *tscSlowQueryConn = NULL;
@@ -227,16 +227,16 @@ void tscKillStream(STscObj *pObj, uint32_t killId) {
int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SHeartBeatMsg *pHeartbeat = pMsg;
+
int allocedQueriesNum = pHeartbeat->numOfQueries;
int allocedStreamsNum = pHeartbeat->numOfStreams;
pHeartbeat->numOfQueries = 0;
SQueryDesc *pQdesc = (SQueryDesc *)pHeartbeat->pData;
- // We extract the lock to tscBuildHeartBeatMsg function.
-
int64_t now = taosGetTimestampMs();
SSqlObj *pSql = pObj->sqlList;
+
while (pSql) {
/*
* avoid sqlobj may not be correctly removed from sql list
@@ -248,41 +248,55 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
}
tstrncpy(pQdesc->sql, pSql->sqlstr, sizeof(pQdesc->sql));
- pQdesc->stime = htobe64(pSql->stime);
- pQdesc->queryId = htonl(pSql->queryId);
- //pQdesc->useconds = htobe64(pSql->res.useconds);
+ pQdesc->stime = htobe64(pSql->stime);
+ pQdesc->queryId = htonl(pSql->queryId);
pQdesc->useconds = htobe64(now - pSql->stime);
- pQdesc->qId = htobe64(pSql->res.qId);
+ pQdesc->qId = htobe64(pSql->res.qId);
pQdesc->sqlObjId = htobe64(pSql->self);
- pQdesc->pid = pHeartbeat->pid;
- pQdesc->stableQuery = pSql->cmd.pQueryInfo->stableQuery;
+ pQdesc->pid = pHeartbeat->pid;
pQdesc->numOfSub = pSql->subState.numOfSub;
+ // todo race condition
+ pQdesc->stableQuery = 0;
+
char *p = pQdesc->subSqlInfo;
int32_t remainLen = sizeof(pQdesc->subSqlInfo);
if (pQdesc->numOfSub == 0) {
snprintf(p, remainLen, "N/A");
} else {
- int32_t len;
- for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
- len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i,
- pSql->pSubs[i]->self,
- pSql->subState.states[i] ? 'C' : 'I');
- if (len > remainLen) {
- break;
+// SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
+// if (pQueryInfo != NULL) {
+// pQdesc->stableQuery = (pQueryInfo->stableQuery)?1:0;
+// } else {
+// pQdesc->stableQuery = 0;
+// }
+
+ if (pSql->pSubs != NULL && pSql->subState.states != NULL) {
+ for (int32_t i = 0; i < pQdesc->numOfSub; ++i) {
+ SSqlObj *psub = pSql->pSubs[i];
+ int64_t self = (psub != NULL)? psub->self : 0;
+
+ int32_t len = snprintf(p, remainLen, "[%d]0x%" PRIx64 "(%c) ", i, self, pSql->subState.states[i] ? 'C' : 'I');
+ if (len > remainLen) {
+ break;
+ }
+
+ remainLen -= len;
+ p += len;
}
- remainLen -= len;
- p += len;
}
}
- pQdesc->numOfSub = htonl(pQdesc->numOfSub);
+ pQdesc->numOfSub = htonl(pQdesc->numOfSub);
taosGetFqdn(pQdesc->fqdn);
pHeartbeat->numOfQueries++;
pQdesc++;
+
pSql = pSql->next;
- if (pHeartbeat->numOfQueries >= allocedQueriesNum) break;
+ if (pHeartbeat->numOfQueries >= allocedQueriesNum) {
+ break;
+ }
}
pHeartbeat->numOfStreams = 0;
diff --git a/src/client/src/tscSQLParser.c b/src/client/src/tscSQLParser.c
index 17b693faf2..03b2b696c2 100644
--- a/src/client/src/tscSQLParser.c
+++ b/src/client/src/tscSQLParser.c
@@ -3725,7 +3725,8 @@ static int32_t doExtractColumnFilterInfo(SSqlCmd* pCmd, SQueryInfo* pQueryInfo,
if (pRight->tokenId != TK_SET || !serializeExprListToVariant(pRight->Expr.paramList, &pVal, colType, timePrecision)) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
- pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen + 1);
+
+ pColumnFilter->pz = (int64_t)calloc(1, pVal->nLen);
pColumnFilter->len = pVal->nLen;
pColumnFilter->filterstr = 1;
memcpy((char *)(pColumnFilter->pz), (char *)(pVal->pz), pVal->nLen);
@@ -5746,14 +5747,19 @@ static void setDefaultOrderInfo(SQueryInfo* pQueryInfo) {
pQueryInfo->order.order = TSDB_ORDER_ASC;
if (isTopBottomQuery(pQueryInfo)) {
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
- } else { // in case of select tbname from super_table, the defualt order column can not be the primary ts column
- pQueryInfo->order.orderColId = INT32_MIN;
+ } else { // in case of select tbname from super_table, the default order column can not be the primary ts column
+ pQueryInfo->order.orderColId = INT32_MIN; // todo define a macro
}
/* for super table query, set default ascending order for group output */
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
}
+
+ if (pQueryInfo->distinct) {
+ pQueryInfo->order.order = TSDB_ORDER_ASC;
+ pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
+ }
}
int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSqlNode, SSchema* pSchema) {
@@ -5761,26 +5767,21 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
const char* msg1 = "invalid column name in orderby clause";
const char* msg2 = "too many order by columns";
const char* msg3 = "only primary timestamp/tbname/first tag in groupby clause allowed";
- const char* msg4 = "only tag in groupby clause allowed in order by";
- const char* msg5 = "only primary timestamp/column in top/bottom function allowed as orderby column";
- const char* msg6 = "only primary timestamp allowed as the second orderby column";
- const char* msg7 = "only primary timestamp/column in groupby clause allowed as orderby column";
- const char* msg8 = "only column in groupby clause allowed as orderby column";
+ const char* msg4 = "only tag in groupby clause allowed in order clause";
+ const char* msg5 = "only primary timestamp/column in top/bottom function allowed as order column";
+ const char* msg6 = "only primary timestamp allowed as the second order column";
+ const char* msg7 = "only primary timestamp/column in groupby clause allowed as order column";
+ const char* msg8 = "only column in groupby clause allowed as order column";
setDefaultOrderInfo(pQueryInfo);
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
-
- if (pQueryInfo->distinct == true) {
- pQueryInfo->order.order = TSDB_ORDER_ASC;
- pQueryInfo->order.orderColId = 0;
- return TSDB_CODE_SUCCESS;
- }
- if (pSqlNode->pSortOrder == NULL) {
+ if (pQueryInfo->distinct || pSqlNode->pSortOrder == NULL) {
return TSDB_CODE_SUCCESS;
}
- SArray* pSortorder = pSqlNode->pSortOrder;
+ char* pMsgBuf = tscGetErrorMsgPayload(pCmd);
+ SArray* pSortOrder = pSqlNode->pSortOrder;
/*
* for table query, there is only one or none order option is allowed, which is the
@@ -5788,19 +5789,19 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
*
* for super table query, the order option must be less than 3.
*/
- size_t size = taosArrayGetSize(pSortorder);
- if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo)) {
+ size_t size = taosArrayGetSize(pSortOrder);
+ if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo)) {
if (size > 1) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg0);
+ return invalidOperationMsg(pMsgBuf, msg0);
}
} else {
if (size > 2) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg2);
+ return invalidOperationMsg(pMsgBuf, msg2);
}
}
// handle the first part of order by
- tVariant* pVar = taosArrayGet(pSortorder, 0);
+ tVariant* pVar = taosArrayGet(pSortOrder, 0);
// e.g., order by 1 asc, return directly with out further check.
if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) {
@@ -5812,7 +5813,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) { // super table query
if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
+ return invalidOperationMsg(pMsgBuf, msg1);
}
bool orderByTags = false;
@@ -5824,7 +5825,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
// it is a tag column
if (pQueryInfo->groupbyExpr.columnInfo == NULL) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg4);
+ return invalidOperationMsg(pMsgBuf, msg4);
}
SColIndex* pColIndex = taosArrayGet(pQueryInfo->groupbyExpr.columnInfo, 0);
if (relTagIndex == pColIndex->colIndex) {
@@ -5845,13 +5846,14 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
orderByGroupbyCol = true;
}
}
+
if (!(orderByTags || orderByTS || orderByGroupbyCol) && !isTopBottomQuery(pQueryInfo)) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg3);
+ return invalidOperationMsg(pMsgBuf, msg3);
} else { // order by top/bottom result value column is not supported in case of interval query.
assert(!(orderByTags && orderByTS && orderByGroupbyCol));
}
- size_t s = taosArrayGetSize(pSortorder);
+ size_t s = taosArrayGetSize(pSortOrder);
if (s == 1) {
if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
@@ -5870,7 +5872,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
+ return invalidOperationMsg(pMsgBuf, msg5);
}
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
@@ -5888,9 +5890,7 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
addPrimaryTsColIntoResult(pQueryInfo, pCmd);
}
}
- }
-
- if (s == 2) {
+ } else {
tVariantListItem *pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
if (orderByTags) {
pQueryInfo->groupbyExpr.orderIndex = index.columnIndex - tscGetNumOfColumns(pTableMetaInfo->pTableMeta);
@@ -5907,22 +5907,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
tVariant* pVar2 = &pItem->pVar;
SStrToken cname = {pVar2->nLen, pVar2->nType, pVar2->pz};
if (getColumnIndexByName(&cname, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
+ return invalidOperationMsg(pMsgBuf, msg1);
}
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg6);
+ return invalidOperationMsg(pMsgBuf, msg6);
} else {
- tVariantListItem* p1 = taosArrayGet(pSortorder, 1);
+ tVariantListItem* p1 = taosArrayGet(pSortOrder, 1);
pQueryInfo->order.order = p1->sortOrder;
pQueryInfo->order.orderColId = PRIMARYKEY_TIMESTAMP_COL_INDEX;
}
}
- } else { // meter query
- if (getColumnIndexByName(&columnName, pQueryInfo, &index, tscGetErrorMsgPayload(pCmd)) != TSDB_CODE_SUCCESS) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg1);
+ } else if (UTIL_TABLE_IS_NORMAL_TABLE(pTableMetaInfo) || UTIL_TABLE_IS_CHILD_TABLE(pTableMetaInfo)) { // check order by clause for normal table & temp table
+ if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
+ return invalidOperationMsg(pMsgBuf, msg1);
}
+
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX && !isTopBottomQuery(pQueryInfo)) {
bool validOrder = false;
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
@@ -5930,23 +5931,23 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
validOrder = (pColIndex->colIndex == index.columnIndex);
}
+
if (!validOrder) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg7);
+ return invalidOperationMsg(pMsgBuf, msg7);
}
+
tVariantListItem* p1 = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->groupbyExpr.orderIndex = pSchema[index.columnIndex].colId;
pQueryInfo->groupbyExpr.orderType = p1->sortOrder;
-
}
if (isTopBottomQuery(pQueryInfo)) {
- bool validOrder = false;
SArray *columnInfo = pQueryInfo->groupbyExpr.columnInfo;
if (columnInfo != NULL && taosArrayGetSize(columnInfo) > 0) {
SColIndex* pColIndex = taosArrayGet(columnInfo, 0);
- validOrder = (pColIndex->colIndex == index.columnIndex);
- if (!validOrder) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg8);
+
+ if (pColIndex->colIndex == index.columnIndex) {
+ return invalidOperationMsg(pMsgBuf, msg8);
}
} else {
/* order of top/bottom query in interval is not valid */
@@ -5955,9 +5956,8 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
pExpr = tscExprGet(pQueryInfo, 1);
if (pExpr->base.colInfo.colIndex != index.columnIndex && index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_INDEX) {
- return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg5);
+ return invalidOperationMsg(pMsgBuf, msg5);
}
- validOrder = true;
}
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
@@ -5967,6 +5967,18 @@ int32_t validateOrderbyNode(SSqlCmd* pCmd, SQueryInfo* pQueryInfo, SSqlNode* pSq
return TSDB_CODE_SUCCESS;
}
+ tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
+ pQueryInfo->order.order = pItem->sortOrder;
+ pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
+ } else {
+ // handle the temp table order by clause. You can order by any single column in case of the temp table, created by
+ // inner subquery.
+ assert(UTIL_TABLE_IS_TMP_TABLE(pTableMetaInfo) && taosArrayGetSize(pSqlNode->pSortOrder) == 1);
+
+ if (getColumnIndexByName(&columnName, pQueryInfo, &index, pMsgBuf) != TSDB_CODE_SUCCESS) {
+ return invalidOperationMsg(pMsgBuf, msg1);
+ }
+
tVariantListItem* pItem = taosArrayGet(pSqlNode->pSortOrder, 0);
pQueryInfo->order.order = pItem->sortOrder;
pQueryInfo->order.orderColId = pSchema[index.columnIndex].colId;
@@ -8734,8 +8746,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
const char* msg8 = "condition missing for join query";
const char* msg9 = "not support 3 level select";
- int32_t code = TSDB_CODE_SUCCESS;
-
+ int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
@@ -8757,7 +8768,7 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
}
if (pSqlNode->from->type == SQL_NODE_FROM_SUBQUERY) {
- clearAllTableMetaInfo(pQueryInfo, false);
+ clearAllTableMetaInfo(pQueryInfo, false, pSql->self);
pQueryInfo->numOfTables = 0;
// parse the subquery in the first place
@@ -9026,8 +9037,6 @@ int32_t validateSqlNode(SSqlObj* pSql, SSqlNode* pSqlNode, SQueryInfo* pQueryInf
pQueryInfo->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryInfo->onlyTagQuery = onlyTagPrjFunction(pQueryInfo);
pQueryInfo->groupbyColumn = tscGroupbyColumn(pQueryInfo);
- //pQueryInfo->globalMerge = tscIsTwoStageSTableQuery(pQueryInfo, 0);
-
pQueryInfo->arithmeticOnAgg = tsIsArithmeticQueryOnAggResult(pQueryInfo);
pQueryInfo->orderProjectQuery = tscOrderedProjectionQueryOnSTable(pQueryInfo, 0);
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index f0ee180bbe..0ad3dd208e 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -525,6 +525,7 @@ static void doProcessMsgFromServer(SSchedMsg* pSchedMsg) {
}
void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
+ int64_t st = taosGetTimestampUs();
SSchedMsg schedMsg = {0};
schedMsg.fp = doProcessMsgFromServer;
@@ -543,6 +544,11 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
+
+ int64_t et = taosGetTimestampUs();
+ if (et - st > 100) {
+ tscDebug("add message to task queue, elapsed time:%"PRId64, et - st);
+ }
}
int doBuildAndSendMsg(SSqlObj *pSql) {
@@ -897,16 +903,16 @@ int tscBuildQueryMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
}
SQueryInfo *pQueryInfo = tscGetQueryInfo(pCmd);
+ STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
+ STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
SQueryAttr query = {{0}};
tscCreateQueryFromQueryInfo(pQueryInfo, &query, pSql);
+ query.vgId = pTableMeta->vgId;
SArray* tableScanOperator = createTableScanPlan(&query);
SArray* queryOperator = createExecOperatorPlan(&query);
- STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
- STableMeta * pTableMeta = pTableMetaInfo->pTableMeta;
-
SQueryTableMsg *pQueryMsg = (SQueryTableMsg *)pCmd->payload;
tstrncpy(pQueryMsg->version, version, tListLen(pQueryMsg->version));
@@ -2269,6 +2275,10 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
pMsg = buf;
}
+ if (pParentCmd->pTableMetaMap == NULL) {
+ pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
+ }
+
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg;
int32_t code = tableMetaMsgConvert(pMetaMsg);
@@ -2605,7 +2615,7 @@ int tscProcessDropDbRsp(SSqlObj *pSql) {
int tscProcessDropTableRsp(SSqlObj *pSql) {
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pSql->cmd, 0);
- tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self);
+ tscRemoveCachedTableMeta(pTableMetaInfo, pSql->self);
tfree(pTableMetaInfo->pTableMeta);
return 0;
}
@@ -2990,13 +3000,11 @@ int tscRenewTableMeta(SSqlObj *pSql, int32_t tableIndex) {
tscGetNumOfTags(pTableMeta), tscGetNumOfColumns(pTableMeta), pTableMeta->id.uid);
}
+
// remove stored tableMeta info in hash table
- tscRemoveTableMetaBuf(pTableMetaInfo, pSql->self);
+ tscResetSqlCmd(pCmd, true, pSql->self);
- pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
- pCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
-
- SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
+ SArray* pNameList = taosArrayInit(1, POINTER_BYTES);
SArray* vgroupList = taosArrayInit(1, POINTER_BYTES);
char* n = strdup(name);
diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c
index 2c4bc5f764..9f2b79e891 100644
--- a/src/client/src/tscStream.c
+++ b/src/client/src/tscStream.c
@@ -113,7 +113,7 @@ static void doLaunchQuery(void* param, TAOS_RES* tres, int32_t code) {
pQueryInfo->command = TSDB_SQL_SELECT;
- pSql->fp = tscProcessStreamQueryCallback;
+ pSql->fp = tscProcessStreamQueryCallback;
pSql->fetchFp = tscProcessStreamQueryCallback;
executeQuery(pSql, pQueryInfo);
tscIncStreamExecutionCount(pStream);
@@ -142,6 +142,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
if(pSql == NULL) {
return ;
}
+
SQueryInfo* pQueryInfo = tscGetQueryInfo(&pSql->cmd);
tscDebug("0x%"PRIx64" add into timer", pSql->self);
@@ -186,14 +187,16 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
}
// launch stream computing in a new thread
- SSchedMsg schedMsg = { 0 };
- schedMsg.fp = tscProcessStreamLaunchQuery;
+ SSchedMsg schedMsg = {0};
+ schedMsg.fp = tscProcessStreamLaunchQuery;
schedMsg.ahandle = pStream;
schedMsg.thandle = (void *)1;
- schedMsg.msg = NULL;
+ schedMsg.msg = NULL;
taosScheduleTask(tscQhandle, &schedMsg);
}
+static void cbParseSql(void* param, TAOS_RES* res, int code);
+
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows) {
SSqlStream *pStream = (SSqlStream *)param;
if (tres == NULL || numOfRows < 0) {
@@ -201,24 +204,26 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf
tscError("0x%"PRIx64" stream:%p, query data failed, code:0x%08x, retry in %" PRId64 "ms", pStream->pSql->self,
pStream, numOfRows, retryDelay);
- STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(&pStream->pSql->cmd, 0);
+ SSqlObj* pSql = pStream->pSql;
- char name[TSDB_TABLE_FNAME_LEN] = {0};
- tNameExtractFullName(&pTableMetaInfo->name, name);
+ tscFreeSqlResult(pSql);
+ tscFreeSubobj(pSql);
+ tfree(pSql->pSubs);
+ pSql->subState.numOfSub = 0;
- taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
+ int32_t code = tsParseSql(pSql, true);
+ if (code == TSDB_CODE_SUCCESS) {
+ cbParseSql(pStream, pSql, code);
+ } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
+ tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self);
+ } else {
+ tscError("0x%"PRIx64" open stream failed, code:%s", pSql->self, tstrerror(code));
+ taosReleaseRef(tscObjRef, pSql->self);
+ free(pStream);
+ }
- tfree(pTableMetaInfo->pTableMeta);
-
- tscFreeSqlResult(pStream->pSql);
- tscFreeSubobj(pStream->pSql);
- tfree(pStream->pSql->pSubs);
- pStream->pSql->subState.numOfSub = 0;
-
- pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
-
- tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
- return;
+// tscSetRetryTimer(pStream, pStream->pSql, retryDelay);
+// return;
}
taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param);
@@ -555,7 +560,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
if (code != TSDB_CODE_SUCCESS) {
pSql->res.code = code;
tscError("0x%"PRIx64" open stream failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
-
pStream->fp(pStream->param, NULL, NULL);
return;
}
@@ -582,9 +586,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
// set stime with ltime if ltime > stime
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
- tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime);
+ tscDebug("0x%"PRIx64" CQ table %s ltime is %"PRId64, pSql->self, dstTable, pStream->ltime);
+
if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) {
- tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime);
+ tscWarn("0x%"PRIx64" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime > 0", pSql->self, dstTable, pStream->stime, pStream->ltime);
pStream->stime = pStream->ltime;
}
@@ -592,7 +597,6 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pCmd->command = TSDB_SQL_SELECT;
tscAddIntoStreamList(pStream);
-
taosTmrReset(tscProcessStreamTimer, (int32_t)starttime, pStream, tscTmr, &pStream->pTimer);
tscDebug("0x%"PRIx64" stream:%p is opened, query on:%s, interval:%" PRId64 ", sliding:%" PRId64 ", first launched in:%" PRId64 ", sql:%s", pSql->self,
@@ -659,10 +663,9 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", pStream->dstTable);
taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
- return ;
}
-TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
+TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
STscObj *pObj = (STscObj *)taos;
if (pObj == NULL || pObj->signature != pObj) return NULL;
@@ -697,14 +700,12 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pStream->param = param;
pStream->pSql = pSql;
pStream->cqhandle = cqhandle;
- pSql->pStream = pStream;
- pSql->param = pStream;
- pSql->maxRetry = TSDB_MAX_REPLICA;
tscSetStreamDestTable(pStream, dstTable);
pSql->pStream = pStream;
pSql->param = pStream;
pSql->maxRetry = TSDB_MAX_REPLICA;
+
pSql->sqlstr = calloc(1, strlen(sqlstr) + 1);
if (pSql->sqlstr == NULL) {
tscError("0x%"PRIx64" failed to malloc sql string buffer", pSql->self);
@@ -725,14 +726,13 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
pSql->fp = cbParseSql;
pSql->fetchFp = cbParseSql;
-
registerSqlObj(pSql);
int32_t code = tsParseSql(pSql, true);
if (code == TSDB_CODE_SUCCESS) {
cbParseSql(pStream, pSql, code);
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
- tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr);
+ tscDebug("0x%"PRIx64" CQ taso_open_stream IN Process", pSql->self);
} else {
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
taosReleaseRef(tscObjRef, pSql->self);
@@ -743,7 +743,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
return pStream;
}
-TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
+TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *, TAOS_RES *, TAOS_ROW),
int64_t stime, void *param, void (*callback)(void *)) {
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL);
}
diff --git a/src/client/src/tscSubquery.c b/src/client/src/tscSubquery.c
index 0d26ec58f6..f3af685ceb 100644
--- a/src/client/src/tscSubquery.c
+++ b/src/client/src/tscSubquery.c
@@ -2727,16 +2727,10 @@ void tscHandleSubqueryError(SRetrieveSupport *trsupport, SSqlObj *pSql, int numO
int32_t code = pParentSql->res.code;
if ((code == TSDB_CODE_TDB_INVALID_TABLE_ID || code == TSDB_CODE_VND_INVALID_VGROUP_ID) && pParentSql->retry < pParentSql->maxRetry) {
// remove the cached tableMeta and vgroup id list, and then parse the sql again
- SSqlCmd* pParentCmd = &pParentSql->cmd;
- STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
- tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
+ tscResetSqlCmd( &pParentSql->cmd, true, pParentSql->self);
- pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
- pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
-
- pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
-
+ pParentSql->res.code = TSDB_CODE_SUCCESS;
tscDebug("0x%"PRIx64" retry parse sql and send query, prev error: %s, retry:%d", pParentSql->self,
tstrerror(code), pParentSql->retry);
@@ -3040,7 +3034,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (taos_errno(pSql) != TSDB_CODE_SUCCESS) {
assert(code == taos_errno(pSql));
- if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID)) {
+ if (trsupport->numOfRetry++ < MAX_NUM_OF_SUBQUERY_RETRY && (code != TSDB_CODE_TDB_INVALID_TABLE_ID && code != TSDB_CODE_VND_INVALID_VGROUP_ID)) {
tscError("0x%"PRIx64" sub:0x%"PRIx64" failed code:%s, retry:%d", pParentSql->self, pSql->self, tstrerror(code), trsupport->numOfRetry);
int32_t sent = 0;
@@ -3151,7 +3145,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
numOfFailed += 1;
// clean up tableMeta in cache
- tscFreeQueryInfo(&pSql->cmd, false);
+ tscFreeQueryInfo(&pSql->cmd, false, pSql->self);
SQueryInfo* pQueryInfo = tscGetQueryInfoS(&pSql->cmd);
STableMetaInfo* pMasterTableMetaInfo = tscGetTableMetaInfoFromCmd(&pParentObj->cmd, 0);
tscAddTableMetaInfo(pQueryInfo, &pMasterTableMetaInfo->name, NULL, NULL, NULL, NULL);
@@ -3173,7 +3167,7 @@ static void multiVnodeInsertFinalize(void* param, TAOS_RES* tres, int numOfRows)
}
pParentObj->res.code = TSDB_CODE_SUCCESS;
- tscResetSqlCmd(&pParentObj->cmd, false);
+ tscResetSqlCmd(&pParentObj->cmd, false, pParentObj->self);
// in case of insert, redo parsing the sql string and build new submit data block for two reasons:
// 1. the table Id(tid & uid) may have been update, the submit block needs to be updated accordingly.
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index 19a816faeb..53ad775852 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -1228,11 +1228,9 @@ void handleDownstreamOperator(SSqlObj** pSqlObjList, int32_t numOfUpstream, SQue
if (pCond && pCond->cond) {
createQueryFilter(pCond->cond, pCond->len, &pFilters);
}
- //createInputDataFlterInfo(px, numOfCol1, &numOfFilterCols, &pFilterInfo);
}
SOperatorInfo* pSourceOperator = createDummyInputOperator(pSqlObjList[0], pSchema, numOfCol1, pFilters);
-
pOutput->precision = pSqlObjList[0]->res.precision;
SSchema* schema = NULL;
@@ -1332,12 +1330,13 @@ static void tscDestroyResPointerInfo(SSqlRes* pRes) {
pRes->data = NULL; // pRes->data points to the buffer of pRsp, no need to free
}
-void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
+void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeCachedMeta, uint64_t id) {
if (pCmd == NULL) {
return;
}
SQueryInfo* pQueryInfo = pCmd->pQueryInfo;
+
while(pQueryInfo != NULL) {
SQueryInfo* p = pQueryInfo->sibling;
@@ -1346,7 +1345,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
SQueryInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pUpstream, i);
freeQueryInfoImpl(pUpQueryInfo);
- clearAllTableMetaInfo(pUpQueryInfo, removeMeta);
+ clearAllTableMetaInfo(pUpQueryInfo, removeCachedMeta, id);
if (pUpQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pUpQueryInfo->pQInfo);
pUpQueryInfo->pQInfo = NULL;
@@ -1362,7 +1361,7 @@ void tscFreeQueryInfo(SSqlCmd* pCmd, bool removeMeta) {
}
freeQueryInfoImpl(pQueryInfo);
- clearAllTableMetaInfo(pQueryInfo, removeMeta);
+ clearAllTableMetaInfo(pQueryInfo, removeCachedMeta, id);
if (pQueryInfo->pQInfo != NULL) {
qDestroyQueryInfo(pQueryInfo->pQInfo);
@@ -1391,7 +1390,7 @@ void destroyTableNameList(SInsertStatementParam* pInsertParam) {
tfree(pInsertParam->pTableNameList);
}
-void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
+void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta, uint64_t id) {
pCmd->command = 0;
pCmd->numOfCols = 0;
pCmd->count = 0;
@@ -1405,19 +1404,8 @@ void tscResetSqlCmd(SSqlCmd* pCmd, bool clearCachedMeta) {
tfree(pCmd->insertParam.tagData.data);
pCmd->insertParam.tagData.dataLen = 0;
- tscFreeQueryInfo(pCmd, clearCachedMeta);
-
- if (pCmd->pTableMetaMap != NULL) {
- STableMetaVgroupInfo* p = taosHashIterate(pCmd->pTableMetaMap, NULL);
- while (p) {
- taosArrayDestroy(p->vgroupIdList);
- tfree(p->pTableMeta);
- p = taosHashIterate(pCmd->pTableMetaMap, p);
- }
-
- taosHashCleanup(pCmd->pTableMetaMap);
- pCmd->pTableMetaMap = NULL;
- }
+ tscFreeQueryInfo(pCmd, clearCachedMeta, id);
+ pCmd->pTableMetaMap = tscCleanupTableMetaMap(pCmd->pTableMetaMap);
}
void* tscCleanupTableMetaMap(SHashObj* pTableMetaMap) {
@@ -1513,8 +1501,6 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscFreeMetaSqlObj(&pSql->metaRid);
tscFreeMetaSqlObj(&pSql->svgroupRid);
- tscFreeSubobj(pSql);
-
SSqlCmd* pCmd = &pSql->cmd;
int32_t cmd = pCmd->command;
if (cmd < TSDB_SQL_INSERT || cmd == TSDB_SQL_RETRIEVE_GLOBALMERGE || cmd == TSDB_SQL_RETRIEVE_EMPTY_RESULT ||
@@ -1522,6 +1508,8 @@ void tscFreeSqlObj(SSqlObj* pSql) {
tscRemoveFromSqlList(pSql);
}
+ tscFreeSubobj(pSql);
+
pSql->signature = NULL;
pSql->fp = NULL;
tfree(pSql->sqlstr);
@@ -1532,7 +1520,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
pSql->self = 0;
tscFreeSqlResult(pSql);
- tscResetSqlCmd(pCmd, false);
+ tscResetSqlCmd(pCmd, false, pSql->self);
memset(pCmd->payload, 0, (size_t)pCmd->allocSize);
tfree(pCmd->payload);
@@ -3379,20 +3367,15 @@ SArray* tscVgroupTableInfoDup(SArray* pVgroupTables) {
return pa;
}
-void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta) {
+void clearAllTableMetaInfo(SQueryInfo* pQueryInfo, bool removeMeta, uint64_t id) {
for(int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
STableMetaInfo* pTableMetaInfo = tscGetMetaInfo(pQueryInfo, i);
-
if (removeMeta) {
- char name[TSDB_TABLE_FNAME_LEN] = {0};
- tNameExtractFullName(&pTableMetaInfo->name, name);
- taosHashRemove(tscTableMetaMap, name, strnlen(name, TSDB_TABLE_FNAME_LEN));
+ tscRemoveCachedTableMeta(pTableMetaInfo, id);
}
tscFreeVgroupTableInfo(pTableMetaInfo->pVgroupTables);
tscClearTableMetaInfo(pTableMetaInfo);
-
- free(pTableMetaInfo);
}
tfree(pQueryInfo->pTableMetaInfo);
@@ -3459,10 +3442,12 @@ void tscClearTableMetaInfo(STableMetaInfo* pTableMetaInfo) {
}
tfree(pTableMetaInfo->pTableMeta);
-
pTableMetaInfo->vgroupList = tscVgroupInfoClear(pTableMetaInfo->vgroupList);
+
tscColumnListDestroy(pTableMetaInfo->tagColList);
pTableMetaInfo->tagColList = NULL;
+
+ free(pTableMetaInfo);
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
@@ -3860,13 +3845,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
// todo refactor
tscDebug("0x%"PRIx64" all subquery response received, retry", pParentSql->self);
-
- SSqlCmd* pParentCmd = &pParentSql->cmd;
- STableMetaInfo* pTableMetaInfo = tscGetTableMetaInfoFromCmd(pParentCmd, 0);
- tscRemoveTableMetaBuf(pTableMetaInfo, pParentSql->self);
-
- pParentCmd->pTableMetaMap = tscCleanupTableMetaMap(pParentCmd->pTableMetaMap);
- pParentCmd->pTableMetaMap = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
+ tscResetSqlCmd(&pParentSql->cmd, true, pParentSql->self);
pParentSql->res.code = TSDB_CODE_SUCCESS;
pParentSql->retry++;
@@ -3885,7 +3864,7 @@ static void tscSubqueryCompleteCallback(void* param, TAOS_RES* tres, int code) {
return;
}
- SQueryInfo *pQueryInfo = tscGetQueryInfo(pParentCmd);
+ SQueryInfo *pQueryInfo = tscGetQueryInfo(&pParentSql->cmd);
executeQuery(pParentSql, pQueryInfo);
return;
}
@@ -5009,7 +4988,7 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
return info;
}
-void tscRemoveTableMetaBuf(STableMetaInfo* pTableMetaInfo, uint64_t id) {
+void tscRemoveCachedTableMeta(STableMetaInfo* pTableMetaInfo, uint64_t id) {
char fname[TSDB_TABLE_FNAME_LEN] = {0};
tNameExtractFullName(&pTableMetaInfo->name, fname);
diff --git a/src/common/src/tname.c b/src/common/src/tname.c
index e92169f097..532333651d 100644
--- a/src/common/src/tname.c
+++ b/src/common/src/tname.c
@@ -70,12 +70,11 @@ SColumnFilterInfo* tFilterInfoDup(const SColumnFilterInfo* src, int32_t numOfFil
memcpy(pFilter, src, sizeof(SColumnFilterInfo) * numOfFilters);
for (int32_t j = 0; j < numOfFilters; ++j) {
-
if (pFilter[j].filterstr) {
size_t len = (size_t) pFilter[j].len + 1 * TSDB_NCHAR_SIZE;
pFilter[j].pz = (int64_t) calloc(1, len);
- memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t)len);
+ memcpy((char*)pFilter[j].pz, (char*)src[j].pz, (size_t) pFilter[j].len);
}
}
diff --git a/src/connector/grafanaplugin b/src/connector/grafanaplugin
index a44ec1ca49..4a4d79099b 160000
--- a/src/connector/grafanaplugin
+++ b/src/connector/grafanaplugin
@@ -1 +1 @@
-Subproject commit a44ec1ca493ad01b2bf825b6418f69e11f548206
+Subproject commit 4a4d79099b076b8ff12d5b4fdbcba54049a6866d
diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h
index 1767b25402..837db72274 100644
--- a/src/inc/taosmsg.h
+++ b/src/inc/taosmsg.h
@@ -879,7 +879,7 @@ typedef struct {
uint64_t sqlObjId;
int32_t pid;
char fqdn[TSDB_FQDN_LEN];
- bool stableQuery;
+ uint8_t stableQuery;
int32_t numOfSub;
char subSqlInfo[TSDB_SHOW_SUBQUERY_LEN]; //include subqueries' index, Obj IDs and states(C-complete/I-imcomplete)
} SQueryDesc;
diff --git a/src/query/inc/qExecutor.h b/src/query/inc/qExecutor.h
index 56fab57e26..2c57310a2f 100644
--- a/src/query/inc/qExecutor.h
+++ b/src/query/inc/qExecutor.h
@@ -335,6 +335,7 @@ enum OPERATOR_TYPE_E {
OP_StateWindow = 22,
OP_AllTimeWindow = 23,
OP_AllMultiTableTimeInterval = 24,
+ OP_Order = 25,
};
typedef struct SOperatorInfo {
@@ -422,7 +423,6 @@ typedef struct STableScanInfo {
int32_t *rowCellInfoOffset;
SExprInfo *pExpr;
SSDataBlock block;
- bool loadExternalRows; // load external rows (prev & next rows)
int32_t numOfOutput;
int64_t elapsedTime;
@@ -513,7 +513,7 @@ typedef struct SStateWindowOperatorInfo {
int32_t start;
char* prevData; // previous data
bool reptScan;
-} SStateWindowOperatorInfo ;
+} SStateWindowOperatorInfo;
typedef struct SDistinctOperatorInfo {
SHashObj *pSet;
@@ -546,6 +546,13 @@ typedef struct SMultiwayMergeInfo {
SArray *udfInfo;
} SMultiwayMergeInfo;
+// todo support the disk-based sort
+typedef struct SOrderOperatorInfo {
+ int32_t colIndex;
+ int32_t order;
+ SSDataBlock *pDataBlock;
+} SOrderOperatorInfo;
+
void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream);
SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbQueryHandle, SQueryRuntimeEnv* pRuntimeEnv, int32_t repeatTime, int32_t reverseTime);
@@ -575,6 +582,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator
int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter);
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput);
+SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal);
SSDataBlock* doGlobalAggregate(void* param, bool* newgroup);
SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup);
diff --git a/src/query/inc/qExtbuffer.h b/src/query/inc/qExtbuffer.h
index cf0e8ce31a..b5ea9932b9 100644
--- a/src/query/inc/qExtbuffer.h
+++ b/src/query/inc/qExtbuffer.h
@@ -227,6 +227,8 @@ typedef int (*__col_compar_fn_t)(tOrderDescriptor *, int32_t numOfRows, int32_t
void tColDataQSort(tOrderDescriptor *, int32_t numOfRows, int32_t start, int32_t end, char *data, int32_t orderType);
+void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn);
+
int32_t compare_sa(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
int32_t compare_sd(tOrderDescriptor *, int32_t numOfRows, int32_t idx1, int32_t idx2, char *data);
diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c
index 9000bcdf77..e0761b38b3 100644
--- a/src/query/src/qExecutor.c
+++ b/src/query/src/qExecutor.c
@@ -16,7 +16,6 @@
#include "qFill.h"
#include "taosmsg.h"
#include "tglobal.h"
-#include "talgo.h"
#include "exception.h"
#include "hash.h"
@@ -224,6 +223,7 @@ static void destroySFillOperatorInfo(void* param, int32_t numOfOutput);
static void destroyGroupbyOperatorInfo(void* param, int32_t numOfOutput);
static void destroyProjectOperatorInfo(void* param, int32_t numOfOutput);
static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput);
+static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput);
static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyStateWindowOperatorInfo(void* param, int32_t numOfOutput);
static void destroyAggOperatorInfo(void* param, int32_t numOfOutput);
@@ -242,8 +242,7 @@ static void setCtxTagForJoin(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx
static void setParamForStableStddev(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr);
static void setParamForStableStddevByColData(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes);
static void doSetTableGroupOutputBuf(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo,
- SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput,
- int32_t groupIndex);
+ SQLFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId);
SArray* getOrderCheckColumns(SQueryAttr* pQuery);
@@ -963,8 +962,6 @@ void doInvokeUdf(SUdfInfo* pUdfInfo, SQLFunctionCtx *pCtx, int32_t idx, int32_t
break;
}
}
-
- return;
}
static void doApplyFunctions(SQueryRuntimeEnv* pRuntimeEnv, SQLFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset,
@@ -1622,12 +1619,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
}
startPos = pSDataBlock->info.rows - 1;
-
+
// window start(end) key interpolation
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
}
-
+
break;
}
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
@@ -2213,6 +2210,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
}
break;
}
+
case OP_StateWindow: {
pRuntimeEnv->proot = createStatewindowOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
@@ -2229,24 +2227,20 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_Filter: { // todo refactor
int32_t numOfFilterCols = 0;
-// if (pQueryAttr->numOfFilterCols > 0) {
-// pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
-// pQueryAttr->numOfOutput, pQueryAttr->tableCols, pQueryAttr->numOfFilterCols);
-// } else {
- if (pQueryAttr->stableQuery) {
- SColumnInfo* pColInfo =
- extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
- pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
- pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
- freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
- } else {
- SColumnInfo* pColInfo =
- extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
- pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
- pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
- freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
- }
-// }
+ if (pQueryAttr->stableQuery) {
+ SColumnInfo* pColInfo =
+ extractColumnFilterInfo(pQueryAttr->pExpr3, pQueryAttr->numOfExpr3, &numOfFilterCols);
+ pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr3,
+ pQueryAttr->numOfExpr3, pColInfo, numOfFilterCols);
+ freeColumnInfo(pColInfo, pQueryAttr->numOfExpr3);
+ } else {
+ SColumnInfo* pColInfo =
+ extractColumnFilterInfo(pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &numOfFilterCols);
+ pRuntimeEnv->proot = createFilterOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1,
+ pQueryAttr->numOfOutput, pColInfo, numOfFilterCols);
+ freeColumnInfo(pColInfo, pQueryAttr->numOfOutput);
+ }
+
break;
}
@@ -2258,11 +2252,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
case OP_MultiwayMergeSort: {
bool groupMix = true;
- if(pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
+ if (pQueryAttr->slimit.offset != 0 || pQueryAttr->slimit.limit != -1) {
groupMix = false;
}
+
pRuntimeEnv->proot = createMultiwaySortOperatorInfo(pRuntimeEnv, pQueryAttr->pExpr1, pQueryAttr->numOfOutput,
- 4096, merger, groupMix); // TODO hack it
+ 4096, merger, groupMix); // TODO hack it
break;
}
@@ -2283,6 +2278,11 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
break;
}
+ case OP_Order: {
+ pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order);
+ break;
+ }
+
default: {
assert(0);
}
@@ -3040,7 +3040,7 @@ void doSetFilterColInfo(SFilterInfo * pFilters, SSDataBlock* pBlock) {
int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
uint32_t* status) {
*status = BLK_DATA_NO_NEEDED;
- pBlock->pDataBlock = NULL;
+ pBlock->pDataBlock = NULL;
pBlock->pBlockStatis = NULL;
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
@@ -3050,6 +3050,9 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
SQInfo* pQInfo = pRuntimeEnv->qinfo;
SQueryCostInfo* pCost = &pQInfo->summary;
+ pCost->totalBlocks += 1;
+ pCost->totalRows += pBlock->info.rows;
+
if (pRuntimeEnv->pTsBuf != NULL) {
(*status) = BLK_DATA_ALL_NEEDED;
@@ -3081,7 +3084,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
// check if this data block is required to load
if ((*status) != BLK_DATA_ALL_NEEDED) {
bool needFilter = true;
-
+
// the pCtx[i] result is belonged to previous time window since the outputBuf has not been set yet,
// the filter result may be incorrect. So in case of interval query, we need to set the correct time output buffer
if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) {
@@ -3488,12 +3491,11 @@ void copyToSDataBlock(SQueryRuntimeEnv* pRuntimeEnv, int32_t threshold, SSDataBl
}
}
- // enough results in data buffer, return
- if (pBlock->info.rows >= threshold) {
- break;
- }
+ // enough results in data buffer, return
+ if (pBlock->info.rows >= threshold) {
+ break;
}
-
+ }
}
static void updateTableQueryInfoForReverseScan(STableQueryInfo *pTableQueryInfo) {
@@ -5399,6 +5401,114 @@ SOperatorInfo *createMultiwaySortOperatorInfo(SQueryRuntimeEnv *pRuntimeEnv, SEx
return pOperator;
}
+static int32_t doMergeSDatablock(SSDataBlock* pDest, SSDataBlock* pSrc) {
+ assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols);
+
+ int32_t numOfCols = pSrc->info.numOfCols;
+ for(int32_t i = 0; i < numOfCols; ++i) {
+ SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i);
+ SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i);
+
+ int32_t newSize = (pDest->info.rows + pSrc->info.rows) * pCol2->info.bytes;
+ char* tmp = realloc(pCol2->pData, newSize);
+ if (tmp != NULL) {
+ pCol2->pData = tmp;
+ int32_t offset = pCol2->info.bytes * pDest->info.rows;
+ memcpy(pCol2->pData + offset, pCol1->pData, pSrc->info.rows * pCol2->info.bytes);
+ } else {
+ return TSDB_CODE_VND_OUT_OF_MEMORY;
+ }
+ }
+
+ pDest->info.rows += pSrc->info.rows;
+
+ return TSDB_CODE_SUCCESS;
+}
+
+static SSDataBlock* doSort(void* param, bool* newgroup) {
+ SOperatorInfo* pOperator = (SOperatorInfo*) param;
+ if (pOperator->status == OP_EXEC_DONE) {
+ return NULL;
+ }
+
+ SOrderOperatorInfo* pInfo = pOperator->info;
+
+ SSDataBlock* pBlock = NULL;
+ while(1) {
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
+ pBlock = pOperator->upstream[0]->exec(pOperator->upstream[0], newgroup);
+ publishOperatorProfEvent(pOperator->upstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
+
+ // start to flush data into disk and try do multiway merge sort
+ if (pBlock == NULL) {
+ setQueryStatus(pOperator->pRuntimeEnv, QUERY_COMPLETED);
+ pOperator->status = OP_EXEC_DONE;
+ break;
+ }
+
+ int32_t code = doMergeSDatablock(pInfo->pDataBlock, pBlock);
+ if (code != TSDB_CODE_SUCCESS) {
+ // todo handle error
+ }
+ }
+
+ int32_t numOfCols = pInfo->pDataBlock->info.numOfCols;
+ void** pCols = calloc(numOfCols, POINTER_BYTES);
+ SSchema* pSchema = calloc(numOfCols, sizeof(SSchema));
+
+ for(int32_t i = 0; i < numOfCols; ++i) {
+ SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i);
+ pCols[i] = p1->pData;
+ pSchema[i].colId = p1->info.colId;
+ pSchema[i].bytes = p1->info.bytes;
+ pSchema[i].type = (uint8_t) p1->info.type;
+ }
+
+ __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order);
+ taoscQSort(pCols, pSchema, numOfCols, pInfo->pDataBlock->info.rows, pInfo->colIndex, comp);
+
+ tfree(pCols);
+ tfree(pSchema);
+ return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL;
+}
+
+SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) {
+ SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo));
+
+ {
+ SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock));
+ pDataBlock->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData));
+ for(int32_t i = 0; i < numOfOutput; ++i) {
+ SColumnInfoData col = {{0}};
+ col.info.colId = pExpr[i].base.colInfo.colId;
+ col.info.bytes = pExpr[i].base.colBytes;
+ col.info.type = pExpr[i].base.colType;
+ taosArrayPush(pDataBlock->pDataBlock, &col);
+
+ if (col.info.colId == pOrderVal->orderColId) {
+ pInfo->colIndex = i;
+ }
+ }
+
+ pDataBlock->info.numOfCols = numOfOutput;
+ pInfo->order = pOrderVal->order;
+ pInfo->pDataBlock = pDataBlock;
+ }
+
+ SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
+ pOperator->name = "InMemoryOrder";
+ pOperator->operatorType = OP_Order;
+ pOperator->blockingOptr = true;
+ pOperator->status = OP_IN_EXECUTING;
+ pOperator->info = pInfo;
+ pOperator->exec = doSort;
+ pOperator->cleanup = destroyOrderOperatorInfo;
+ pOperator->pRuntimeEnv = pRuntimeEnv;
+
+ appendUpstream(pOperator, upstream);
+ return pOperator;
+}
+
static int32_t getTableScanOrder(STableScanInfo* pTableScanInfo) {
return pTableScanInfo->order;
}
@@ -5838,11 +5948,15 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
if (pOperator->status == OP_RES_TO_RETURN) {
+ int64_t st = taosGetTimestampUs();
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
+ SQInfo* pQInfo = pRuntimeEnv->qinfo;
+ pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
+
return pIntervalInfo->pRes;
}
@@ -5930,17 +6044,18 @@ static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
doCloseAllTimeWindow(pRuntimeEnv);
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
+ int64_t st = taosGetTimestampUs();
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
pOperator->status = OP_EXEC_DONE;
}
+ SQInfo* pQInfo = pRuntimeEnv->qinfo;
+ pQInfo->summary.firstStageMergeTime += (taosGetTimestampUs() - st);
+
return pIntervalInfo->pRes;
}
-
-
-
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
STableQueryInfo* item = pRuntimeEnv->current;
@@ -6073,6 +6188,7 @@ static SSDataBlock* doStateWindowAgg(void *param, bool* newgroup) {
return pBInfo->pRes->info.rows == 0? NULL:pBInfo->pRes;
}
+
static SSDataBlock* doSessionWindowAgg(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) {
@@ -6399,6 +6515,11 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) {
pInfo->pRes = destroyOutputBuf(pInfo->pRes);
}
+static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) {
+ SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param;
+ pInfo->pDataBlock = destroyOutputBuf(pInfo->pDataBlock);
+}
+
static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) {
SFilterOperatorInfo* pInfo = (SFilterOperatorInfo*) param;
doDestroyFilterInfo(pInfo->pFilterInfo, pInfo->numOfFilterCols);
@@ -6747,7 +6868,6 @@ SOperatorInfo* createFillOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorIn
pOperator->numOfOutput = numOfOutput;
pOperator->info = pInfo;
pOperator->pRuntimeEnv = pRuntimeEnv;
-
pOperator->exec = doFill;
pOperator->cleanup = destroySFillOperatorInfo;
@@ -6956,11 +7076,9 @@ static SSDataBlock* hashDistinct(void* param, bool* newgroup) {
return NULL;
}
-
SDistinctOperatorInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
-
pRes->info.rows = 0;
SSDataBlock* pBlock = NULL;
while(1) {
diff --git a/src/query/src/qExtbuffer.c b/src/query/src/qExtbuffer.c
index cc47cc824b..9f9347b327 100644
--- a/src/query/src/qExtbuffer.c
+++ b/src/query/src/qExtbuffer.c
@@ -12,7 +12,6 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#include "qExtbuffer.h"
#include "os.h"
#include "qAggMain.h"
#include "queryLog.h"
@@ -21,6 +20,8 @@
#include "taosmsg.h"
#include "tulog.h"
#include "qExecutor.h"
+#include "qExtbuffer.h"
+#include "tcompare.h"
#define COLMODEL_GET_VAL(data, schema, allrow, rowId, colId) \
(data + (schema)->pFields[colId].offset * (allrow) + (rowId) * (schema)->pFields[colId].field.bytes)
@@ -767,6 +768,60 @@ void tColDataQSort(tOrderDescriptor *pDescriptor, int32_t numOfRows, int32_t sta
free(buf);
}
+void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn) {
+ assert(numOfRows > 0 && numOfCols > 0 && index >= 0 && index < numOfCols);
+
+ int32_t bytes = pSchema[index].bytes;
+ int32_t size = bytes + sizeof(int32_t);
+
+ char* buf = calloc(1, size * numOfRows);
+
+ for(int32_t i = 0; i < numOfRows; ++i) {
+ char* dest = buf + size * i;
+ memcpy(dest, ((char*)pCols[index]) + bytes * i, bytes);
+ *(int32_t*)(dest+bytes) = i;
+ }
+
+ qsort(buf, numOfRows, size, compareFn);
+
+ int32_t prevLength = 0;
+ char* p = NULL;
+
+ for(int32_t i = 0; i < numOfCols; ++i) {
+ int32_t bytes1 = pSchema[i].bytes;
+
+ if (i == index) {
+ for(int32_t j = 0; j < numOfRows; ++j){
+ char* src = buf + (j * size);
+ char* dest = (char*) pCols[i] + (j * bytes1);
+ memcpy(dest, src, bytes1);
+ }
+ } else {
+ // make sure memory buffer is enough
+ if (prevLength < bytes1) {
+ char *tmp = realloc(p, bytes1 * numOfRows);
+ assert(tmp);
+
+ p = tmp;
+ prevLength = bytes1;
+ }
+
+ memcpy(p, pCols[i], bytes1 * numOfRows);
+
+ for(int32_t j = 0; j < numOfRows; ++j){
+ char* dest = (char*) pCols[i] + bytes1 * j;
+
+ int32_t newPos = *(int32_t*)(buf + (j * size) + bytes);
+ char* src = p + (newPos * bytes1);
+ memcpy(dest, src, bytes1);
+ }
+ }
+ }
+
+ tfree(buf);
+ tfree(p);
+}
+
/*
* deep copy of sschema
*/
diff --git a/src/query/src/qPercentile.c b/src/query/src/qPercentile.c
index e3326cc26b..e9022db503 100644
--- a/src/query/src/qPercentile.c
+++ b/src/query/src/qPercentile.c
@@ -237,7 +237,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
}
pBucket->elemPerPage = (pBucket->bufPageSize - sizeof(tFilePage))/pBucket->bytes;
- pBucket->comparFn = getKeyComparFunc(pBucket->type);
+ pBucket->comparFn = getKeyComparFunc(pBucket->type, TSDB_ORDER_ASC);
pBucket->hashFunc = getHashFunc(pBucket->type);
if (pBucket->hashFunc == NULL) {
diff --git a/src/query/src/qPlan.c b/src/query/src/qPlan.c
index b8a5ee7699..f72f70c911 100644
--- a/src/query/src/qPlan.c
+++ b/src/query/src/qPlan.c
@@ -557,10 +557,9 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
int32_t op = 0;
if (onlyQueryTags(pQueryAttr)) { // do nothing for tags query
- if (onlyQueryTags(pQueryAttr)) {
- op = OP_TagScan;
- taosArrayPush(plan, &op);
- }
+ op = OP_TagScan;
+ taosArrayPush(plan, &op);
+
if (pQueryAttr->distinct) {
op = OP_Distinct;
taosArrayPush(plan, &op);
@@ -651,8 +650,14 @@ SArray* createExecOperatorPlan(SQueryAttr* pQueryAttr) {
taosArrayPush(plan, &op);
}
}
+
+ // outer query order by support
+ int32_t orderColId = pQueryAttr->order.orderColId;
+ if (pQueryAttr->vgId == 0 && orderColId != PRIMARYKEY_TIMESTAMP_COL_INDEX && orderColId != INT32_MIN) {
+ op = OP_Order;
+ taosArrayPush(plan, &op);
+ }
}
-
if (pQueryAttr->limit.limit > 0 || pQueryAttr->limit.offset > 0) {
op = OP_Limit;
diff --git a/src/query/src/queryMain.c b/src/query/src/queryMain.c
index 403b51426f..1a9c057ef0 100644
--- a/src/query/src/queryMain.c
+++ b/src/query/src/queryMain.c
@@ -215,7 +215,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, qi
return code;
}
-
bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
SQInfo *pQInfo = (SQInfo *)qinfo;
assert(pQInfo && pQInfo->signature == pQInfo);
@@ -256,7 +255,11 @@ bool qTableQuery(qinfo_t qinfo, uint64_t *qId) {
bool newgroup = false;
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
+
+ int64_t st = taosGetTimestampUs();
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot, &newgroup);
+ pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
+
publishOperatorProfEvent(pRuntimeEnv->proot, QUERY_PROF_AFTER_OPERATOR_EXEC);
pRuntimeEnv->resultInfo.total += GET_NUM_OF_RESULTS(pRuntimeEnv);
diff --git a/src/query/tests/cSortTest.cpp b/src/query/tests/cSortTest.cpp
new file mode 100644
index 0000000000..aa5aa89afc
--- /dev/null
+++ b/src/query/tests/cSortTest.cpp
@@ -0,0 +1,124 @@
+#include
+#include
+
+#include "taos.h"
+#include "tsdb.h"
+#include "qExtbuffer.h"
+
+#pragma GCC diagnostic ignored "-Wwrite-strings"
+#pragma GCC diagnostic ignored "-Wunused-function"
+#pragma GCC diagnostic ignored "-Wunused-variable"
+#pragma GCC diagnostic ignored "-Wsign-compare"
+
+namespace {
+ int32_t comp(const void* p1, const void* p2) {
+ int32_t* x1 = (int32_t*) p1;
+ int32_t* x2 = (int32_t*) p2;
+
+ if (*x1 == *x2) {
+ return 0;
+ } else {
+ return (*x1 > *x2)? 1:-1;
+ }
+ }
+
+ int32_t comp1(const void* p1, const void* p2) {
+ int32_t ret = strncmp((char*) p1, (char*) p2, 20);
+
+ if (ret == 0) {
+ return 0;
+ } else {
+ return ret > 0 ? 1:-1;
+ }
+ }
+}
+
+TEST(testCase, colunmnwise_sort_test) {
+ // void taoscQSort(void** pCols, SSchema* pSchema, int32_t numOfCols, int32_t numOfRows, int32_t index, __compar_fn_t compareFn)
+ void* pCols[2] = {0};
+
+ SSchema s[2] = {{0}};
+ s[0].type = TSDB_DATA_TYPE_INT;
+ s[0].bytes = 4;
+ s[0].colId = 0;
+ strcpy(s[0].name, "col1");
+
+ s[1].type = TSDB_DATA_TYPE_BINARY;
+ s[1].bytes = 20;
+ s[1].colId = 1;
+ strcpy(s[1].name, "col2");
+
+ int32_t* p = (int32_t*) calloc(5, sizeof(int32_t));
+ p[0] = 12;
+ p[1] = 8;
+ p[2] = 99;
+ p[3] = 7;
+ p[4] = 1;
+
+ char* t1 = (char*) calloc(5, 20);
+ strcpy(t1, "abc");
+ strcpy(t1 + 20, "def");
+ strcpy(t1 + 40, "xyz");
+ strcpy(t1 + 60, "klm");
+ strcpy(t1 + 80, "hij");
+
+ pCols[0] = (char*) p;
+ pCols[1] = (char*) t1;
+ taoscQSort(reinterpret_cast(pCols), s, 2, 5, 0, comp);
+
+ int32_t* px = (int32_t*) pCols[0];
+ ASSERT_EQ(px[0], 1);
+ ASSERT_EQ(px[1], 7);
+ ASSERT_EQ(px[2], 8);
+ ASSERT_EQ(px[3], 12);
+ ASSERT_EQ(px[4], 99);
+
+ char* px1 = (char*) pCols[1];
+ ASSERT_STRCASEEQ(px1 + 20 * 0, "hij");
+ ASSERT_STRCASEEQ(px1 + 20 * 1, "klm");
+ ASSERT_STRCASEEQ(px1 + 20 * 2, "def");
+ ASSERT_STRCASEEQ(px1 + 20 * 3, "abc");
+ ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz");
+
+ taoscQSort(pCols, s, 2, 5, 1, comp1);
+ px = (int32_t*) pCols[0];
+ ASSERT_EQ(px[0], 12);
+ ASSERT_EQ(px[1], 8);
+ ASSERT_EQ(px[2], 1);
+ ASSERT_EQ(px[3], 7);
+ ASSERT_EQ(px[4], 99);
+
+ px1 = (char*) pCols[1];
+ ASSERT_STRCASEEQ(px1 + 20 * 0, "abc");
+ ASSERT_STRCASEEQ(px1 + 20 * 1, "def");
+ ASSERT_STRCASEEQ(px1 + 20 * 2, "hij");
+ ASSERT_STRCASEEQ(px1 + 20 * 3, "klm");
+ ASSERT_STRCASEEQ(px1 + 20 * 4, "xyz");
+}
+
+TEST(testCase, columnsort_test) {
+ SSchema field[1] = {
+ {TSDB_DATA_TYPE_INT, "k", sizeof(int32_t)},
+ };
+
+ const int32_t num = 2000;
+
+ int32_t *d = (int32_t *)malloc(sizeof(int32_t) * num);
+ for (int32_t i = 0; i < num; ++i) {
+ d[i] = i % 4;
+ }
+
+ const int32_t numOfOrderCols = 1;
+ int32_t orderColIdx = 0;
+ SColumnModel *pModel = createColumnModel(field, 1, 1000);
+ tOrderDescriptor *pDesc = tOrderDesCreate(&orderColIdx, numOfOrderCols, pModel, 1);
+
+ tColDataQSort(pDesc, num, 0, num - 1, (char *)d, 1);
+
+ for (int32_t i = 0; i < num; ++i) {
+ printf("%d\t", d[i]);
+ }
+ printf("\n");
+
+ destroyColumnModel(pModel);
+}
\ No newline at end of file
diff --git a/src/query/tests/unitTest.cpp b/src/query/tests/unitTest.cpp
index 9f6e219c0a..1ed4cde406 100644
--- a/src/query/tests/unitTest.cpp
+++ b/src/query/tests/unitTest.cpp
@@ -1,6 +1,4 @@
-#include "os.h"
#include
-#include
#include
#include "taos.h"
diff --git a/src/util/inc/tcompare.h b/src/util/inc/tcompare.h
index cf61b7165a..d1760ab28c 100644
--- a/src/util/inc/tcompare.h
+++ b/src/util/inc/tcompare.h
@@ -47,7 +47,7 @@ int WCSPatternMatch(const wchar_t *pattern, const wchar_t *str, size_t size, con
int32_t doCompare(const char* a, const char* b, int32_t type, size_t size);
-__compar_fn_t getKeyComparFunc(int32_t keyType);
+__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order);
__compar_fn_t getComparFunc(int32_t type, int32_t optr);
diff --git a/src/util/src/tcompare.c b/src/util/src/tcompare.c
index a3c01d2be7..08ed149f04 100644
--- a/src/util/src/tcompare.c
+++ b/src/util/src/tcompare.c
@@ -16,44 +16,22 @@
#include "os.h"
#include "ttype.h"
#include "tcompare.h"
-#include "tarray.h"
#include "hash.h"
int32_t setCompareBytes1(const void *pLeft, const void *pRight) {
- return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
+ return NULL != taosHashGet((SHashObj *)pRight, pLeft, 1) ? 1 : 0;
}
int32_t setCompareBytes2(const void *pLeft, const void *pRight) {
- return NULL != taosHashGet((SHashObj *)pRight, pLeft, 2) ? 1 : 0;
+ return NULL != taosHashGet((SHashObj *)pRight, pLeft, 2) ? 1 : 0;
}
int32_t setCompareBytes4(const void *pLeft, const void *pRight) {
- return NULL != taosHashGet((SHashObj *)pRight, pLeft, 4) ? 1 : 0;
+ return NULL != taosHashGet((SHashObj *)pRight, pLeft, 4) ? 1 : 0;
}
int32_t setCompareBytes8(const void *pLeft, const void *pRight) {
- return NULL != taosHashGet((SHashObj *)pRight, pLeft, 8) ? 1 : 0;
-}
-
-int32_t compareInt32Val(const void *pLeft, const void *pRight) {
- int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
- return 0;
-}
-
-int32_t compareInt64Val(const void *pLeft, const void *pRight) {
- int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
- return 0;
-}
-
-int32_t compareInt16Val(const void *pLeft, const void *pRight) {
- int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
- if (left > right) return 1;
- if (left < right) return -1;
- return 0;
+ return NULL != taosHashGet((SHashObj *)pRight, pLeft, 8) ? 1 : 0;
}
int32_t compareInt8Val(const void *pLeft, const void *pRight) {
@@ -63,6 +41,43 @@ int32_t compareInt8Val(const void *pLeft, const void *pRight) {
return 0;
}
+int32_t compareInt8ValDesc(const void *pLeft, const void *pRight) {
+ return compareInt8Val(pRight, pLeft);
+}
+
+int32_t compareInt16Val(const void *pLeft, const void *pRight) {
+ int16_t left = GET_INT16_VAL(pLeft), right = GET_INT16_VAL(pRight);
+ if (left > right) return 1;
+ if (left < right) return -1;
+ return 0;
+}
+
+int32_t compareInt16ValDesc(const void* pLeft, const void* pRight) {
+ return compareInt16Val(pRight, pLeft);
+}
+
+int32_t compareInt32Val(const void *pLeft, const void *pRight) {
+ int32_t left = GET_INT32_VAL(pLeft), right = GET_INT32_VAL(pRight);
+ if (left > right) return 1;
+ if (left < right) return -1;
+ return 0;
+}
+
+int32_t compareInt32ValDesc(const void* pLeft, const void* pRight) {
+ return compareInt32Val(pRight, pLeft);
+}
+
+int32_t compareInt64Val(const void *pLeft, const void *pRight) {
+ int64_t left = GET_INT64_VAL(pLeft), right = GET_INT64_VAL(pRight);
+ if (left > right) return 1;
+ if (left < right) return -1;
+ return 0;
+}
+
+int32_t compareInt64ValDesc(const void* pLeft, const void* pRight) {
+ return compareInt64Val(pRight, pLeft);
+}
+
int32_t compareUint32Val(const void *pLeft, const void *pRight) {
uint32_t left = GET_UINT32_VAL(pLeft), right = GET_UINT32_VAL(pRight);
if (left > right) return 1;
@@ -70,6 +85,10 @@ int32_t compareUint32Val(const void *pLeft, const void *pRight) {
return 0;
}
+int32_t compareUint32ValDesc(const void* pLeft, const void* pRight) {
+ return compareUint32Val(pRight, pLeft);
+}
+
int32_t compareUint64Val(const void *pLeft, const void *pRight) {
uint64_t left = GET_UINT64_VAL(pLeft), right = GET_UINT64_VAL(pRight);
if (left > right) return 1;
@@ -77,6 +96,10 @@ int32_t compareUint64Val(const void *pLeft, const void *pRight) {
return 0;
}
+int32_t compareUint64ValDesc(const void* pLeft, const void* pRight) {
+ return compareUint64Val(pRight, pLeft);
+}
+
int32_t compareUint16Val(const void *pLeft, const void *pRight) {
uint16_t left = GET_UINT16_VAL(pLeft), right = GET_UINT16_VAL(pRight);
if (left > right) return 1;
@@ -84,6 +107,10 @@ int32_t compareUint16Val(const void *pLeft, const void *pRight) {
return 0;
}
+int32_t compareUint16ValDesc(const void* pLeft, const void* pRight) {
+ return compareUint16Val(pRight, pLeft);
+}
+
int32_t compareUint8Val(const void* pLeft, const void* pRight) {
uint8_t left = GET_UINT8_VAL(pLeft), right = GET_UINT8_VAL(pRight);
if (left > right) return 1;
@@ -91,6 +118,10 @@ int32_t compareUint8Val(const void* pLeft, const void* pRight) {
return 0;
}
+int32_t compareUint8ValDesc(const void* pLeft, const void* pRight) {
+ return compareUint8Val(pRight, pLeft);
+}
+
int32_t compareFloatVal(const void *pLeft, const void *pRight) {
float p1 = GET_FLOAT_VAL(pLeft);
float p2 = GET_FLOAT_VAL(pRight);
@@ -112,6 +143,10 @@ int32_t compareFloatVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
+int32_t compareFloatValDesc(const void* pLeft, const void* pRight) {
+ return compareFloatVal(pRight, pLeft);
+}
+
int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
double p1 = GET_DOUBLE_VAL(pLeft);
double p2 = GET_DOUBLE_VAL(pRight);
@@ -133,6 +168,10 @@ int32_t compareDoubleVal(const void *pLeft, const void *pRight) {
return FLT_GREATER(p1, p2) ? 1: -1;
}
+int32_t compareDoubleValDesc(const void* pLeft, const void* pRight) {
+ return compareDoubleVal(pRight, pLeft);
+}
+
int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
@@ -149,6 +188,10 @@ int32_t compareLenPrefixedStr(const void *pLeft, const void *pRight) {
}
}
+int32_t compareLenPrefixedStrDesc(const void* pLeft, const void* pRight) {
+ return compareLenPrefixedStr(pRight, pLeft);
+}
+
int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
int32_t len1 = varDataLen(pLeft);
int32_t len2 = varDataLen(pRight);
@@ -165,6 +208,10 @@ int32_t compareLenPrefixedWStr(const void *pLeft, const void *pRight) {
}
}
+int32_t compareLenPrefixedWStrDesc(const void* pLeft, const void* pRight) {
+ return compareLenPrefixedWStr(pRight, pLeft);
+}
+
/*
* Compare two strings
* TSDB_MATCH: Match
@@ -303,10 +350,6 @@ int32_t taosArrayCompareString(const void* a, const void* b) {
return compareLenPrefixedStr(x, y);
}
-//static int32_t compareFindStrInArray(const void* pLeft, const void* pRight) {
-// const SArray* arr = (const SArray*) pRight;
-// return taosArraySearchString(arr, pLeft, taosArrayCompareString, TD_EQ) == NULL ? 0 : 1;
-//}
int32_t compareFindItemInSet(const void *pLeft, const void* pRight) {
return NULL != taosHashGet((SHashObj *)pRight, varDataVal(pLeft), varDataLen(pLeft)) ? 1 : 0;
}
@@ -330,26 +373,26 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
if (optr == TSDB_RELATION_IN && (type != TSDB_DATA_TYPE_BINARY && type != TSDB_DATA_TYPE_NCHAR)) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
- case TSDB_DATA_TYPE_TINYINT:
- case TSDB_DATA_TYPE_UTINYINT:
+ case TSDB_DATA_TYPE_TINYINT:
+ case TSDB_DATA_TYPE_UTINYINT:
return setCompareBytes1;
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT:
return setCompareBytes2;
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT:
- case TSDB_DATA_TYPE_FLOAT:
+ case TSDB_DATA_TYPE_FLOAT:
return setCompareBytes4;
- case TSDB_DATA_TYPE_BIGINT:
- case TSDB_DATA_TYPE_UBIGINT:
- case TSDB_DATA_TYPE_DOUBLE:
- case TSDB_DATA_TYPE_TIMESTAMP:
+ case TSDB_DATA_TYPE_BIGINT:
+ case TSDB_DATA_TYPE_UBIGINT:
+ case TSDB_DATA_TYPE_DOUBLE:
+ case TSDB_DATA_TYPE_TIMESTAMP:
return setCompareBytes8;
default:
assert(0);
}
}
-
+
switch (type) {
case TSDB_DATA_TYPE_BOOL:
case TSDB_DATA_TYPE_TINYINT: comparFn = compareInt8Val; break;
@@ -395,50 +438,50 @@ __compar_fn_t getComparFunc(int32_t type, int32_t optr) {
return comparFn;
}
-__compar_fn_t getKeyComparFunc(int32_t keyType) {
+__compar_fn_t getKeyComparFunc(int32_t keyType, int32_t order) {
__compar_fn_t comparFn = NULL;
switch (keyType) {
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_BOOL:
- comparFn = compareInt8Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareInt8Val:compareInt8ValDesc;
break;
case TSDB_DATA_TYPE_SMALLINT:
- comparFn = compareInt16Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareInt16Val:compareInt16ValDesc;
break;
case TSDB_DATA_TYPE_INT:
- comparFn = compareInt32Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_TIMESTAMP:
- comparFn = compareInt64Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareInt64Val:compareInt64ValDesc;
break;
case TSDB_DATA_TYPE_FLOAT:
- comparFn = compareFloatVal;
+ comparFn = (order == TSDB_ORDER_ASC)? compareFloatVal:compareFloatValDesc;
break;
case TSDB_DATA_TYPE_DOUBLE:
- comparFn = compareDoubleVal;
+ comparFn = (order == TSDB_ORDER_ASC)? compareDoubleVal:compareDoubleValDesc;
break;
case TSDB_DATA_TYPE_UTINYINT:
- comparFn = compareUint8Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareUint8Val:compareUint8ValDesc;
break;
case TSDB_DATA_TYPE_USMALLINT:
- comparFn = compareUint16Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareUint16Val:compareUint16ValDesc;
break;
case TSDB_DATA_TYPE_UINT:
- comparFn = compareUint32Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareUint32Val:compareUint32ValDesc;
break;
case TSDB_DATA_TYPE_UBIGINT:
- comparFn = compareUint64Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareUint64Val:compareUint64ValDesc;
break;
case TSDB_DATA_TYPE_BINARY:
- comparFn = compareLenPrefixedStr;
+ comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedStr:compareLenPrefixedStrDesc;
break;
case TSDB_DATA_TYPE_NCHAR:
- comparFn = compareLenPrefixedWStr;
+ comparFn = (order == TSDB_ORDER_ASC)? compareLenPrefixedWStr:compareLenPrefixedWStrDesc;
break;
default:
- comparFn = compareInt32Val;
+ comparFn = (order == TSDB_ORDER_ASC)? compareInt32Val:compareInt32ValDesc;
break;
}
diff --git a/src/util/src/tskiplist.c b/src/util/src/tskiplist.c
index b464519ba6..98fd9c094c 100644
--- a/src/util/src/tskiplist.c
+++ b/src/util/src/tskiplist.c
@@ -54,7 +54,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
pSkipList->keyFn = fn;
pSkipList->seed = rand();
if (comparFn == NULL) {
- pSkipList->comparFn = getKeyComparFunc(keyType);
+ pSkipList->comparFn = getKeyComparFunc(keyType, TSDB_ORDER_ASC);
} else {
pSkipList->comparFn = comparFn;
}
diff --git a/tests/pytest/query/long_where_query.py b/tests/pytest/query/long_where_query.py
index 62e9533b62..9bb5f0b3d7 100644
--- a/tests/pytest/query/long_where_query.py
+++ b/tests/pytest/query/long_where_query.py
@@ -287,13 +287,9 @@ class TDTestCase:
tdLog.info(len(sql))
tdSql.error(sql)
-
endTime = time.time()
print("total time %ds" % (endTime - startTime))
-
-
-
- os.system("rm -rf query/long_where_query.py.sql")
+ #os.system("rm -rf query/long_where_query.py.sql")
def stop(self):
diff --git a/tests/script/unique/dnode/alternativeRole.sim b/tests/script/unique/dnode/alternativeRole.sim
index 955b757f06..14a6e92f06 100644
--- a/tests/script/unique/dnode/alternativeRole.sim
+++ b/tests/script/unique/dnode/alternativeRole.sim
@@ -30,7 +30,7 @@ sql create dnode $hostname2
system sh/exec.sh -n dnode2 -s start
sql create dnode $hostname3
system sh/exec.sh -n dnode3 -s start
-sleep 3000
+sleep 5000
sql show dnodes
print dnode1 $data5_1