[td-225] opt query perf
This commit is contained in:
parent
a8399213f2
commit
d496869ee7
|
@ -45,6 +45,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
|
|||
pSql->pTscObj = pObj;
|
||||
pSql->maxRetry = TSDB_MAX_REPLICA_NUM;
|
||||
pSql->fp = fp;
|
||||
pSql->fetchFp = fp;
|
||||
|
||||
pSql->sqlstr = calloc(1, sqlLen + 1);
|
||||
if (pSql->sqlstr == NULL) {
|
||||
|
@ -159,7 +160,7 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
|
|||
pRes->code = numOfRows;
|
||||
}
|
||||
|
||||
tscQueueAsyncError(pSql->fetchFp, param, pRes->code);
|
||||
tscQueueAsyncRes(pSql);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -346,31 +347,32 @@ void tscProcessFetchRow(SSchedMsg *pMsg) {
|
|||
|
||||
void tscProcessAsyncRes(SSchedMsg *pMsg) {
|
||||
SSqlObj *pSql = (SSqlObj *)pMsg->ahandle;
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
// SSqlCmd *pCmd = &pSql->cmd;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
void *taosres = pSql;
|
||||
// void *taosres = pSql;
|
||||
|
||||
// pCmd may be released, so cache pCmd->command
|
||||
int cmd = pCmd->command;
|
||||
int code = pRes->code;
|
||||
// int cmd = pCmd->command;
|
||||
// int code = pRes->code;
|
||||
|
||||
// in case of async insert, restore the user specified callback function
|
||||
bool shouldFree = tscShouldBeFreed(pSql);
|
||||
// bool shouldFree = tscShouldBeFreed(pSql);
|
||||
|
||||
if (cmd == TSDB_SQL_INSERT) {
|
||||
assert(pSql->fp != NULL);
|
||||
pSql->fp = pSql->fetchFp;
|
||||
}
|
||||
// if (pCmd->command == TSDB_SQL_INSERT) {
|
||||
// assert(pSql->fp != NULL);
|
||||
assert(pSql->fp != NULL && pSql->fetchFp != NULL);
|
||||
// }
|
||||
|
||||
if (pSql->fp) {
|
||||
(*pSql->fp)(pSql->param, taosres, code);
|
||||
}
|
||||
// if (pSql->fp) {
|
||||
pSql->fp = pSql->fetchFp;
|
||||
(*pSql->fp)(pSql->param, pSql, pRes->code);
|
||||
// }
|
||||
|
||||
if (shouldFree) {
|
||||
tscDebug("%p sqlObj is automatically freed in async res", pSql);
|
||||
tscFreeSqlObj(pSql);
|
||||
}
|
||||
// if (shouldFree) {
|
||||
// tscDebug("%p sqlObj is automatically freed in async res", pSql);
|
||||
// tscFreeSqlObj(pSql);
|
||||
// }
|
||||
}
|
||||
|
||||
static void tscProcessAsyncError(SSchedMsg *pMsg) {
|
||||
|
|
|
@ -708,6 +708,11 @@ static int32_t firstDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY en
|
|||
if (pCtx->order == TSDB_ORDER_DESC) {
|
||||
return BLK_DATA_NO_NEEDED;
|
||||
}
|
||||
|
||||
// not initialized yet, it is the first block, load it.
|
||||
if (pCtx->aOutputBuf == NULL) {
|
||||
return BLK_DATA_ALL_NEEDED;
|
||||
}
|
||||
|
||||
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
|
||||
if (pInfo->hasResult != DATA_SET_FLAG) {
|
||||
|
@ -721,7 +726,12 @@ static int32_t lastDistFuncRequired(SQLFunctionCtx *pCtx, TSKEY start, TSKEY end
|
|||
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||
return BLK_DATA_NO_NEEDED;
|
||||
}
|
||||
|
||||
|
||||
// not initialized yet, it is the first block, load it.
|
||||
if (pCtx->aOutputBuf == NULL) {
|
||||
return BLK_DATA_ALL_NEEDED;
|
||||
}
|
||||
|
||||
SFirstLastInfo *pInfo = (SFirstLastInfo*) (pCtx->aOutputBuf + pCtx->inputBytes);
|
||||
if (pInfo->hasResult != DATA_SET_FLAG) {
|
||||
return BLK_DATA_ALL_NEEDED;
|
||||
|
@ -1540,6 +1550,8 @@ static void first_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t in
|
|||
* to decide if the value is earlier than current intermediate result
|
||||
*/
|
||||
static void first_dist_function(SQLFunctionCtx *pCtx) {
|
||||
assert(pCtx->size > 0);
|
||||
|
||||
if (pCtx->size == 0) {
|
||||
return;
|
||||
}
|
||||
|
@ -1554,7 +1566,12 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
int32_t notNullElems = 0;
|
||||
|
||||
|
||||
// data block is discard, not loaded, do not need to check it
|
||||
if (!pCtx->preAggVals.dataBlockLoaded) {
|
||||
return;
|
||||
}
|
||||
|
||||
// find the first not null value
|
||||
for (int32_t i = 0; i < pCtx->size; ++i) {
|
||||
char *data = GET_INPUT_CHAR_INDEX(pCtx, i);
|
||||
|
@ -1575,10 +1592,6 @@ static void first_dist_function(SQLFunctionCtx *pCtx) {
|
|||
}
|
||||
|
||||
static void first_dist_function_f(SQLFunctionCtx *pCtx, int32_t index) {
|
||||
if (pCtx->size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
char *pData = GET_INPUT_CHAR_INDEX(pCtx, index);
|
||||
if (pCtx->hasNull && isNull(pData, pCtx->inputType)) {
|
||||
return;
|
||||
|
@ -1706,10 +1719,6 @@ static void last_data_assign_impl(SQLFunctionCtx *pCtx, char *pData, int32_t ind
|
|||
}
|
||||
|
||||
static void last_dist_function(SQLFunctionCtx *pCtx) {
|
||||
if (pCtx->size == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
/*
|
||||
* 1. for scan data in asc order, no need to check data
|
||||
* 2. for data blocks that are not loaded, no need to check data
|
||||
|
@ -1717,7 +1726,12 @@ static void last_dist_function(SQLFunctionCtx *pCtx) {
|
|||
if (pCtx->order != pCtx->param[0].i64Key) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
// data block is discard, not loaded, do not need to check it
|
||||
if (!pCtx->preAggVals.dataBlockLoaded) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t notNullElems = 0;
|
||||
|
||||
for (int32_t i = pCtx->size - 1; i >= 0; --i) {
|
||||
|
|
|
@ -1648,6 +1648,7 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, void (*fp)(), void* param, int32_t cm
|
|||
}
|
||||
|
||||
pNew->fp = fp;
|
||||
pNew->fetchFp = fp;
|
||||
pNew->param = param;
|
||||
pNew->maxRetry = TSDB_MAX_REPLICA_NUM;
|
||||
|
||||
|
@ -1803,6 +1804,8 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
|
|||
}
|
||||
|
||||
pNew->fp = fp;
|
||||
pNew->fetchFp = fp;
|
||||
|
||||
pNew->param = param;
|
||||
pNew->maxRetry = TSDB_MAX_REPLICA_NUM;
|
||||
|
||||
|
@ -2041,10 +2044,8 @@ void tscTryQueryNextVnode(SSqlObj* pSql, __async_cb_func_t fp) {
|
|||
|
||||
// set the callback function
|
||||
pSql->fp = fp;
|
||||
int32_t ret = tscProcessSql(pSql);
|
||||
if (ret == TSDB_CODE_SUCCESS) {
|
||||
return;
|
||||
} else {// todo check for failure
|
||||
if (tscProcessSql(pSql) != 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -125,7 +125,8 @@ typedef struct SArithmeticSupport {
|
|||
} SArithmeticSupport;
|
||||
|
||||
typedef struct SQLPreAggVal {
|
||||
bool isSet;
|
||||
bool isSet; // statistics info set or not
|
||||
bool dataBlockLoaded; // data block is loaded or not
|
||||
SDataStatis statis;
|
||||
} SQLPreAggVal;
|
||||
|
||||
|
|
|
@ -1358,6 +1358,8 @@ void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void* inputData, TSKEY
|
|||
pCtx->preAggVals.isSet = false;
|
||||
}
|
||||
|
||||
pCtx->preAggVals.dataBlockLoaded = (inputData != NULL);
|
||||
|
||||
// limit/offset query will affect this value
|
||||
pCtx->startOffset = QUERY_IS_ASC_QUERY(pQuery) ? pQuery->pos:0;
|
||||
pCtx->size = QUERY_IS_ASC_QUERY(pQuery) ? pBlockInfo->rows - pQuery->pos : pQuery->pos + 1;
|
||||
|
@ -1928,7 +1930,7 @@ char *getPosInResultPage(SQueryRuntimeEnv *pRuntimeEnv, int32_t columnIndex, SWi
|
|||
pQuery->pSelectExpr[columnIndex].bytes * realRowId;
|
||||
}
|
||||
|
||||
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_DOUBLE && (_t) != TSDB_DATA_TYPE_FLOAT)
|
||||
#define IS_PREFILTER_TYPE(_t) ((_t) != TSDB_DATA_TYPE_BINARY && (_t) != TSDB_DATA_TYPE_NCHAR)
|
||||
|
||||
static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, SQLFunctionCtx *pCtx,
|
||||
int32_t numOfRows) {
|
||||
|
@ -1948,13 +1950,14 @@ static bool needToLoadDataBlock(SQueryRuntimeEnv* pRuntimeEnv, SDataStatis *pDat
|
|||
}
|
||||
}
|
||||
|
||||
// no statistics data
|
||||
if (index == -1) {
|
||||
continue;
|
||||
return true;
|
||||
}
|
||||
|
||||
// not support pre-filter operation on binary/nchar data type
|
||||
if (!IS_PREFILTER_TYPE(pFilterInfo->info.type)) {
|
||||
continue;
|
||||
return true;
|
||||
}
|
||||
|
||||
// all points in current column are NULL, no need to check its boundary value
|
||||
|
@ -2203,7 +2206,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
summary->totalBlocks += 1;
|
||||
|
||||
if (IS_QUERY_KILLED(GET_QINFO_ADDR(pRuntimeEnv))) {
|
||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
|
@ -3357,12 +3359,10 @@ void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols)
|
|||
cleanupTimeWindowInfo(&pTableQueryInfo->windowResInfo, numOfCols);
|
||||
}
|
||||
|
||||
#define SET_CURRENT_QUERY_TABLE_INFO(_runtime, _tableInfo) \
|
||||
do { \
|
||||
SQuery *_query = (_runtime)->pQuery; \
|
||||
_query->current = _tableInfo; \
|
||||
assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_query)) || \
|
||||
(((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_query))); \
|
||||
#define CHECK_QUERY_TIME_RANGE(_q, _tableInfo) \
|
||||
do { \
|
||||
assert((((_tableInfo)->lastKey >= (_tableInfo)->win.skey) && QUERY_IS_ASC_QUERY(_q)) || \
|
||||
(((_tableInfo)->lastKey <= (_tableInfo)->win.skey) && !QUERY_IS_ASC_QUERY(_q))); \
|
||||
} while (0)
|
||||
|
||||
/**
|
||||
|
@ -4212,6 +4212,23 @@ static void enableExecutionForNextTable(SQueryRuntimeEnv *pRuntimeEnv) {
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void setEnvForEachBlock(SQInfo* pQInfo, STableQueryInfo* pTableQueryInfo, SDataBlockInfo* pBlockInfo) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery* pQuery = pQInfo->runtimeEnv.pQuery;
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
|
||||
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
setExecutionContext(pQInfo, pTableQueryInfo->groupIndex, pBlockInfo->window.ekey + step);
|
||||
} else { // interval query
|
||||
TSKEY nextKey = pBlockInfo->window.skey;
|
||||
setIntervalQueryRange(pQInfo, nextKey);
|
||||
|
||||
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
|
||||
setAdditionalInfo(pQInfo, pTableQueryInfo->pTable, pTableQueryInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||
|
@ -4226,6 +4243,7 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
|
||||
while (tsdbNextDataBlock(pQueryHandle)) {
|
||||
summary->totalBlocks += 1;
|
||||
|
||||
if (IS_QUERY_KILLED(pQInfo)) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
@ -4236,24 +4254,16 @@ static int64_t scanMultiTableDataBlocks(SQInfo *pQInfo) {
|
|||
break;
|
||||
}
|
||||
|
||||
assert(*pTableQueryInfo != NULL);
|
||||
SET_CURRENT_QUERY_TABLE_INFO(pRuntimeEnv, *pTableQueryInfo);
|
||||
pQuery->current = *pTableQueryInfo;
|
||||
CHECK_QUERY_TIME_RANGE(pQuery, *pTableQueryInfo);
|
||||
|
||||
if (!pRuntimeEnv->groupbyNormalCol) {
|
||||
if (!QUERY_IS_INTERVAL_QUERY(pQuery)) {
|
||||
setExecutionContext(pQInfo, (*pTableQueryInfo)->groupIndex, blockInfo.window.ekey + step);
|
||||
} else { // interval query
|
||||
TSKEY nextKey = blockInfo.window.skey;
|
||||
setIntervalQueryRange(pQInfo, nextKey);
|
||||
|
||||
if (pRuntimeEnv->hasTagResults || pRuntimeEnv->pTSBuf != NULL) {
|
||||
setAdditionalInfo(pQInfo, (*pTableQueryInfo)->pTable, *pTableQueryInfo);
|
||||
}
|
||||
}
|
||||
setEnvForEachBlock(pQInfo, *pTableQueryInfo, &blockInfo);
|
||||
}
|
||||
|
||||
SDataStatis *pStatis = NULL;
|
||||
SArray *pDataBlock = NULL;
|
||||
|
||||
if (loadDataBlockOnDemand(pRuntimeEnv, pQueryHandle, &blockInfo, &pStatis, &pDataBlock) == BLK_DATA_DISCARD) {
|
||||
pQuery->current->lastKey = QUERY_IS_ASC_QUERY(pQuery)? blockInfo.window.ekey + step:blockInfo.window.skey + step;
|
||||
continue;
|
||||
|
@ -4516,7 +4526,6 @@ static void sequentialTableProcess(SQInfo *pQInfo) {
|
|||
|
||||
while (pQInfo->tableIndex < pQInfo->tableqinfoGroupInfo.numOfTables) {
|
||||
if (IS_QUERY_KILLED(pQInfo)) {
|
||||
finalizeQueryResult(pRuntimeEnv); // clean up allocated resource during query
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
|
@ -5014,6 +5023,7 @@ static void stableQueryImpl(SQInfo *pQInfo) {
|
|||
isFirstLastRowQuery(pQuery) || pRuntimeEnv->groupbyNormalCol);
|
||||
|
||||
sequentialTableProcess(pQInfo);
|
||||
|
||||
}
|
||||
|
||||
// record the total elapsed time
|
||||
|
@ -6112,11 +6122,6 @@ _over:
|
|||
//pQInfo already freed in initQInfo, but *pQInfo may not pointer to null;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
*pQInfo = NULL;
|
||||
} else {
|
||||
// SQInfo* pq = (SQInfo*) (*pQInfo);
|
||||
|
||||
// T_REF_INC(pq);
|
||||
// T_REF_INC(pq);
|
||||
}
|
||||
|
||||
// if failed to add ref for all meters in this query, abort current query
|
||||
|
|
|
@ -1801,7 +1801,8 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
|
|||
}
|
||||
|
||||
tsdbLoadCompData(&pHandle->rhelper, pBlockInfo->compBlock, NULL);
|
||||
|
||||
|
||||
// todo opt perf
|
||||
size_t numOfCols = QH_GET_NUM_OF_COLS(pHandle);
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SDataStatis* st = &pHandle->statis[i];
|
||||
|
@ -1820,6 +1821,13 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
|
|||
if (pHandle->statis[i].numOfNull == -1) { // set the column data are all NULL
|
||||
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
||||
}
|
||||
|
||||
// todo opt perf
|
||||
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
||||
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
|
||||
pHandle->statis[i].max = pBlockInfo->compBlock->keyLast;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
Loading…
Reference in New Issue