fix(query): fix bug in group by tag.
This commit is contained in:
parent
bcb3d41d08
commit
87516ea332
|
@ -1515,6 +1515,11 @@ static FORCE_INLINE STSchema* doGetSchemaForTSRow(int32_t sversion, STsdbReader*
|
||||||
|
|
||||||
taosMemoryFree(pReader->pMemSchema);
|
taosMemoryFree(pReader->pMemSchema);
|
||||||
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
|
int32_t code = metaGetTbTSchemaEx(pReader->pTsdb->pVnode->pMeta, pReader->suid, uid, sversion, &pReader->pMemSchema);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return pReader->pMemSchema;
|
return pReader->pMemSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3732,7 +3737,7 @@ SArray* tsdbRetrieveDataBlock(STsdbReader* pReader, SArray* pIdList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) {
|
||||||
if (isEmptyQueryTimeWindow(&pReader->window)) {
|
if (isEmptyQueryTimeWindow(&pReader->window) || pReader->pReadSnap == NULL) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -616,7 +616,7 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
|
||||||
|
|
||||||
int32_t nOptrWithVal = 0;
|
int32_t nOptrWithVal = 0;
|
||||||
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);
|
||||||
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal = 0)) {
|
if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
|
||||||
taosMemoryFreeClear(*pOutput);
|
taosMemoryFreeClear(*pOutput);
|
||||||
*len = 0;
|
*len = 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -284,6 +284,19 @@ static bool doLoadBlockSMA(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void doSetTagColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
|
||||||
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
|
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
||||||
|
|
||||||
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
||||||
|
GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||||
uint32_t* status) {
|
uint32_t* status) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -313,7 +326,6 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
pCost->skipBlocks += 1;
|
pCost->skipBlocks += 1;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
pCost->loadBlockStatis += 1;
|
pCost->loadBlockStatis += 1;
|
||||||
|
@ -322,6 +334,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
if (success) { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
qDebug("%s data block SMA loaded, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
|
qDebug("%s failed to load SMA, since not all columns have SMA", GET_TASKID(pTaskInfo));
|
||||||
|
@ -371,17 +384,7 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanInfo* pTableSca
|
||||||
}
|
}
|
||||||
|
|
||||||
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
relocateColumnData(pBlock, pTableScanInfo->pColMatchInfo, pCols, true);
|
||||||
|
doSetTagColumnData(pTableScanInfo, pBlock, pTaskInfo);
|
||||||
// currently only the tbname pseudo column
|
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
|
||||||
SExprSupp* pSup = &pTableScanInfo->pseudoSup;
|
|
||||||
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pSup->pExprInfo, pSup->numOfExprs, pBlock,
|
|
||||||
GET_TASKID(pTaskInfo));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTableScanInfo->pFilterNode != NULL) {
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -1316,7 +1319,7 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
|
||||||
|
|
||||||
blockDataEnsureCapacity(pResBlock, 1);
|
blockDataEnsureCapacity(pResBlock, 1);
|
||||||
|
|
||||||
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, pTagCalSup->numOfExprs, NULL);
|
projectApplyFunctions(pTagCalSup->pExprInfo, pResBlock, pSrcBlock, pTagCalSup->pCtx, 1, NULL);
|
||||||
ASSERT(pResBlock->info.rows == 1);
|
ASSERT(pResBlock->info.rows == 1);
|
||||||
|
|
||||||
// build tagArray
|
// build tagArray
|
||||||
|
@ -2120,9 +2123,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
||||||
taosMemoryFree(pStreamScan->pPseudoExpr);
|
taosMemoryFree(pStreamScan->pPseudoExpr);
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanupExprSupp(&pStreamScan->tbnameCalSup);
|
|
||||||
cleanupExprSupp(&pStreamScan->tagCalSup);
|
|
||||||
|
|
||||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||||
blockDataDestroy(pStreamScan->pRes);
|
blockDataDestroy(pStreamScan->pRes);
|
||||||
blockDataDestroy(pStreamScan->pUpdateRes);
|
blockDataDestroy(pStreamScan->pUpdateRes);
|
||||||
|
|
|
@ -2842,14 +2842,13 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols);
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc);
|
||||||
|
initBasicInfo(&pInfo->binfo, pResBlock);
|
||||||
|
|
||||||
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
initBasicInfo(&pInfo->binfo, pResBlock);
|
|
||||||
|
|
||||||
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
|
pInfo->twAggSup.waterMark = pSessionNode->window.watermark;
|
||||||
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
|
pInfo->twAggSup.calTrigger = pSessionNode->window.triggerType;
|
||||||
pInfo->gap = pSessionNode->gap;
|
pInfo->gap = pSessionNode->gap;
|
||||||
|
@ -4666,7 +4665,6 @@ void destroyStreamStateOperatorInfo(void* param) {
|
||||||
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
||||||
destroyStreamSessionAggOperatorInfo(pChInfo);
|
destroyStreamSessionAggOperatorInfo(pChInfo);
|
||||||
taosMemoryFreeClear(pChild);
|
taosMemoryFreeClear(pChild);
|
||||||
taosMemoryFreeClear(pChInfo);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
|
|
@ -124,9 +124,8 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order) {
|
||||||
|
|
||||||
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
|
EDealRes scanPathOptHaveNormalColImpl(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
// *((bool*)pContext) =
|
*((bool*)pContext) =
|
||||||
// (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
|
(COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType);
|
||||||
*((bool*)pContext) = true;
|
|
||||||
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
return *((bool*)pContext) ? DEAL_RES_END : DEAL_RES_IGNORE_CHILD;
|
||||||
}
|
}
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
|
|
Loading…
Reference in New Issue