[td-2895] refactor.
This commit is contained in:
parent
4f527aef73
commit
405e201cc6
|
@ -445,9 +445,6 @@ bool doBuildResCheck(SQInfo* pQInfo);
|
|||
void setQueryStatus(SQueryRuntimeEnv *pRuntimeEnv, int8_t status);
|
||||
|
||||
bool onlyQueryTags(SQuery* pQuery);
|
||||
void buildTagQueryResult(SQInfo *pQInfo);
|
||||
void stableQueryImpl(SQInfo *pQInfo);
|
||||
void buildTableBlockDistResult(SQInfo *pQInfo);
|
||||
void tableQueryImpl(SQInfo *pQInfo);
|
||||
bool isValidQInfo(void *param);
|
||||
|
||||
|
|
|
@ -3610,35 +3610,6 @@ void queryCostStatis(SQInfo *pQInfo) {
|
|||
// pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, numOfRes, pQuery->current->lastKey);
|
||||
//}
|
||||
|
||||
static void freeTableBlockDist(STableBlockDist *pTableBlockDist) {
|
||||
if (pTableBlockDist != NULL) {
|
||||
taosArrayDestroy(pTableBlockDist->dataBlockInfos);
|
||||
free(pTableBlockDist);
|
||||
}
|
||||
}
|
||||
|
||||
#if 0
|
||||
static int32_t getPercentileFromSortedArray(const SArray* pArray, double rate) {
|
||||
int32_t len = (int32_t)taosArrayGetSize(pArray);
|
||||
if (len <= 0) {
|
||||
return 0;
|
||||
}
|
||||
assert(rate >= 0 && rate <= 1.0);
|
||||
int idx = (int32_t)((len - 1) * rate);
|
||||
return ((SDataBlockInfo *)(taosArrayGet(pArray, idx)))->rows;
|
||||
}
|
||||
|
||||
|
||||
static int compareBlockInfo(const void *pLeft, const void *pRight) {
|
||||
int32_t left = ((SDataBlockInfo *)pLeft)->rows;
|
||||
int32_t right = ((SDataBlockInfo *)pRight)->rows;
|
||||
if (left > right) return 1;
|
||||
if (left < right) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
//void skipBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||
// SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
//
|
||||
|
@ -5273,15 +5244,15 @@ SOperatorInfo* createTagScanOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SExprInf
|
|||
|
||||
void tableQueryImpl(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
// SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
// number of points returned during this query
|
||||
int64_t st = taosGetTimestampUs();
|
||||
|
||||
assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
|
||||
// assert(pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 1);
|
||||
|
||||
SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||
pQuery->current = taosArrayGetP(g, 0);
|
||||
// SArray* g = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||
// pQuery->current = taosArrayGetP(g, 0);
|
||||
|
||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
|
||||
|
||||
|
@ -5289,52 +5260,6 @@ void tableQueryImpl(SQInfo *pQInfo) {
|
|||
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
|
||||
}
|
||||
|
||||
|
||||
|
||||
void buildTableBlockDistResult(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||
|
||||
STableBlockDist *pTableBlockDist = calloc(1, sizeof(STableBlockDist));
|
||||
pTableBlockDist->dataBlockInfos = taosArrayInit(512, sizeof(SFileBlockInfo));
|
||||
|
||||
TsdbQueryHandleT pQueryHandle = pRuntimeEnv->pQueryHandle;
|
||||
SSchema blockDistSchema = tGetBlockDistColumnSchema();
|
||||
|
||||
// int64_t startTime = taosGetTimestampUs();
|
||||
// tsdbGetFileBlocksDistInfo(pQueryHandle, pTableBlockDist->dataBlockInfos, );
|
||||
pTableBlockDist->numOfRowsInMemTable = tsdbGetNumOfRowsInMemTable(pQueryHandle);
|
||||
|
||||
// generateBlockDistResult(pTableBlockDist);
|
||||
|
||||
int type = -1;
|
||||
assert(pQuery->numOfOutput == 1);
|
||||
SExprInfo* pExprInfo = pQuery->pExpr1;
|
||||
for (int32_t j = 0; j < pQuery->numOfOutput; j++) {
|
||||
if (pExprInfo[j].base.colInfo.colId == TSDB_BLOCK_DIST_COLUMN_INDEX) {
|
||||
type = blockDistSchema.type;
|
||||
}
|
||||
|
||||
assert(type == TSDB_DATA_TYPE_BINARY);
|
||||
// STR_WITH_SIZE_TO_VARSTR(pQuery->sdata[j]->data, pTableBlockDist->result, (VarDataLenT)strlen(pTableBlockDist->result));
|
||||
}
|
||||
|
||||
freeTableBlockDist(pTableBlockDist);
|
||||
|
||||
// pRuntimeEnv->resultInfo.rows = 1;
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
}
|
||||
|
||||
void stableQueryImpl(SQInfo *pQInfo) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
||||
int64_t st = taosGetTimestampUs();
|
||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
|
||||
|
||||
// record the total elapsed time
|
||||
pQInfo->summary.elapsedTime += (taosGetTimestampUs() - st);
|
||||
}
|
||||
|
||||
static int32_t getColumnIndexInSource(SQueryTableMsg *pQueryMsg, SSqlFuncMsg *pExprMsg, SColumnInfo* pTagCols) {
|
||||
int32_t j = 0;
|
||||
|
||||
|
@ -6541,142 +6466,6 @@ static void doSetTagValueToResultBuf(char* output, const char* val, int16_t type
|
|||
}
|
||||
}
|
||||
|
||||
void buildTagQueryResult(SQInfo* pQInfo) {
|
||||
SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
|
||||
size_t numOfGroup = GET_NUM_OF_TABLEGROUP(pRuntimeEnv);
|
||||
assert(numOfGroup == 0 || numOfGroup == 1);
|
||||
|
||||
if (numOfGroup == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
|
||||
|
||||
#if 0
|
||||
SArray* pa = GET_TABLEGROUP(pRuntimeEnv, 0);
|
||||
|
||||
size_t num = taosArrayGetSize(pa);
|
||||
assert(num == pRuntimeEnv->tableqinfoGroupInfo.numOfTables);
|
||||
|
||||
int32_t count = 0;
|
||||
int32_t functionId = pQuery->pExpr1[0].base.functionId;
|
||||
if (functionId == TSDB_FUNC_TID_TAG) { // return the tags & table Id
|
||||
assert(pQuery->numOfOutput == 1);
|
||||
|
||||
SExprInfo* pExprInfo = &pQuery->pExpr1[0];
|
||||
int32_t rsize = pExprInfo->bytes;
|
||||
count = 0;
|
||||
|
||||
int16_t bytes = pExprInfo->bytes;
|
||||
int16_t type = pExprInfo->type;
|
||||
|
||||
for(int32_t i = 0; i < pQuery->numOfTags; ++i) {
|
||||
if (pQuery->tagColList[i].colId == pExprInfo->base.colInfo.colId) {
|
||||
bytes = pQuery->tagColList[i].bytes;
|
||||
type = pQuery->tagColList[i].type;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
while(pRuntimeEnv->tableIndex < num && count < pRuntimeEnv->resultInfo.capacity) {
|
||||
int32_t i = pRuntimeEnv->tableIndex++;
|
||||
STableQueryInfo *item = taosArrayGetP(pa, i);
|
||||
|
||||
char *output = NULL;//pQuery->sdata[0]->data + count * rsize;
|
||||
varDataSetLen(output, rsize - VARSTR_HEADER_SIZE);
|
||||
|
||||
output = varDataVal(output);
|
||||
STableId* id = TSDB_TABLEID(item->pTable);
|
||||
|
||||
*(int16_t *)output = 0;
|
||||
output += sizeof(int16_t);
|
||||
|
||||
*(int64_t *)output = id->uid; // memory align problem, todo serialize
|
||||
output += sizeof(id->uid);
|
||||
|
||||
*(int32_t *)output = id->tid;
|
||||
output += sizeof(id->tid);
|
||||
|
||||
*(int32_t *)output = pQuery->vgId;
|
||||
output += sizeof(pQuery->vgId);
|
||||
|
||||
if (pExprInfo->base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
char* data = tsdbGetTableName(item->pTable);
|
||||
memcpy(output, data, varDataTLen(data));
|
||||
} else {
|
||||
char* data = tsdbGetTableTagVal(item->pTable, pExprInfo->base.colInfo.colId, type, bytes);
|
||||
doSetTagValueToResultBuf(output, data, type, bytes);
|
||||
}
|
||||
|
||||
count += 1;
|
||||
}
|
||||
|
||||
qDebug("QInfo:%p create (tableId, tag) info completed, rows:%d", pQInfo, count);
|
||||
|
||||
} else if (functionId == TSDB_FUNC_COUNT) {// handle the "count(tbname)" query
|
||||
// *(int64_t*) pQuery->sdata[0]->data = num;
|
||||
|
||||
count = 1;
|
||||
SET_STABLE_QUERY_OVER(pRuntimeEnv);
|
||||
qDebug("QInfo:%p create count(tbname) query, res:%d rows:1", pQInfo, count);
|
||||
} else { // return only the tags|table name etc.
|
||||
count = 0;
|
||||
SSchema* tbnameSchema = tGetTbnameColumnSchema();
|
||||
|
||||
int32_t maxNumOfTables = (int32_t)pRuntimeEnv->resultInfo.capacity;
|
||||
if (pQuery->limit.limit >= 0 && pQuery->limit.limit < pRuntimeEnv->resultInfo.capacity) {
|
||||
maxNumOfTables = (int32_t)pQuery->limit.limit;
|
||||
}
|
||||
|
||||
while(pRuntimeEnv->tableIndex < num && count < maxNumOfTables) {
|
||||
int32_t i = pRuntimeEnv->tableIndex++;
|
||||
|
||||
// discard current result due to offset
|
||||
if (pQuery->limit.offset > 0) {
|
||||
pQuery->limit.offset -= 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
SExprInfo* pExprInfo = pQuery->pExpr1;
|
||||
STableQueryInfo* item = taosArrayGetP(pa, i);
|
||||
|
||||
char *data = NULL, *dst = NULL;
|
||||
int16_t type = 0, bytes = 0;
|
||||
for(int32_t j = 0; j < pQuery->numOfOutput; ++j) {
|
||||
// not assign value in case of user defined constant output column
|
||||
if (TSDB_COL_IS_UD_COL(pExprInfo[j].base.colInfo.flag)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pExprInfo[j].base.colInfo.colId == TSDB_TBNAME_COLUMN_INDEX) {
|
||||
bytes = tbnameSchema->bytes;
|
||||
type = tbnameSchema->type;
|
||||
|
||||
data = tsdbGetTableName(item->pTable);
|
||||
// dst = pQuery->sdata[j]->data + count * tbnameSchema->bytes;
|
||||
} else {
|
||||
type = pExprInfo[j].type;
|
||||
bytes = pExprInfo[j].bytes;
|
||||
|
||||
data = tsdbGetTableTagVal(item->pTable, pExprInfo[j].base.colInfo.colId, type, bytes);
|
||||
// dst = pQuery->sdata[j]->data + count * pExprInfo[j].bytes;
|
||||
|
||||
}
|
||||
|
||||
doSetTagValueToResultBuf(dst, data, type, bytes);
|
||||
}
|
||||
count += 1;
|
||||
}
|
||||
|
||||
qDebug("QInfo:%p create tag values results completed, rows:%d", pQInfo, count);
|
||||
}
|
||||
|
||||
pRuntimeEnv->resultInfo.rows = count;
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
#endif
|
||||
}
|
||||
|
||||
static int64_t getQuerySupportBufSize(size_t numOfTables) {
|
||||
size_t s1 = sizeof(STableQueryInfo);
|
||||
size_t s2 = sizeof(SHashNode);
|
||||
|
|
|
@ -209,6 +209,7 @@ bool qTableQuery(qinfo_t qinfo) {
|
|||
return false;
|
||||
}
|
||||
|
||||
|
||||
pQInfo->startExecTs = taosGetTimestampSec();
|
||||
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
|
@ -216,9 +217,10 @@ bool qTableQuery(qinfo_t qinfo) {
|
|||
return doBuildResCheck(pQInfo);
|
||||
}
|
||||
|
||||
if (pQInfo->runtimeEnv.tableqinfoGroupInfo.numOfTables == 0) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
if (pRuntimeEnv->tableqinfoGroupInfo.numOfTables == 0) {
|
||||
qDebug("QInfo:%p no table exists for query, abort", pQInfo);
|
||||
setQueryStatus(&pQInfo->runtimeEnv, QUERY_COMPLETED);
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
return doBuildResCheck(pQInfo);
|
||||
}
|
||||
|
||||
|
@ -232,17 +234,7 @@ bool qTableQuery(qinfo_t qinfo) {
|
|||
|
||||
qDebug("QInfo:%p query task is launched", pQInfo);
|
||||
|
||||
SQueryRuntimeEnv* pRuntimeEnv = &pQInfo->runtimeEnv;
|
||||
if (onlyQueryTags(pQInfo->runtimeEnv.pQuery)) {
|
||||
assert(pQInfo->runtimeEnv.pQueryHandle == NULL);
|
||||
buildTagQueryResult(pQInfo);
|
||||
} else if (pQInfo->query.stableQuery) {
|
||||
stableQueryImpl(pQInfo);
|
||||
// } else if (pQInfo->query.queryBlockDist){
|
||||
// buildTableBlockDistResult(pQInfo);
|
||||
} else {
|
||||
tableQueryImpl(pQInfo);
|
||||
}
|
||||
pRuntimeEnv->outputBuf = pRuntimeEnv->proot->exec(pRuntimeEnv->proot);
|
||||
|
||||
if (isQueryKilled(pQInfo)) {
|
||||
qDebug("QInfo:%p query is killed", pQInfo);
|
||||
|
|
|
@ -784,7 +784,15 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select count(id) from st1
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
|
Loading…
Reference in New Issue