enh(query): opt group by tag perf.
This commit is contained in:
parent
d82bad5c64
commit
aa04aee7fd
|
@ -284,6 +284,18 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
|
||||||
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
uint32_t* status) {
|
uint32_t* status) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -313,6 +325,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
pCost->skipBlocks += 1;
|
pCost->skipBlocks += 1;
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
|
@ -320,6 +333,8 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
loadSMA = true; // mark the operation of load sma;
|
loadSMA = true; // mark the operation of load sma;
|
||||||
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
bool success = doLoadBlockSMA(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
|
|
||||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -373,15 +388,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
|
||||||
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
|
||||||
GET_TASKID(pTaskInfo));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTableScanInfo->pFilterNode != NULL) {
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -1311,15 +1318,10 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
|
||||||
|
|
||||||
blockDataEnsureCapacity(pResBlock, 1);
|
blockDataEnsureCapacity(pResBlock, 1);
|
||||||
|
|
||||||
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, pTagCalSup->numOfExprs, NULL);
|
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
|
||||||
ASSERT(pResBlock->info.rows == 1);
|
ASSERT(pResBlock->info.rows == 1);
|
||||||
|
|
||||||
// build tagArray
|
// build tagArray
|
||||||
/*SArray* tagArray = taosArrayInit(0, sizeof(void*));*/
|
|
||||||
/*STagVal tagVal = {*/
|
|
||||||
/*.cid = 0,*/
|
|
||||||
/*.type = 0,*/
|
|
||||||
/*};*/
|
|
||||||
// build STag
|
// build STag
|
||||||
// set STag
|
// set STag
|
||||||
|
|
||||||
|
@ -2115,9 +2117,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
||||||
taosMemoryFree(pStreamScan->pPseudoExpr);
|
taosMemoryFree(pStreamScan->pPseudoExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupExprSupp(&pStreamScan->tbnameCalSup);
|
|
||||||
cleanupExprSupp(&pStreamScan->tagCalSup);
|
|
||||||
|
|
||||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||||
blockDataDestroy(pStreamScan->pRes);
|
blockDataDestroy(pStreamScan->pRes);
|
||||||
blockDataDestroy(pStreamScan->pUpdateRes);
|
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||||
|
@ -2172,19 +2171,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTableScanNode->pTags != NULL) {
|
|
||||||
int32_t numOfTags;
|
|
||||||
SExprInfo* pTagExpr = createExprInfo(pTableScanNode->pTags, NULL, &numOfTags);
|
|
||||||
if (pTagExpr == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
if (initExprSupp(&pInfo->tagCalSup, pTagExpr, numOfTags) != 0) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
|
pInfo->pBlockLists = taosArrayInit(4, POINTER_BYTES);
|
||||||
if (pInfo->pBlockLists == NULL) {
|
if (pInfo->pBlockLists == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -6169,99 +6169,6 @@ int32_t groupKeyFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return pResInfo->numOfRes;
|
return pResInfo->numOfRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t interpFunction(SqlFunctionCtx* pCtx) {
|
|
||||||
#if 0
|
|
||||||
int32_t fillType = (int32_t) pCtx->param[2].i64;
|
|
||||||
//bool ascQuery = (pCtx->order == TSDB_ORDER_ASC);
|
|
||||||
|
|
||||||
if (pCtx->start.key == pCtx->startTs) {
|
|
||||||
assert(pCtx->start.key != INT64_MIN);
|
|
||||||
|
|
||||||
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
|
|
||||||
|
|
||||||
goto interp_success_exit;
|
|
||||||
} else if (pCtx->end.key == pCtx->startTs && pCtx->end.key != INT64_MIN && fillType == TSDB_FILL_NEXT) {
|
|
||||||
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
|
|
||||||
|
|
||||||
goto interp_success_exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (fillType) {
|
|
||||||
case TSDB_FILL_NULL:
|
|
||||||
setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_FILL_SET_VALUE:
|
|
||||||
tVariantDump(&pCtx->param[1], pCtx->pOutput, pCtx->inputType, true);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_FILL_LINEAR:
|
|
||||||
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs
|
|
||||||
|| pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
|
|
||||||
goto interp_exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
double v1 = -1, v2 = -1;
|
|
||||||
GET_TYPED_DATA(v1, double, pCtx->inputType, &pCtx->start.val);
|
|
||||||
GET_TYPED_DATA(v2, double, pCtx->inputType, &pCtx->end.val);
|
|
||||||
|
|
||||||
SPoint point1 = {.key = pCtx->start.key, .val = &v1};
|
|
||||||
SPoint point2 = {.key = pCtx->end.key, .val = &v2};
|
|
||||||
SPoint point = {.key = pCtx->startTs, .val = pCtx->pOutput};
|
|
||||||
|
|
||||||
int32_t srcType = pCtx->inputType;
|
|
||||||
if (isNull((char *)&pCtx->start.val, srcType) || isNull((char *)&pCtx->end.val, srcType)) {
|
|
||||||
setNull(pCtx->pOutput, srcType, pCtx->inputBytes);
|
|
||||||
} else {
|
|
||||||
bool exceedMax = false, exceedMin = false;
|
|
||||||
taosGetLinearInterpolationVal(&point, pCtx->outputType, &point1, &point2, TSDB_DATA_TYPE_DOUBLE, &exceedMax, &exceedMin);
|
|
||||||
if (exceedMax || exceedMin) {
|
|
||||||
__compar_fn_t func = getComparFunc((int32_t)pCtx->inputType, 0);
|
|
||||||
if (func(&pCtx->start.val, &pCtx->end.val) <= 0) {
|
|
||||||
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->start.val : &pCtx->end.val);
|
|
||||||
} else {
|
|
||||||
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, exceedMax ? &pCtx->end.val : &pCtx->start.val);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_FILL_PREV:
|
|
||||||
if (pCtx->start.key == INT64_MIN || pCtx->start.key > pCtx->startTs) {
|
|
||||||
goto interp_exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->start.val);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_FILL_NEXT:
|
|
||||||
if (pCtx->end.key == INT64_MIN || pCtx->end.key < pCtx->startTs) {
|
|
||||||
goto interp_exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
COPY_TYPED_DATA(pCtx->pOutput, pCtx->inputType, &pCtx->end.val);
|
|
||||||
break;
|
|
||||||
|
|
||||||
case TSDB_FILL_NONE:
|
|
||||||
// do nothing
|
|
||||||
default:
|
|
||||||
goto interp_exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
interp_success_exit:
|
|
||||||
*(TSKEY*)pCtx->ptsOutputBuf = pCtx->startTs;
|
|
||||||
INC_INIT_VAL(pCtx, 1);
|
|
||||||
|
|
||||||
interp_exit:
|
|
||||||
pCtx->start.key = INT64_MIN;
|
|
||||||
pCtx->end.key = INT64_MIN;
|
|
||||||
pCtx->endTs = pCtx->startTs;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
|
int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) {
|
||||||
int32_t numOfElems = 0;
|
int32_t numOfElems = 0;
|
||||||
|
|
||||||
|
|
|
@ -124,9 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
|
||||||
|
|
||||||
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
|
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
// *((bool*)pContext) =
|
*((bool*)pContext) =
|
||||||
// (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
|
(COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
|
||||||
*((bool*)pContext) = true;
|
|
||||||
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
|
|
Loading…
Reference in New Issue