[td-225] fix bugs in group by normal columns
This commit is contained in:
parent
df41013eca
commit
f87322f680
|
@ -699,7 +699,7 @@ static int32_t first_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end,
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
|
static int32_t last_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
|
||||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||||
return BLK_DATA_NO_NEEDED;
|
return BLK_DATA_NO_NEEDED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -727,7 +727,7 @@ static int32_t first_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
|
static int32_t last_dist_data_req_info(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end, int32_t colId) {
|
||||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||||
return BLK_DATA_NO_NEEDED;
|
return BLK_DATA_NO_NEEDED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1593,7 +1593,7 @@ static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCtx->order == TSDB_ORDER_DESC) {
|
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1652,7 +1652,7 @@ static void first_dist_func_second_merge(SQLFunctionCtx *pCtx) {
|
||||||
* least one data in this block that is not null.(TODO opt for this case)
|
* least one data in this block that is not null.(TODO opt for this case)
|
||||||
*/
|
*/
|
||||||
static void last_function(SQLFunctionCtx *pCtx) {
|
static void last_function(SQLFunctionCtx *pCtx) {
|
||||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1681,7 +1681,6 @@ static void last_function(SQLFunctionCtx *pCtx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
static void last_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
assert(pCtx->order != TSDB_ORDER_ASC);
|
|
||||||
void *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
|
void *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||||
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||||
return;
|
return;
|
||||||
|
@ -1725,7 +1724,7 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
|
||||||
* 1. for scan data in asc order, no need to check data
|
* 1. for scan data in asc order, no need to check data
|
||||||
* 2. for data blocks that are not loaded, no need to check data
|
* 2. for data blocks that are not loaded, no need to check data
|
||||||
*/
|
*/
|
||||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1763,7 +1762,7 @@ static void last_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||||
* 1. for scan data in asc order, no need to check data
|
* 1. for scan data in asc order, no need to check data
|
||||||
* 2. for data blocks that are not loaded, no need to check data
|
* 2. for data blocks that are not loaded, no need to check data
|
||||||
*/
|
*/
|
||||||
if (pCtx->order == TSDB_ORDER_ASC) {
|
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1452,6 +1452,13 @@ static int32_t setExprInfoForFunctions(SQueryInfo* pQueryInfo, SSchema* pSchema,
|
||||||
|
|
||||||
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false);
|
SSqlExpr* pExpr = tscSqlExprAppend(pQueryInfo, functionID, pColIndex, type, bytes, bytes, false);
|
||||||
tstrncpy(pExpr->aliasName, columnName, sizeof(pExpr->aliasName));
|
tstrncpy(pExpr->aliasName, columnName, sizeof(pExpr->aliasName));
|
||||||
|
|
||||||
|
// set reverse order scan data blocks for last query
|
||||||
|
if (functionID == TSDB_FUNC_LAST) {
|
||||||
|
pExpr->numOfParams = 1;
|
||||||
|
pExpr->param[0].i64Key = TSDB_ORDER_DESC;
|
||||||
|
pExpr->param[0].nType = TSDB_DATA_TYPE_INT;
|
||||||
|
}
|
||||||
|
|
||||||
// for all queries, the timestamp column needs to be loaded
|
// for all queries, the timestamp column needs to be loaded
|
||||||
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
SColumnIndex index = {.tableIndex = pColIndex->tableIndex, .columnIndex = PRIMARYKEY_TIMESTAMP_COL_INDEX};
|
||||||
|
@ -1724,6 +1731,22 @@ int32_t addExprAndResultField(SQueryInfo* pQueryInfo, int32_t colIndex, tSQLExpr
|
||||||
if (setExprInfoForFunctions(pQueryInfo, pSchema, functionID, pItem->aliasName, colIndex + i, &index) != 0) {
|
if (setExprInfoForFunctions(pQueryInfo, pSchema, functionID, pItem->aliasName, colIndex + i, &index) != 0) {
|
||||||
return TSDB_CODE_TSC_INVALID_SQL;
|
return TSDB_CODE_TSC_INVALID_SQL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (optr == TK_LAST) { // todo refactor
|
||||||
|
SSqlGroupbyExpr* pGroupBy = &pQueryInfo->groupbyExpr;
|
||||||
|
if (pGroupBy->numOfGroupCols > 0) {
|
||||||
|
for(int32_t k = 0; k < pGroupBy->numOfGroupCols; ++k) {
|
||||||
|
SColIndex* pIndex = taosArrayGet(pGroupBy->columnInfo, k);
|
||||||
|
if (!TSDB_COL_IS_TAG(pIndex->flag) && pIndex->colIndex < tscGetNumOfColumns(pTableMetaInfo->pTableMeta)) { // group by normal columns
|
||||||
|
SSqlExpr* pExpr = taosArrayGetP(pQueryInfo->exprList, colIndex + i);
|
||||||
|
pExpr->numOfParams = 1;
|
||||||
|
pExpr->param->i64Key = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2586,9 +2609,7 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, tVariantList* pList, SSqlCmd*
|
||||||
|
|
||||||
tscColumnListInsert(pQueryInfo->colList, &index);
|
tscColumnListInsert(pQueryInfo->colList, &index);
|
||||||
|
|
||||||
SColIndex colIndex = {
|
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
|
||||||
.colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId,
|
|
||||||
};
|
|
||||||
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
|
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
|
||||||
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
|
pQueryInfo->groupbyExpr.orderType = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
|
|
|
@ -430,7 +430,7 @@ void tscKillSTableQuery(SSqlObj *pSql) {
|
||||||
/*
|
/*
|
||||||
* 1. if the subqueries are not launched or partially launched, we need to waiting the launched
|
* 1. if the subqueries are not launched or partially launched, we need to waiting the launched
|
||||||
* query return to successfully free allocated resources.
|
* query return to successfully free allocated resources.
|
||||||
* 2. if no any subqueries are launched yet, which means the metric query only in parse sql stage,
|
* 2. if no any subqueries are launched yet, which means the super table query only in parse sql stage,
|
||||||
* set the res.code, and return.
|
* set the res.code, and return.
|
||||||
*/
|
*/
|
||||||
const int64_t MAX_WAITING_TIME = 10000; // 10 Sec.
|
const int64_t MAX_WAITING_TIME = 10000; // 10 Sec.
|
||||||
|
@ -2200,7 +2200,7 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
|
||||||
* The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
|
* The cached information is expired, however, we may have lost the ref of original meter. So, clear whole cache
|
||||||
* instead.
|
* instead.
|
||||||
*/
|
*/
|
||||||
tscTrace("%p force release metermeta after drop table:%s", pSql, pTableMetaInfo->name);
|
tscTrace("%p force release table meta after drop table:%s", pSql, pTableMetaInfo->name);
|
||||||
taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
|
taosCacheRelease(tscCacheHandle, (void **)&pTableMeta, true);
|
||||||
|
|
||||||
if (pTableMetaInfo->pTableMeta) {
|
if (pTableMetaInfo->pTableMeta) {
|
||||||
|
|
|
@ -354,7 +354,7 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
||||||
int16_t bytes) {
|
int16_t bytes) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
int32_t *p1 = (int32_t *)taosHashGet(pWindowResInfo->hashList, pData, bytes);
|
int32_t *p1 = (int32_t *) taosHashGet(pWindowResInfo->hashList, pData, bytes);
|
||||||
if (p1 != NULL) {
|
if (p1 != NULL) {
|
||||||
pWindowResInfo->curIndex = *p1;
|
pWindowResInfo->curIndex = *p1;
|
||||||
} else { // more than the capacity, reallocate the resources
|
} else { // more than the capacity, reallocate the resources
|
||||||
|
@ -919,12 +919,25 @@ static int32_t setGroupResultOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, char *pDat
|
||||||
|
|
||||||
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||||
|
|
||||||
|
int64_t v = -1;
|
||||||
|
// not assign result buffer yet, add new result buffer
|
||||||
|
switch(type) {
|
||||||
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
case TSDB_DATA_TYPE_TINYINT: v = GET_INT8_VAL(pData); break;
|
||||||
|
case TSDB_DATA_TYPE_SMALLINT: v = GET_INT16_VAL(pData); break;
|
||||||
|
case TSDB_DATA_TYPE_INT: v = GET_INT32_VAL(pData); break;
|
||||||
|
case TSDB_DATA_TYPE_BIGINT: v = GET_INT64_VAL(pData); break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// assert(pRuntimeEnv->windowResInfo.hashList->size <= 2);
|
||||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
|
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, pData, bytes);
|
||||||
if (pWindowRes == NULL) {
|
if (pWindowRes == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// not assign result buffer yet, add new result buffer
|
pWindowRes->window.skey = v;
|
||||||
|
pWindowRes->window.ekey = v;
|
||||||
|
|
||||||
if (pWindowRes->pos.pageId == -1) {
|
if (pWindowRes->pos.pageId == -1) {
|
||||||
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage);
|
int32_t ret = addNewWindowResultBuf(pWindowRes, pResultBuf, GROUPRESULTID, pRuntimeEnv->numOfRowsPerPage);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
@ -1022,12 +1035,16 @@ static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST) {
|
if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST) {
|
||||||
return !QUERY_IS_ASC_QUERY(pQuery);
|
|
||||||
} else if (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_FIRST) {
|
|
||||||
return QUERY_IS_ASC_QUERY(pQuery);
|
return QUERY_IS_ASC_QUERY(pQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo add comments
|
||||||
|
if ((functionId == TSDB_FUNC_LAST_DST || functionId == TSDB_FUNC_LAST)) {
|
||||||
|
return pCtx->param[0].i64Key == pQuery->order.order;
|
||||||
|
// return !QUERY_IS_ASC_QUERY(pQuery);
|
||||||
|
}
|
||||||
|
|
||||||
// in the supplementary scan, only the following functions need to be executed
|
// in the supplementary scan, only the following functions need to be executed
|
||||||
if (IS_REVERSE_SCAN(pRuntimeEnv)) {
|
if (IS_REVERSE_SCAN(pRuntimeEnv)) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -1079,7 +1096,7 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
||||||
|
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
int32_t offset = -1;
|
int32_t offset = -1;
|
||||||
|
|
||||||
for (j = 0; j < pDataBlockInfo->rows; ++j) {
|
for (j = 0; j < pDataBlockInfo->rows; ++j) {
|
||||||
offset = GET_COL_DATA_POS(pQuery, j, step);
|
offset = GET_COL_DATA_POS(pQuery, j, step);
|
||||||
|
|
||||||
|
@ -1478,6 +1495,7 @@ static void teardownQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool isQueryKilled(SQInfo *pQInfo) {
|
static bool isQueryKilled(SQInfo *pQInfo) {
|
||||||
|
return false;
|
||||||
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
|
return (pQInfo->code == TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
#if 0
|
#if 0
|
||||||
/*
|
/*
|
||||||
|
@ -1574,10 +1592,14 @@ static bool needReverseScan(SQuery *pQuery) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (((functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if ((functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_FIRST_DST) && !QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
((functionId == TSDB_FUNC_FIRST || functionId == TSDB_FUNC_FIRST_DST) && !QUERY_IS_ASC_QUERY(pQuery))) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (functionId == TSDB_FUNC_LAST || functionId == TSDB_FUNC_LAST_DST) {
|
||||||
|
int32_t order = pQuery->pSelectExpr[i].base.arg->argValue.i64;
|
||||||
|
return order != pQuery->order.order;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
|
@ -2030,6 +2052,34 @@ int32_t binarySearchForKey(char *pValue, int num, TSKEY key, int order) {
|
||||||
return midPos;
|
return midPos;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void ensureOutputBufferSimple(SQueryRuntimeEnv* pRuntimeEnv, int32_t capacity) {
|
||||||
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
|
if (capacity < pQuery->rec.capacity) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
|
int32_t bytes = pQuery->pSelectExpr[i].bytes;
|
||||||
|
assert(bytes > 0 && capacity > 0);
|
||||||
|
|
||||||
|
char *tmp = realloc(pQuery->sdata[i], bytes * capacity + sizeof(tFilePage));
|
||||||
|
if (tmp == NULL) { // todo handle the oom
|
||||||
|
assert(0);
|
||||||
|
} else {
|
||||||
|
pQuery->sdata[i] = (tFilePage *)tmp;
|
||||||
|
}
|
||||||
|
|
||||||
|
// set the pCtx output buffer position
|
||||||
|
pRuntimeEnv->pCtx[i].aOutputBuf = pQuery->sdata[i]->data;
|
||||||
|
}
|
||||||
|
|
||||||
|
qTrace("QInfo:%p realloc output buffer to inc output buffer from: %d rows to:%d rows", GET_QINFO_ADDR(pRuntimeEnv),
|
||||||
|
pQuery->rec.capacity, capacity);
|
||||||
|
|
||||||
|
pQuery->rec.capacity = capacity;
|
||||||
|
}
|
||||||
|
|
||||||
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
|
static void ensureOutputBuffer(SQueryRuntimeEnv* pRuntimeEnv, SDataBlockInfo* pBlockInfo) {
|
||||||
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
|
// in case of prj/diff query, ensure the output buffer is sufficient to accommodate the results of current block
|
||||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||||
|
@ -2916,8 +2966,7 @@ void skipResults(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
|
pRuntimeEnv->pCtx[i].ptsOutputBuf = pRuntimeEnv->pCtx[0].aOutputBuf;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
updateNumOfResult(pRuntimeEnv, pQuery->rec.rows);
|
updateNumOfResult(pRuntimeEnv, pQuery->rec.rows);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3054,7 +3103,7 @@ static void clearEnvAfterReverseScan(SQueryRuntimeEnv *pRuntimeEnv, SQueryStatus
|
||||||
pQuery->window = pTableQueryInfo->win;
|
pQuery->window = pTableQueryInfo->win;
|
||||||
}
|
}
|
||||||
|
|
||||||
void scanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
void scanOneTableDataBlocks(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start) {
|
||||||
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
|
SQInfo *pQInfo = (SQInfo *) GET_QINFO_ADDR(pRuntimeEnv);
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
STableQueryInfo *pTableQueryInfo = pQuery->current;
|
||||||
|
@ -3496,18 +3545,32 @@ void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) {
|
||||||
assert(pQuery->rec.rows <= pQuery->rec.capacity);
|
assert(pQuery->rec.rows <= pQuery->rec.capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
|
static UNUSED_FUNC void updateWindowResNumOfRes(SQueryRuntimeEnv *pRuntimeEnv, STableQueryInfo *pTableQueryInfo) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
// update the number of result for each, only update the number of rows for the corresponding window result.
|
// update the number of result for each, only update the number of rows for the corresponding window result.
|
||||||
if (pQuery->intervalTime == 0) {
|
if (pQuery->intervalTime == 0) {
|
||||||
int32_t g = pTableQueryInfo->groupIndex;
|
|
||||||
assert(pRuntimeEnv->windowResInfo.size > 0);
|
|
||||||
|
|
||||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
|
for (int32_t i = 0; i < pRuntimeEnv->windowResInfo.size; ++i) {
|
||||||
if (pWindowRes->numOfRows == 0) {
|
SWindowResult *pResult = &pRuntimeEnv->windowResInfo.pResult[i];
|
||||||
pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv);
|
|
||||||
|
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
|
int32_t functionId = pRuntimeEnv->pCtx[j].functionId;
|
||||||
|
if (functionId == TSDB_FUNC_TS || functionId == TSDB_FUNC_TAG || functionId == TSDB_FUNC_TAGPRJ) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// int32_t g = pTableQueryInfo->groupIndex;
|
||||||
|
// assert(pRuntimeEnv->windowResInfo.size > 0);
|
||||||
|
//
|
||||||
|
// SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, &pRuntimeEnv->windowResInfo, (char *)&g, sizeof(g));
|
||||||
|
// if (pWindowRes->numOfRows == 0) {
|
||||||
|
// pWindowRes->numOfRows = getNumOfResult(pRuntimeEnv);
|
||||||
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4081,21 +4144,22 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||||
SDataStatis *pStatis = NULL;
|
SDataStatis *pStatis = NULL;
|
||||||
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
SArray *pDataBlock = loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis);
|
||||||
|
|
||||||
if (!isIntervalQuery(pQuery)) {
|
if (!isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
||||||
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
|
if (!isIntervalQuery(pQuery)) {
|
||||||
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step);
|
int32_t step = QUERY_IS_ASC_QUERY(pQuery)? 1:-1;
|
||||||
} else { // interval query
|
setExecutionContext(pQInfo, &pTableQueryInfo->id, pTableQueryInfo->groupIndex, blockInfo.window.ekey + step);
|
||||||
TSKEY nextKey = blockInfo.window.skey;
|
} else { // interval query
|
||||||
setIntervalQueryRange(pQInfo, nextKey);
|
TSKEY nextKey = blockInfo.window.skey;
|
||||||
/*int32_t ret = */setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
|
setIntervalQueryRange(pQInfo, nextKey);
|
||||||
|
/*int32_t ret = */setAdditionalInfo(pQInfo, &pTableQueryInfo->id, pTableQueryInfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
summary->totalRows += blockInfo.rows;
|
summary->totalRows += blockInfo.rows;
|
||||||
stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
|
stableApplyFunctionsOnBlock(pRuntimeEnv, &blockInfo, pStatis, pDataBlock, binarySearchForKey);
|
||||||
|
|
||||||
qTrace("QInfo:%p check data block, uid:%"PRId64", tid:%d, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, lastKey:%" PRId64,
|
qTrace("QInfo:%p check data block, uid:%"PRId64", tid:%d, brange:%" PRId64 "-%" PRId64 ", numOfRows:%d, lastKey:%" PRId64,
|
||||||
GET_QINFO_ADDR(pRuntimeEnv), blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey,
|
pQInfo, blockInfo.uid, blockInfo.tid, blockInfo.window.skey, blockInfo.window.ekey, blockInfo.rows, pQuery->current->lastKey);
|
||||||
blockInfo.rows, pQuery->current->lastKey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t et = taosGetTimestampMs();
|
int64_t et = taosGetTimestampMs();
|
||||||
|
@ -4220,7 +4284,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
|
|
||||||
// here we simply set the first table as current table
|
// here we simply set the first table as current table
|
||||||
pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info;
|
pQuery->current = ((SGroupItem*) taosArrayGet(group, 0))->info;
|
||||||
scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
||||||
|
|
||||||
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
int64_t numOfRes = getNumOfResult(pRuntimeEnv);
|
||||||
if (numOfRes > 0) {
|
if (numOfRes > 0) {
|
||||||
|
@ -4233,10 +4297,84 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
|
|
||||||
// enable execution for next table, when handling the projection query
|
// enable execution for next table, when handling the projection query
|
||||||
enableExecutionForNextTable(pRuntimeEnv);
|
enableExecutionForNextTable(pRuntimeEnv);
|
||||||
|
|
||||||
|
if (pQuery->rec.rows >= pQuery->rec.capacity) {
|
||||||
|
setQueryStatus(pQuery, QUERY_RESBUF_FULL);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { // group-by on normal columns query
|
||||||
|
while (pQInfo->groupIndex < numOfGroups) {
|
||||||
|
SArray* group = taosArrayGetP(pQInfo->groupInfo.pGroupList, pQInfo->groupIndex);
|
||||||
|
|
||||||
|
qTrace("QInfo:%p group by normal columns group:%d, total group:%d", pQInfo, pQInfo->groupIndex, numOfGroups);
|
||||||
|
|
||||||
|
STsdbQueryCond cond = {
|
||||||
|
.twindow = pQuery->window,
|
||||||
|
.colList = pQuery->colList,
|
||||||
|
.order = pQuery->order.order,
|
||||||
|
.numOfCols = pQuery->numOfCols,
|
||||||
|
};
|
||||||
|
|
||||||
|
SArray *g1 = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
SArray *tx = taosArrayClone(group);
|
||||||
|
taosArrayPush(g1, &tx);
|
||||||
|
|
||||||
|
STableGroupInfo gp = {.numOfTables = taosArrayGetSize(tx), .pGroupList = g1};
|
||||||
|
|
||||||
|
// include only current table
|
||||||
|
if (pRuntimeEnv->pQueryHandle != NULL) {
|
||||||
|
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
||||||
|
pRuntimeEnv->pQueryHandle = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp, pQInfo);
|
||||||
|
|
||||||
|
SArray* s = tsdbGetQueriedTableIdList(pRuntimeEnv->pQueryHandle);
|
||||||
|
assert(taosArrayGetSize(s) >= 1);
|
||||||
|
|
||||||
|
setTagVal(pRuntimeEnv, (STableId*) taosArrayGet(s, 0), pQInfo->tsdb);
|
||||||
|
|
||||||
|
// here we simply set the first table as current table
|
||||||
|
scanMultiTableDataBlocks(pQInfo);
|
||||||
|
pQInfo->groupIndex += 1;
|
||||||
|
|
||||||
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
|
// no results generated for current group, continue to try the next group
|
||||||
|
if (pWindowResInfo->size <= 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
||||||
|
SWindowStatus *pStatus = &pWindowResInfo->pResult[i].status;
|
||||||
|
pStatus->closed = true; // enable return all results for group by normal columns
|
||||||
|
|
||||||
|
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
||||||
|
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||||
|
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
qTrace("QInfo:%p generated groupby columns results %d rows for group %d completed", pQInfo, pWindowResInfo->size,
|
||||||
|
pQInfo->groupIndex);
|
||||||
|
int32_t currentGroupIndex = pQInfo->groupIndex;
|
||||||
|
|
||||||
|
pQuery->rec.rows = 0;
|
||||||
|
pQInfo->groupIndex = 0;
|
||||||
|
|
||||||
|
ensureOutputBufferSimple(pRuntimeEnv, pWindowResInfo->size);
|
||||||
|
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
|
||||||
|
|
||||||
|
pQInfo->groupIndex = currentGroupIndex; //restore the group index
|
||||||
|
assert(pQuery->rec.rows == pWindowResInfo->size);
|
||||||
|
|
||||||
|
clearClosedTimeWindow(pRuntimeEnv);
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
* 1. super table projection query, 2. group-by on normal columns query, 3. ts-comp query
|
* 1. super table projection query, 2. ts-comp query
|
||||||
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
|
* if the subgroup index is larger than 0, results generated by group by tbname,k is existed.
|
||||||
* we need to return it to client in the first place.
|
* we need to return it to client in the first place.
|
||||||
*/
|
*/
|
||||||
|
@ -4283,7 +4421,7 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
||||||
skipResults(pRuntimeEnv);
|
skipResults(pRuntimeEnv);
|
||||||
|
|
||||||
// the limitation of output result is reached, set the query completed
|
// the limitation of output result is reached, set the query completed
|
||||||
|
@ -4349,25 +4487,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
||||||
pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur;
|
pRuntimeEnv->cur = pRuntimeEnv->pTSBuf->cur;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo refactor
|
|
||||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
|
||||||
SWindowStatus *pStatus = &pWindowResInfo->pResult[i].status;
|
|
||||||
pStatus->closed = true; // enable return all results for group by normal columns
|
|
||||||
|
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
|
||||||
for (int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
|
||||||
pResult->numOfRows = MAX(pResult->numOfRows, pResult->resultInfo[j].numOfRes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pQInfo->groupIndex = 0;
|
|
||||||
pQuery->rec.rows = 0;
|
|
||||||
copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult);
|
|
||||||
}
|
|
||||||
|
|
||||||
qTrace(
|
qTrace(
|
||||||
"QInfo %p numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%"PRId64", offset:%" PRId64,
|
"QInfo %p numOfTables:%d, index:%d, numOfGroups:%d, %d points returned, total:%"PRId64", offset:%" PRId64,
|
||||||
pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total,
|
pQInfo, pQInfo->groupInfo.numOfTables, pQInfo->tableIndex, numOfGroups, pQuery->rec.rows, pQuery->rec.total,
|
||||||
|
@ -4449,7 +4568,6 @@ static void multiTableQueryProcess(SQInfo *pQInfo) {
|
||||||
*/
|
*/
|
||||||
if (isIntervalQuery(pQuery)) {
|
if (isIntervalQuery(pQuery)) {
|
||||||
copyResToQueryResultBuf(pQInfo, pQuery);
|
copyResToQueryResultBuf(pQInfo, pQuery);
|
||||||
|
|
||||||
#ifdef _DEBUG_VIEW
|
#ifdef _DEBUG_VIEW
|
||||||
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
|
displayInterResult(pQuery->sdata, pRuntimeEnv, pQuery->sdata[0]->num);
|
||||||
#endif
|
#endif
|
||||||
|
@ -4527,7 +4645,7 @@ static void tableFixedOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
||||||
|
|
||||||
pQuery->current = pTableInfo; // set current query table info
|
pQuery->current = pTableInfo; // set current query table info
|
||||||
|
|
||||||
scanAllDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
|
scanOneTableDataBlocks(pRuntimeEnv, pTableInfo->lastKey);
|
||||||
finalizeQueryResult(pRuntimeEnv);
|
finalizeQueryResult(pRuntimeEnv);
|
||||||
|
|
||||||
if (isQueryKilled(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
|
@ -4560,7 +4678,7 @@ static void tableMultiOutputProcess(SQInfo *pQInfo, STableQueryInfo* pTableInfo)
|
||||||
}
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
scanAllDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
scanOneTableDataBlocks(pRuntimeEnv, pQuery->current->lastKey);
|
||||||
finalizeQueryResult(pRuntimeEnv);
|
finalizeQueryResult(pRuntimeEnv);
|
||||||
|
|
||||||
if (isQueryKilled(pQInfo)) {
|
if (isQueryKilled(pQInfo)) {
|
||||||
|
@ -4607,7 +4725,7 @@ static void tableIntervalProcessImpl(SQueryRuntimeEnv *pRuntimeEnv, TSKEY start)
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
scanAllDataBlocks(pRuntimeEnv, start);
|
scanOneTableDataBlocks(pRuntimeEnv, start);
|
||||||
|
|
||||||
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
if (isQueryKilled(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -113,7 +113,9 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
||||||
if (pResult->status.closed) { // remove the window slot from hash table
|
if (pResult->status.closed) { // remove the window slot from hash table
|
||||||
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
taosHashRemove(pWindowResInfo->hashList, (const char *)&pResult->window.skey, pWindowResInfo->type);
|
||||||
|
printf("remove ============>%ld, remain size:%ld\n", pResult->window.skey, pWindowResInfo->hashList->size);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -133,14 +135,16 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->size = remain;
|
pWindowResInfo->size = remain;
|
||||||
|
printf("---------------size:%ld\n", taosHashGetSize(pWindowResInfo->hashList));
|
||||||
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
||||||
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
int32_t *p = (int32_t *)taosHashGet(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
||||||
|
tDataTypeDesc[pWindowResInfo->type].nSize);
|
||||||
|
|
||||||
int32_t v = (*p - num);
|
int32_t v = (*p - num);
|
||||||
assert(v >= 0 && v <= pWindowResInfo->size);
|
assert(v >= 0 && v <= pWindowResInfo->size);
|
||||||
taosHashPut(pWindowResInfo->hashList, (char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t));
|
taosHashPut(pWindowResInfo->hashList, (char *)&pResult->window.skey, tDataTypeDesc[pWindowResInfo->type].nSize,
|
||||||
|
(char *)&v, sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->curIndex = -1;
|
pWindowResInfo->curIndex = -1;
|
||||||
|
|
|
@ -355,7 +355,7 @@ if $data00 != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
if $data01 != 800 then
|
if $data11 != 800 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ step1:
|
||||||
sql create database $db cache 16
|
sql create database $db cache 16
|
||||||
print ====== create tables
|
print ====== create tables
|
||||||
sql use $db
|
sql use $db
|
||||||
|
sql reset query cache
|
||||||
$i = 0
|
$i = 0
|
||||||
$ts = $ts0
|
$ts = $ts0
|
||||||
$tb = $tbPrefix . $i
|
$tb = $tbPrefix . $i
|
||||||
|
|
Loading…
Reference in New Issue