Merge pull request #19033 from taosdata/fix/stream_partition_name
fix(query): projection for null input
This commit is contained in:
commit
7c3d8e4d95
|
@ -890,35 +890,35 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
|
||||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL ||
|
||||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) {
|
||||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
|
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
|
||||||
ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);
|
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
|
||||||
|
|
||||||
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
||||||
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
||||||
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
|
||||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
|
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
|
||||||
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
|
qInfo("restore stream param for interval: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
|
||||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
|
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
|
||||||
ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);
|
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
|
||||||
|
|
||||||
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
||||||
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
||||||
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
|
||||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
|
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
|
||||||
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
|
qInfo("restore stream param for session: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
|
||||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
|
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);*/
|
||||||
ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);
|
/*ASSERT(pInfo->twAggSup.deleteMark == INT64_MAX);*/
|
||||||
|
|
||||||
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
pInfo->twAggSup.calTrigger = pInfo->twAggSup.calTriggerSaved;
|
||||||
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
pInfo->twAggSup.deleteMark = pInfo->twAggSup.deleteMarkSaved;
|
||||||
ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||
|
/*ASSERT(pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE ||*/
|
||||||
pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);
|
/*pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE);*/
|
||||||
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
|
qInfo("restore stream param for state: %d, %" PRId64, pInfo->twAggSup.calTrigger, pInfo->twAggSup.deleteMark);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -926,7 +926,7 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
|
||||||
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
|
if (pOperator->numOfDownstream != 1 || pOperator->pDownstream[0] == NULL) {
|
||||||
if (pOperator->numOfDownstream > 1) {
|
if (pOperator->numOfDownstream > 1) {
|
||||||
qError("unexpected stream, multiple downstream");
|
qError("unexpected stream, multiple downstream");
|
||||||
ASSERT(0);
|
/*ASSERT(0);*/
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "filter.h"
|
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
|
#include "filter.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
|
||||||
typedef struct SProjectOperatorInfo {
|
typedef struct SProjectOperatorInfo {
|
||||||
|
@ -117,9 +117,10 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
|
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pOperator->exprSupp.pCtx, numOfCols);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo,
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL,
|
pTaskInfo);
|
||||||
destroyProjectOperatorInfo, optrDefaultBufFn, NULL);
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo,
|
||||||
|
optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -414,8 +415,10 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
pInfo->binfo.pRes = pResBlock;
|
pInfo->binfo.pRes = pResBlock;
|
||||||
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
pInfo->pPseudoColInfo = setRowTsColumnOutputInfo(pSup->pCtx, numOfExpr);
|
||||||
|
|
||||||
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo,
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, optrDefaultBufFn, NULL);
|
pTaskInfo);
|
||||||
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo,
|
||||||
|
optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -697,13 +700,30 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc
|
||||||
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
if (pExpr[k].pExpr->nodeType == QUERY_NODE_COLUMN) { // it is a project query
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
if (pResult->info.rows > 0 && !createNewColModel) {
|
if (pResult->info.rows > 0 && !createNewColModel) {
|
||||||
|
if (pInputData->pData[0] == NULL) {
|
||||||
|
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInput,
|
||||||
|
pSrcBlock->info.rows);
|
||||||
|
} else {
|
||||||
colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
|
colDataMergeCol(pColInfoData, pResult->info.rows, (int32_t*)&pResult->info.capacity, pInputData->pData[0],
|
||||||
pInputData->numOfRows);
|
pInputData->numOfRows);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pInputData->pData[0] == NULL) {
|
||||||
|
int32_t slotId = pfCtx->param[0].pCol->slotId;
|
||||||
|
|
||||||
|
SColumnInfoData* pInput = taosArrayGet(pSrcBlock->pDataBlock, slotId);
|
||||||
|
colDataAssign(pColInfoData, pInput, pSrcBlock->info.rows, &pResult->info);
|
||||||
|
|
||||||
|
numOfRows = pSrcBlock->info.rows;
|
||||||
} else {
|
} else {
|
||||||
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
colDataAssign(pColInfoData, pInputData->pData[0], pInputData->numOfRows, &pResult->info);
|
||||||
}
|
|
||||||
|
|
||||||
numOfRows = pInputData->numOfRows;
|
numOfRows = pInputData->numOfRows;
|
||||||
|
}
|
||||||
|
}
|
||||||
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
} else if (pExpr[k].pExpr->nodeType == QUERY_NODE_VALUE) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
SColumnInfoData* pColInfoData = taosArrayGet(pResult->pDataBlock, outputSlotId);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue