[td-225] refactor codes and add some log.
This commit is contained in:
parent
3d16404e19
commit
7ccba2bdf2
|
@ -45,7 +45,6 @@ typedef void (*__do_filter_suppl_fn_t)(void *, void *);
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
typedef struct tQueryInfo {
|
typedef struct tQueryInfo {
|
||||||
int32_t colIndex; // index of column in schema
|
|
||||||
uint8_t optr; // expression operator
|
uint8_t optr; // expression operator
|
||||||
SSchema sch; // schema of tags
|
SSchema sch; // schema of tags
|
||||||
char* q;
|
char* q;
|
||||||
|
|
|
@ -50,11 +50,6 @@
|
||||||
|
|
||||||
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
#define SDATA_BLOCK_INITIALIZER (SDataBlockInfo) {{0}, 0}
|
||||||
|
|
||||||
/* get the qinfo struct address from the query struct address */
|
|
||||||
#define GET_COLUMN_BYTES(query, colidx) \
|
|
||||||
((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].bytes)
|
|
||||||
#define GET_COLUMN_TYPE(query, colidx) ((query)->colList[(query)->pSelectExpr[colidx].base.colInfo.colIndex].type)
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
// when query starts to execute, this status will set
|
// when query starts to execute, this status will set
|
||||||
QUERY_NOT_COMPLETED = 0x1u,
|
QUERY_NOT_COMPLETED = 0x1u,
|
||||||
|
@ -394,15 +389,15 @@ static SWindowResult *doSetTimeWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SWin
|
||||||
if (pWindowResInfo->size >= pWindowResInfo->capacity) {
|
if (pWindowResInfo->size >= pWindowResInfo->capacity) {
|
||||||
int64_t newCap = pWindowResInfo->capacity * 1.5;
|
int64_t newCap = pWindowResInfo->capacity * 1.5;
|
||||||
char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult));
|
char *t = realloc(pWindowResInfo->pResult, newCap * sizeof(SWindowResult));
|
||||||
if (t != NULL) {
|
if (t == NULL) {
|
||||||
pWindowResInfo->pResult = (SWindowResult *)t;
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
|
||||||
int32_t inc = newCap - pWindowResInfo->capacity;
|
|
||||||
memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc);
|
|
||||||
} else {
|
|
||||||
// todo
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pWindowResInfo->pResult = (SWindowResult *)t;
|
||||||
|
|
||||||
|
int32_t inc = newCap - pWindowResInfo->capacity;
|
||||||
|
memset(&pWindowResInfo->pResult[pWindowResInfo->capacity], 0, sizeof(SWindowResult) * inc);
|
||||||
|
|
||||||
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
|
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
|
||||||
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
|
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, pRuntimeEnv->interBufSize);
|
||||||
}
|
}
|
||||||
|
@ -1053,9 +1048,9 @@ static char *getGroupbyColumnData(SQuery *pQuery, int16_t *type, int16_t *bytes,
|
||||||
*type = pQuery->colList[colIndex].type;
|
*type = pQuery->colList[colIndex].type;
|
||||||
*bytes = pQuery->colList[colIndex].bytes;
|
*bytes = pQuery->colList[colIndex].bytes;
|
||||||
/*
|
/*
|
||||||
* the colIndex is acquired from the first meter of all qualified meters in this vnode during query prepare
|
* the colIndex is acquired from the first tables of all qualified tables in this vnode during query prepare
|
||||||
* stage, the remain meter may not have the required column in cache actually. So, the validation of required
|
* stage, the remain tables may not have the required column in cache actually. So, the validation of required
|
||||||
* column in cache with the corresponding meter schema is reinforced.
|
* column in cache with the corresponding schema is reinforced.
|
||||||
*/
|
*/
|
||||||
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
int32_t numOfCols = taosArrayGetSize(pDataBlock);
|
||||||
|
|
||||||
|
@ -2364,6 +2359,18 @@ static void doSetTagValueInParam(void *tsdb, void* pTable, int32_t tagColId, tVa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t numOfTags, int16_t colId) {
|
||||||
|
assert(pTagColList != NULL && numOfTags > 0);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < numOfTags; ++i) {
|
||||||
|
if (pTagColList[i].colId == colId) {
|
||||||
|
return &pTagColList[i];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
|
void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
SQInfo* pQInfo = GET_QINFO_ADDR(pRuntimeEnv);
|
||||||
|
@ -2372,16 +2379,10 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
|
||||||
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
|
if (pQuery->numOfOutput == 1 && pExprInfo->base.functionId == TSDB_FUNC_TS_COMP) {
|
||||||
assert(pExprInfo->base.numOfParams == 1);
|
assert(pExprInfo->base.numOfParams == 1);
|
||||||
|
|
||||||
// todo refactor extract function.
|
int16_t tagColId = pExprInfo->base.arg->argValue.i64;
|
||||||
int16_t type = -1, bytes = -1;
|
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
|
||||||
for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
|
|
||||||
if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) {
|
|
||||||
type = pQuery->tagColList[i].type;
|
|
||||||
bytes = pQuery->tagColList[i].bytes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes);
|
doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes);
|
||||||
} else {
|
} else {
|
||||||
// set tag value, by which the results are aggregated.
|
// set tag value, by which the results are aggregated.
|
||||||
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
|
for (int32_t idx = 0; idx < pQuery->numOfOutput; ++idx) {
|
||||||
|
@ -2399,20 +2400,14 @@ void setTagVal(SQueryRuntimeEnv *pRuntimeEnv, void *pTable, void *tsdb) {
|
||||||
|
|
||||||
// set the join tag for first column
|
// set the join tag for first column
|
||||||
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
|
SSqlFuncMsg *pFuncMsg = &pExprInfo->base;
|
||||||
if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX &&
|
if ((pFuncMsg->functionId == TSDB_FUNC_TS || pFuncMsg->functionId == TSDB_FUNC_PRJ) && pRuntimeEnv->pTSBuf != NULL &&
|
||||||
pRuntimeEnv->pTSBuf != NULL) {
|
pFuncMsg->colInfo.colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX) {
|
||||||
assert(pFuncMsg->numOfParams == 1);
|
assert(pFuncMsg->numOfParams == 1);
|
||||||
|
|
||||||
// todo refactor
|
int16_t tagColId = pExprInfo->base.arg->argValue.i64;
|
||||||
int16_t type = -1, bytes = -1;
|
SColumnInfo* pColInfo = doGetTagColumnInfoById(pQuery->tagColList, pQuery->numOfTags, tagColId);
|
||||||
for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
|
|
||||||
if (pQuery->tagColList[i].colId == pExprInfo->base.arg->argValue.i64) {
|
|
||||||
type = pQuery->tagColList[i].type;
|
|
||||||
bytes = pQuery->tagColList[i].bytes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
doSetTagValueInParam(tsdb, pTable, pExprInfo->base.arg->argValue.i64, &pRuntimeEnv->pCtx[0].tag, type, bytes);
|
doSetTagValueInParam(tsdb, pTable, tagColId, &pRuntimeEnv->pCtx[0].tag, pColInfo->type, pColInfo->bytes);
|
||||||
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64,
|
qDebug("QInfo:%p set tag value for join comparison, colId:%" PRId64 ", val:%"PRId64, pQInfo, pExprInfo->base.arg->argValue.i64,
|
||||||
pRuntimeEnv->pCtx[0].tag.i64Key)
|
pRuntimeEnv->pCtx[0].tag.i64Key)
|
||||||
}
|
}
|
||||||
|
@ -4149,6 +4144,7 @@ static int32_t setupQueryHandle(void* tsdb, SQInfo* pQInfo, bool isSTableQuery)
|
||||||
} else {
|
} else {
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->tableGroupInfo, pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4174,10 +4170,10 @@ static SFillColInfo* taosCreateFillColInfo(SQuery* pQuery) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
|
int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bool isSTableQuery) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
qDebug("QInfo:%p start to init qhandle", pQInfo);
|
||||||
|
|
||||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
|
|
||||||
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
pQuery->precision = tsdbGetCfg(tsdb)->precision;
|
||||||
|
@ -4186,6 +4182,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, void *tsdb, int32_t vgId, bo
|
||||||
|
|
||||||
setScanLimitationByResultBuffer(pQuery);
|
setScanLimitationByResultBuffer(pQuery);
|
||||||
changeExecuteScanOrder(pQInfo, false);
|
changeExecuteScanOrder(pQInfo, false);
|
||||||
|
|
||||||
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
code = setupQueryHandle(tsdb, pQInfo, isSTableQuery);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -5693,7 +5690,6 @@ static void doUpdateExprColumnIndex(SQuery *pQuery) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int compareTableIdInfo(const void* a, const void* b) {
|
static int compareTableIdInfo(const void* a, const void* b) {
|
||||||
const STableIdInfo* x = (const STableIdInfo*)a;
|
const STableIdInfo* x = (const STableIdInfo*)a;
|
||||||
const STableIdInfo* y = (const STableIdInfo*)b;
|
const STableIdInfo* y = (const STableIdInfo*)b;
|
||||||
|
@ -5926,6 +5922,7 @@ static int32_t initQInfo(SQueryTableMsg *pQueryMsg, void *tsdb, int32_t vgId, SQ
|
||||||
pQuery->window.ekey, pQuery->order.order);
|
pQuery->window.ekey, pQuery->order.order);
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
pQInfo->tableqinfoGroupInfo.numOfTables = 0;
|
pQInfo->tableqinfoGroupInfo.numOfTables = 0;
|
||||||
|
|
||||||
sem_post(&pQInfo->dataReady);
|
sem_post(&pQInfo->dataReady);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -6136,16 +6133,17 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
char * tagCond = NULL, *tbnameCond = NULL;
|
char *tagCond = NULL;
|
||||||
SArray * pTableIdList = NULL;
|
char *tbnameCond = NULL;
|
||||||
SSqlFuncMsg **pExprMsg = NULL;
|
SArray *pTableIdList = NULL;
|
||||||
SColIndex * pGroupColIndex = NULL;
|
SSqlFuncMsg **pExprMsg = NULL;
|
||||||
SColumnInfo* pTagColumnInfo = NULL;
|
SExprInfo *pExprs = NULL;
|
||||||
SExprInfo *pExprs = NULL;
|
SColIndex *pGroupColIndex = NULL;
|
||||||
SSqlGroupbyExpr *pGroupbyExpr = NULL;
|
SColumnInfo *pTagColumnInfo = NULL;
|
||||||
|
SSqlGroupbyExpr *pGroupbyExpr = NULL;
|
||||||
|
|
||||||
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo)) !=
|
code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &tbnameCond, &pGroupColIndex, &pTagColumnInfo);
|
||||||
TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6172,6 +6170,7 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
|
||||||
|
|
||||||
bool isSTableQuery = false;
|
bool isSTableQuery = false;
|
||||||
STableGroupInfo tableGroupInfo = {0};
|
STableGroupInfo tableGroupInfo = {0};
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
|
if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_TABLE_QUERY)) {
|
||||||
STableIdInfo *id = taosArrayGet(pTableIdList, 0);
|
STableIdInfo *id = taosArrayGet(pTableIdList, 0);
|
||||||
|
@ -6182,7 +6181,6 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
|
||||||
}
|
}
|
||||||
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
|
} else if (TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY|TSDB_QUERY_TYPE_STABLE_QUERY)) {
|
||||||
isSTableQuery = true;
|
isSTableQuery = true;
|
||||||
// TODO: need a macro from TSDB to check if table is super table
|
|
||||||
|
|
||||||
// also note there's possibility that only one table in the super table
|
// also note there's possibility that only one table in the super table
|
||||||
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
|
if (!TSDB_QUERY_HAS_TYPE(pQueryMsg->queryType, TSDB_QUERY_TYPE_MULTITABLE_QUERY)) {
|
||||||
|
@ -6194,10 +6192,11 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
|
||||||
numOfGroupByCols = 0;
|
numOfGroupByCols = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("qmsg:%p query stable, uid:%"PRId64", tid:%d", pQueryMsg, id->uid, id->tid);
|
||||||
code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &tableGroupInfo, pGroupColIndex,
|
code = tsdbQuerySTableByTagCond(tsdb, id->uid, tagCond, pQueryMsg->tagCondLen, pQueryMsg->tagNameRelType, tbnameCond, &tableGroupInfo, pGroupColIndex,
|
||||||
numOfGroupByCols);
|
numOfGroupByCols);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("qmsg:%p failed to QueryStable, reason: %s", pQueryMsg, tstrerror(code));
|
qError("qmsg:%p failed to query stable, reason: %s", pQueryMsg, tstrerror(code));
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -6208,6 +6207,9 @@ int32_t qCreateQueryInfo(void* tsdb, int32_t vgId, SQueryTableMsg* pQueryMsg, vo
|
||||||
|
|
||||||
qDebug("qmsg:%p query on %zu tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
|
qDebug("qmsg:%p query on %zu tables in one group from client", pQueryMsg, tableGroupInfo.numOfTables);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int64_t el = taosGetTimestampUs() - st;
|
||||||
|
qDebug("qmsg:%p tag filter completed, elapsed time:%"PRId64"us", pQueryMsg, el);
|
||||||
} else {
|
} else {
|
||||||
assert(0);
|
assert(0);
|
||||||
}
|
}
|
||||||
|
@ -6247,7 +6249,7 @@ _over:
|
||||||
*pQInfo = NULL;
|
*pQInfo = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
// if failed to add ref for all meters in this query, abort current query
|
// if failed to add ref for all tables in this query, abort current query
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -678,7 +678,7 @@ static void tQueryIndexlessColumn(SSkipList* pSkipList, tQueryInfo* pQueryInfo,
|
||||||
|
|
||||||
tstr *name = (tstr*) tsdbGetTableName(*(void**) pData);
|
tstr *name = (tstr*) tsdbGetTableName(*(void**) pData);
|
||||||
// todo speed up by using hash
|
// todo speed up by using hash
|
||||||
if (pQueryInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
if (pQueryInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
if (pQueryInfo->optr == TSDB_RELATION_IN) {
|
if (pQueryInfo->optr == TSDB_RELATION_IN) {
|
||||||
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
|
addToResult = pQueryInfo->compare(name, pQueryInfo->q);
|
||||||
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
|
} else if (pQueryInfo->optr == TSDB_RELATION_LIKE) {
|
||||||
|
@ -716,7 +716,7 @@ void tExprTreeTraverse(tExprNode *pExpr, SSkipList *pSkipList, SArray *result, S
|
||||||
}
|
}
|
||||||
|
|
||||||
tQueryInfo *pQueryInfo = pExpr->_node.info;
|
tQueryInfo *pQueryInfo = pExpr->_node.info;
|
||||||
if (pQueryInfo->colIndex == 0 && pQueryInfo->optr != TSDB_RELATION_LIKE) {
|
if (pQueryInfo->sch.colId == PRIMARYKEY_TIMESTAMP_COL_INDEX && pQueryInfo->optr != TSDB_RELATION_LIKE) {
|
||||||
tQueryIndexColumn(pSkipList, pQueryInfo, result);
|
tQueryIndexColumn(pSkipList, pQueryInfo, result);
|
||||||
} else {
|
} else {
|
||||||
tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
|
tQueryIndexlessColumn(pSkipList, pQueryInfo, result, param->nodeFilterFn);
|
||||||
|
|
|
@ -240,7 +240,7 @@ TsdbQueryHandleT* tsdbQueryTables(TSDB_REPO_T* tsdb, STsdbQueryCond* pCond, STab
|
||||||
|
|
||||||
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
|
pQueryHandle->defaultLoadColumn = getDefaultLoadColumns(pQueryHandle, true);
|
||||||
|
|
||||||
tsdbDebug("%p total numOfTable:%zu in query", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo));
|
tsdbDebug("%p total numOfTable:%zu in query, %p", pQueryHandle, taosArrayGetSize(pQueryHandle->pTableCheckInfo), pQueryHandle->qinfo);
|
||||||
|
|
||||||
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
tsdbInitDataBlockLoadInfo(&pQueryHandle->dataBlockLoadInfo);
|
||||||
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
|
tsdbInitCompBlockLoadInfo(&pQueryHandle->compBlockLoadInfo);
|
||||||
|
@ -1869,7 +1869,6 @@ int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT* pQueryHandle, SDataSta
|
||||||
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
pHandle->statis[i].numOfNull = pBlockInfo->compBlock->numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo opt perf
|
|
||||||
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
SColumnInfo* pColInfo = taosArrayGet(pHandle->pColumns, i);
|
||||||
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pColInfo->type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
|
pHandle->statis[i].min = pBlockInfo->compBlock->keyFirst;
|
||||||
|
@ -1961,43 +1960,20 @@ static void destroyHelper(void* param) {
|
||||||
free(param);
|
free(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define TAG_INVALID_COLUMN_INDEX -2
|
|
||||||
static int32_t getTagColumnIndex(STSchema* pTSchema, SSchema* pSchema) {
|
|
||||||
// filter on table name(TBNAME)
|
|
||||||
if (strcasecmp(pSchema->name, TSQL_TBNAME_L) == 0) {
|
|
||||||
return TSDB_TBNAME_COLUMN_INDEX;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(int32_t i = 0; i < schemaNCols(pTSchema); ++i) {
|
|
||||||
STColumn* pColumn = &pTSchema->columns[i];
|
|
||||||
if (pColumn->bytes == pSchema->bytes && pColumn->type == pSchema->type && pColumn->colId == pSchema->colId) {
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return -2;
|
|
||||||
}
|
|
||||||
|
|
||||||
void filterPrepare(void* expr, void* param) {
|
void filterPrepare(void* expr, void* param) {
|
||||||
tExprNode* pExpr = (tExprNode*)expr;
|
tExprNode* pExpr = (tExprNode*)expr;
|
||||||
if (pExpr->_node.info != NULL) {
|
if (pExpr->_node.info != NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t i = 0;
|
|
||||||
pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
|
pExpr->_node.info = calloc(1, sizeof(tQueryInfo));
|
||||||
|
|
||||||
STSchema* pTSSchema = (STSchema*) param;
|
STSchema* pTSSchema = (STSchema*) param;
|
||||||
|
|
||||||
tQueryInfo* pInfo = pExpr->_node.info;
|
tQueryInfo* pInfo = pExpr->_node.info;
|
||||||
tVariant* pCond = pExpr->_node.pRight->pVal;
|
tVariant* pCond = pExpr->_node.pRight->pVal;
|
||||||
SSchema* pSchema = pExpr->_node.pLeft->pSchema;
|
SSchema* pSchema = pExpr->_node.pLeft->pSchema;
|
||||||
|
|
||||||
int32_t index = getTagColumnIndex(pTSSchema, pSchema);
|
|
||||||
assert((index >= 0 && i < TSDB_MAX_TAGS) || (index == TSDB_TBNAME_COLUMN_INDEX) || index == TAG_INVALID_COLUMN_INDEX);
|
|
||||||
|
|
||||||
pInfo->sch = *pSchema;
|
pInfo->sch = *pSchema;
|
||||||
pInfo->colIndex = index;
|
|
||||||
pInfo->optr = pExpr->_node.optr;
|
pInfo->optr = pExpr->_node.optr;
|
||||||
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
|
pInfo->compare = getComparFunc(pSchema->type, pInfo->optr);
|
||||||
pInfo->param = pTSSchema;
|
pInfo->param = pTSSchema;
|
||||||
|
@ -2143,7 +2119,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) {
|
||||||
|
|
||||||
char* val = NULL;
|
char* val = NULL;
|
||||||
|
|
||||||
if (pInfo->colIndex == TSDB_TBNAME_COLUMN_INDEX) {
|
if (pInfo->sch.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||||
val = (char*) TABLE_NAME(pTable);
|
val = (char*) TABLE_NAME(pTable);
|
||||||
} else {
|
} else {
|
||||||
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
|
val = tdGetKVRowValOfCol(pTable->tagVal, pInfo->sch.colId);
|
||||||
|
|
Loading…
Reference in New Issue