Merge branch 'feature/query' into feature/qrefactor
This commit is contained in:
commit
ed933ba7e9
|
@ -34,7 +34,7 @@
|
|||
# 1.0: all CPU cores are available for query processing [default].
|
||||
# 0.5: only half of the CPU cores are available for query.
|
||||
# 0.0: only one core available.
|
||||
# tsRatioOfQueryCores 1.0
|
||||
# ratioOfQueryCores 1.0
|
||||
|
||||
# the last_row/first/last aggregator will not change the original column name in the result fields
|
||||
# keepColumnName 0
|
||||
|
|
|
@ -2970,7 +2970,6 @@ int32_t parseGroupbyClause(SQueryInfo* pQueryInfo, SArray* pList, SSqlCmd* pCmd)
|
|||
|
||||
STableMeta* pTableMeta = NULL;
|
||||
SSchema* pSchema = NULL;
|
||||
// SSchema s = tGetTbnameColumnSchema();
|
||||
|
||||
int32_t tableIndex = COLUMN_INDEX_INITIAL_VAL;
|
||||
|
||||
|
|
|
@ -1878,14 +1878,31 @@ void doAppendData(SInterResult* pInterResult, TAOS_ROW row, int32_t numOfCols, S
|
|||
}
|
||||
}
|
||||
|
||||
static void destroySup(SFirstRoundQuerySup* pSup) {
|
||||
taosArrayDestroyEx(pSup->pResult, freeInterResult);
|
||||
taosArrayDestroy(pSup->pColsInfo);
|
||||
tfree(pSup);
|
||||
}
|
||||
|
||||
void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
||||
SSqlObj* pSql = (SSqlObj*)tres;
|
||||
SSqlRes* pRes = &pSql->res;
|
||||
|
||||
SFirstRoundQuerySup* pSup = param;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
if (numOfRows > 0) {
|
||||
SSqlObj* pParent = pSup->pParent;
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
|
||||
int32_t code = taos_errno(pSql);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
destroySup(pSup);
|
||||
taos_free_result(pSql);
|
||||
pParent->res.code = code;
|
||||
tscAsyncResultOnError(pParent);
|
||||
return;
|
||||
}
|
||||
|
||||
if (numOfRows > 0) { // the number is not correct for group by column in super table query
|
||||
TAOS_ROW row = NULL;
|
||||
int32_t numOfCols = taos_field_count(tres);
|
||||
|
||||
|
@ -1895,6 +1912,7 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
|
||||
while ((row = taos_fetch_row(tres)) != NULL) {
|
||||
doAppendData(&interResult, row, numOfCols, pQueryInfo);
|
||||
pSup->numOfRows += 1;
|
||||
}
|
||||
} else { // tagLen > 0
|
||||
char* p = calloc(1, pSup->tagLen);
|
||||
|
@ -1906,7 +1924,9 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
int32_t offset = 0;
|
||||
for (int32_t i = 0; i < numOfCols && offset < pSup->tagLen; ++i) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag)) {
|
||||
|
||||
// tag or group by column
|
||||
if (TSDB_COL_IS_TAG(pExpr->colInfo.flag) || pExpr->functionId == TSDB_FUNC_PRJ) {
|
||||
memcpy(p + offset, row[i], length[i]);
|
||||
offset += pExpr->resBytes;
|
||||
}
|
||||
|
@ -1935,20 +1955,20 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
taosArrayPush(pSup->pResult, &interResult);
|
||||
doAppendData(&interResult, row, numOfCols, pQueryInfo);
|
||||
}
|
||||
|
||||
pSup->numOfRows += 1;
|
||||
}
|
||||
|
||||
tfree(p);
|
||||
}
|
||||
}
|
||||
|
||||
pSup->numOfRows += numOfRows;
|
||||
if (!pRes->completed) {
|
||||
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
|
||||
return;
|
||||
}
|
||||
|
||||
// set the parameters for the second round query process
|
||||
SSqlObj *pParent = pSup->pParent;
|
||||
SSqlCmd *pPCmd = &pParent->cmd;
|
||||
SQueryInfo *pQueryInfo1 = tscGetQueryInfoDetail(pPCmd, 0);
|
||||
|
||||
|
@ -1974,9 +1994,19 @@ void tscFirstRoundRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
|
|||
}
|
||||
|
||||
void tscFirstRoundCallback(void* param, TAOS_RES* tres, int code) {
|
||||
int32_t c = taos_errno(tres);
|
||||
SFirstRoundQuerySup* pSup = (SFirstRoundQuerySup*) param;
|
||||
|
||||
SSqlObj* pSql = (SSqlObj*) tres;
|
||||
int32_t c = taos_errno(pSql);
|
||||
|
||||
if (c != TSDB_CODE_SUCCESS) {
|
||||
// TODO HANDLE ERROR
|
||||
SSqlObj* parent = pSup->pParent;
|
||||
|
||||
destroySup(pSup);
|
||||
taos_free_result(pSql);
|
||||
parent->res.code = code;
|
||||
tscAsyncResultOnError(parent);
|
||||
return;
|
||||
}
|
||||
|
||||
taos_fetch_rows_a(tres, tscFirstRoundRetrieveCallback, param);
|
||||
|
@ -2010,13 +2040,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
pNewQueryInfo->groupbyExpr.columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
|
||||
if (pNewQueryInfo->groupbyExpr.columnInfo == NULL) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// goto _error;
|
||||
goto _error;
|
||||
}
|
||||
}
|
||||
|
||||
if (tscTagCondCopy(&pNewQueryInfo->tagCond, &pQueryInfo->tagCond) != 0) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// goto _error;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
pNewQueryInfo->interval = pQueryInfo->interval;
|
||||
|
@ -2027,7 +2057,6 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
int32_t numOfExprs = (int32_t) tscSqlExprNumOfExprs(pQueryInfo);
|
||||
|
||||
int32_t index = 0;
|
||||
int32_t numOfTags = 0;
|
||||
for(int32_t i = 0; i < numOfExprs; ++i) {
|
||||
SSqlExpr* pExpr = tscSqlExprGet(pQueryInfo, i);
|
||||
if (pExpr->functionId == TSDB_FUNC_TS && pQueryInfo->interval.interval > 0) {
|
||||
|
@ -2060,7 +2089,25 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
|
||||
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_TAG, &colIndex, schema, TSDB_COL_TAG);
|
||||
p->resColId = pExpr->resColId;
|
||||
numOfTags += 1;
|
||||
} else if (pExpr->functionId == TSDB_FUNC_PRJ) {
|
||||
int32_t num = taosArrayGetSize(pNewQueryInfo->groupbyExpr.columnInfo);
|
||||
for(int32_t k = 0; k < num; ++k) {
|
||||
SColIndex* pIndex = taosArrayGet(pNewQueryInfo->groupbyExpr.columnInfo, k);
|
||||
if (pExpr->colInfo.colId == pIndex->colId) {
|
||||
pSup->tagLen += pExpr->resBytes;
|
||||
taosArrayPush(pSup->pColsInfo, &pExpr->resColId);
|
||||
|
||||
SColumnIndex colIndex = {.tableIndex = 0, .columnIndex = pIndex->colIndex};
|
||||
SSchema* schema = tscGetColumnSchemaById(pTableMetaInfo1->pTableMeta, pExpr->colInfo.colId);
|
||||
|
||||
//doLimitOutputNormalColOfGroupby
|
||||
SSqlExpr* p = tscAddFuncInSelectClause(pNewQueryInfo, index++, TSDB_FUNC_PRJ, &colIndex, schema, TSDB_COL_NORMAL);
|
||||
p->numOfParams = 1;
|
||||
p->param[0].i64 = 1;
|
||||
p->param[0].nType = TSDB_DATA_TYPE_INT;
|
||||
p->resColId = pExpr->resColId; // update the result column id
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2077,6 +2124,13 @@ int32_t tscHandleFirstRoundStableQuery(SSqlObj *pSql) {
|
|||
|
||||
tscHandleMasterSTableQuery(pNew);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_error:
|
||||
destroySup(pSup);
|
||||
taos_free_result(pNew);
|
||||
pSql->res.code = terrno;
|
||||
tscAsyncResultOnError(pSql);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t tscHandleMasterSTableQuery(SSqlObj *pSql) {
|
||||
|
|
|
@ -236,6 +236,7 @@ typedef struct SQueryRuntimeEnv {
|
|||
bool timeWindowInterpo;// if the time window start/end required interpolation
|
||||
bool queryWindowIdentical; // all query time windows are identical for all tables in one group
|
||||
bool queryBlockDist; // if query data block distribution
|
||||
bool stabledev; // super table stddev query
|
||||
int32_t interBufSize; // intermediate buffer sizse
|
||||
int32_t prevGroupId; // previous executed group id
|
||||
SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file
|
||||
|
|
|
@ -1630,6 +1630,97 @@ static void stddev_dst_function(SQLFunctionCtx *pCtx) {
|
|||
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo));
|
||||
}
|
||||
|
||||
static void stddev_dst_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
void *pData = GET_INPUT_DATA(pCtx, index);
|
||||
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// the second stage to calculate standard deviation
|
||||
SStddevdstInfo *pStd = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
|
||||
double *retVal = &pStd->res;
|
||||
|
||||
// all data are null, no need to proceed
|
||||
SArray* resList = (SArray*) pCtx->param[0].pz;
|
||||
if (resList == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
// find the correct group average results according to the tag value
|
||||
int32_t len = (int32_t) taosArrayGetSize(resList);
|
||||
assert(len > 0);
|
||||
|
||||
double avg = 0;
|
||||
if (len == 1) {
|
||||
SResPair* p = taosArrayGet(resList, 0);
|
||||
avg = p->avg;
|
||||
} else { // todo opt performance by using iterator since the timestamp lsit is matched with the output result
|
||||
SResPair* p = bsearch(&pCtx->startTs, resList->pData, len, sizeof(SResPair), tsCompare);
|
||||
assert(p != NULL);
|
||||
|
||||
avg = p->avg;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
switch (pCtx->inputType) {
|
||||
case TSDB_DATA_TYPE_INT: {
|
||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||
if (pCtx->hasNull && isNull((const char*) (&((int32_t *)pData)[i]), pCtx->inputType)) {
|
||||
continue;
|
||||
}
|
||||
num += 1;
|
||||
*retVal += POW2(((int32_t *)pData)[i] - avg);
|
||||
}
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_FLOAT: {
|
||||
LOOP_STDDEV_IMPL(float, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_DOUBLE: {
|
||||
LOOP_STDDEV_IMPL(double, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_TINYINT: {
|
||||
LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UTINYINT: {
|
||||
LOOP_STDDEV_IMPL(int8_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_SMALLINT: {
|
||||
LOOP_STDDEV_IMPL(int16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_USMALLINT: {
|
||||
LOOP_STDDEV_IMPL(uint16_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UINT: {
|
||||
LOOP_STDDEV_IMPL(uint32_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_BIGINT: {
|
||||
LOOP_STDDEV_IMPL(int64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_UBIGINT: {
|
||||
LOOP_STDDEV_IMPL(uint64_t, *retVal, pData, pCtx, avg, pCtx->inputType, num);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
qError("stddev function not support data type:%d", pCtx->inputType);
|
||||
}
|
||||
|
||||
pStd->num += num;
|
||||
SET_VAL(pCtx, num, 1);
|
||||
|
||||
// copy to the final output buffer for super table
|
||||
memcpy(pCtx->pOutput, GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)), sizeof(SAvgInfo));
|
||||
}
|
||||
|
||||
|
||||
static void stddev_dst_merge(SQLFunctionCtx *pCtx) {
|
||||
SResultRowCellInfo *pResInfo = GET_RES_INFO(pCtx);
|
||||
SStddevdstInfo* pRes = GET_ROWCELL_INTERBUF(pResInfo);
|
||||
|
@ -4835,7 +4926,7 @@ SAggFunctionInfo aAggs[] = {{
|
|||
TSDB_FUNCSTATE_SO | TSDB_FUNCSTATE_STABLE,
|
||||
function_setup,
|
||||
stddev_dst_function,
|
||||
noop2,
|
||||
stddev_dst_function_f,
|
||||
no_next_step,
|
||||
stddev_dst_finalizer,
|
||||
stddev_dst_merge,
|
||||
|
|
|
@ -280,6 +280,17 @@ bool isGroupbyColumn(SSqlGroupbyExpr *pGroupbyExpr) {
|
|||
return false;
|
||||
}
|
||||
|
||||
bool isStabledev(SQuery* pQuery) {
|
||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
int32_t functId = pQuery->pExpr1[i].base.functionId;
|
||||
if (functId == TSDB_FUNC_STDDEV_DST) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
int16_t getGroupbyColumnType(SQuery *pQuery, SSqlGroupbyExpr *pGroupbyExpr) {
|
||||
assert(pGroupbyExpr != NULL);
|
||||
|
||||
|
@ -1637,8 +1648,9 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
pWindowResInfo->curIndex = index;
|
||||
} else { // other queries
|
||||
// decide which group this rows belongs to according to current state value
|
||||
char* val = NULL;
|
||||
if (groupbyColumnValue) {
|
||||
char *val = groupbyColumnData + bytes * offset;
|
||||
val = groupbyColumnData + bytes * offset;
|
||||
if (isNull(val, type)) { // ignore the null value
|
||||
continue;
|
||||
}
|
||||
|
@ -1649,6 +1661,34 @@ static void rowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SDataStatis *pS
|
|||
}
|
||||
}
|
||||
|
||||
if (pRuntimeEnv->stabledev) {
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
int32_t functionId = pQuery->pExpr1[k].base.functionId;
|
||||
if (functionId != TSDB_FUNC_STDDEV_DST) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pRuntimeEnv->pCtx[k].param[0].arr = NULL;
|
||||
pRuntimeEnv->pCtx[k].param[0].nType = TSDB_DATA_TYPE_INT; // avoid freeing the memory by setting the type to be int
|
||||
|
||||
// todo opt perf
|
||||
int32_t numOfGroup = (int32_t)taosArrayGetSize(pRuntimeEnv->prevResult);
|
||||
for (int32_t i = 0; i < numOfGroup; ++i) {
|
||||
SInterResult *p = taosArrayGet(pRuntimeEnv->prevResult, i);
|
||||
if (memcmp(p->tags, val, bytes) == 0) {
|
||||
int32_t numOfCols = (int32_t)taosArrayGetSize(p->pResult);
|
||||
for (int32_t f = 0; f < numOfCols; ++f) {
|
||||
SStddevInterResult *pres = taosArrayGet(p->pResult, f);
|
||||
if (pres->colId == pQuery->pExpr1[k].base.colInfo.colId) {
|
||||
pRuntimeEnv->pCtx[k].param[0].arr = pres->pResult;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < pQuery->numOfOutput; ++k) {
|
||||
int32_t functionId = pQuery->pExpr1[k].base.functionId;
|
||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||
|
@ -3799,7 +3839,7 @@ int32_t setTimestampListJoinInfo(SQInfo *pQInfo, STableQueryInfo *pTableQueryInf
|
|||
int32_t setParamValue(SQueryRuntimeEnv* pRuntimeEnv) {
|
||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
if (pRuntimeEnv->prevResult == NULL) {
|
||||
if (pRuntimeEnv->prevResult == NULL || pRuntimeEnv->groupbyColumn) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -4602,6 +4642,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
|
|||
pRuntimeEnv->stableQuery = isSTableQuery;
|
||||
pRuntimeEnv->prevGroupId = INT32_MIN;
|
||||
pRuntimeEnv->groupbyColumn = isGroupbyColumn(pQuery->pGroupbyExpr);
|
||||
pRuntimeEnv->stabledev = isStabledev(pQuery);
|
||||
|
||||
if (pTsBuf != NULL) {
|
||||
int16_t order = (pQuery->order.order == pRuntimeEnv->pTsBuf->tsOrder) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
|
@ -4701,13 +4742,6 @@ static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTa
|
|||
setTimestampListJoinInfo(pQInfo, pTableQueryInfo);
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) {
|
||||
setParamValue(pRuntimeEnv);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
setIntervalQueryRange(pQInfo, pBlockInfo->window.skey);
|
||||
} else { // non-interval query
|
||||
|
@ -4761,6 +4795,15 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
|
||||
}
|
||||
|
||||
if (pRuntimeEnv->stabledev) {
|
||||
for(int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||
if (pQuery->pExpr1[i].base.functionId == TSDB_FUNC_STDDEV_DST) {
|
||||
setParamValue(pRuntimeEnv);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t status = 0;
|
||||
SDataStatis *pStatis = NULL;
|
||||
SArray *pDataBlock = NULL;
|
||||
|
|
Loading…
Reference in New Issue