fix: memory error
This commit is contained in:
parent
131a1c13c5
commit
3ae3fd65e8
|
@ -35,19 +35,6 @@ enum {
|
||||||
STREAM_CREATED_BY__SMA,
|
STREAM_CREATED_BY__SMA,
|
||||||
};
|
};
|
||||||
|
|
||||||
#if 0
|
|
||||||
// pipe -> fetch/pipe queue
|
|
||||||
// merge -> merge queue
|
|
||||||
// write -> write queue
|
|
||||||
enum {
|
|
||||||
TASK_DISPATCH_MSG__SND_PIPE = 1,
|
|
||||||
TASK_DISPATCH_MSG__SND_MERGE,
|
|
||||||
TASK_DISPATCH_MSG__VND_PIPE,
|
|
||||||
TASK_DISPATCH_MSG__VND_MERGE,
|
|
||||||
TASK_DISPATCH_MSG__VND_WRITE,
|
|
||||||
};
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t nodeId; // 0 for snode
|
int32_t nodeId; // 0 for snode
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
|
@ -100,10 +87,6 @@ typedef struct {
|
||||||
int8_t reserved;
|
int8_t reserved;
|
||||||
} STaskSinkFetch;
|
} STaskSinkFetch;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t reserved;
|
|
||||||
} STaskSinkShow;
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TASK_SOURCE__SCAN = 1,
|
TASK_SOURCE__SCAN = 1,
|
||||||
TASK_SOURCE__PIPE,
|
TASK_SOURCE__PIPE,
|
||||||
|
@ -128,7 +111,6 @@ enum {
|
||||||
TASK_SINK__TABLE,
|
TASK_SINK__TABLE,
|
||||||
TASK_SINK__SMA,
|
TASK_SINK__SMA,
|
||||||
TASK_SINK__FETCH,
|
TASK_SINK__FETCH,
|
||||||
TASK_SINK__SHOW,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -155,7 +137,6 @@ typedef struct {
|
||||||
STaskSinkTb tbSink;
|
STaskSinkTb tbSink;
|
||||||
STaskSinkSma smaSink;
|
STaskSinkSma smaSink;
|
||||||
STaskSinkFetch fetchSink;
|
STaskSinkFetch fetchSink;
|
||||||
STaskSinkShow showSink;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// dispatch
|
// dispatch
|
||||||
|
|
|
@ -431,7 +431,7 @@ static int32_t mndRetrieveStream(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock *p
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
|
||||||
while (numOfRows < rows) {
|
while (numOfRows < rows) {
|
||||||
pShow->pIter = sdbFetch(pSdb, SDB_TOPIC, pShow->pIter, (void **)&pStream);
|
pShow->pIter = sdbFetch(pSdb, SDB_STREAM, pShow->pIter, (void **)&pStream);
|
||||||
if (pShow->pIter == NULL) break;
|
if (pShow->pIter == NULL) break;
|
||||||
|
|
||||||
SColumnInfoData *pColInfo;
|
SColumnInfoData *pColInfo;
|
||||||
|
|
|
@ -159,7 +159,7 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
|
||||||
|
|
||||||
// decode
|
// decode
|
||||||
pBuf = pVal;
|
pBuf = pVal;
|
||||||
pSW = taosMemoryMalloc(sizeof(pSW));
|
pSW = taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||||
|
|
||||||
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER);
|
tCoderInit(&coder, TD_LITTLE_ENDIAN, pVal, vLen, TD_DECODER);
|
||||||
tDecodeSSchemaWrapper(&coder, pSW);
|
tDecodeSSchemaWrapper(&coder, pSW);
|
||||||
|
|
|
@ -864,7 +864,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int32_t parallel) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SStreamTask* pTask = taosMemoryMalloc(sizeof(SStreamTask));
|
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -482,7 +482,8 @@ static SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowI
|
||||||
|
|
||||||
// add a new result set for a new group
|
// add a new result set for a new group
|
||||||
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition));
|
taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos,
|
||||||
|
sizeof(SResultRowPosition));
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. set the new time window to be the new active time window
|
// 2. set the new time window to be the new active time window
|
||||||
|
@ -1034,8 +1035,8 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam,
|
static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunctParam* pFuncParam, int32_t paramIndex,
|
||||||
int32_t paramIndex, int32_t numOfRows) {
|
int32_t numOfRows) {
|
||||||
SColumnInfoData* pColInfo = NULL;
|
SColumnInfoData* pColInfo = NULL;
|
||||||
if (pInput->pData[paramIndex] == NULL) {
|
if (pInput->pData[paramIndex] == NULL) {
|
||||||
pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
pColInfo = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||||
|
@ -2998,9 +2999,8 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u
|
||||||
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
|
SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx;
|
||||||
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
||||||
|
|
||||||
SResultRow* pResultRow =
|
SResultRow* pResultRow = doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId,
|
||||||
doSetResultOutBufByKey(pAggInfo->aggSup.pResultBuf, pResultRowInfo, uid, (char*)&groupId, sizeof(groupId),
|
sizeof(groupId), true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
||||||
true, groupId, pTaskInfo, false, &pAggInfo->aggSup);
|
|
||||||
assert(pResultRow != NULL);
|
assert(pResultRow != NULL);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -4219,8 +4219,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock
|
||||||
pOperator->numOfOutput = pBlock->info.numOfCols;
|
pOperator->numOfOutput = pBlock->info.numOfCols;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL, destroyExchangeOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
||||||
NULL, NULL, NULL);
|
destroyExchangeOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
{ // todo refactor
|
{ // todo refactor
|
||||||
|
@ -4709,8 +4709,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pR
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo,
|
pOperator->fpSet =
|
||||||
NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -5008,7 +5008,8 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock)
|
||||||
// todo optimize performance
|
// todo optimize performance
|
||||||
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
|
// If there are slimit/soffset value exists, multi-round result can not be packed into one group, since the
|
||||||
// they may not belong to the same group the limit/offset value is not valid in this case.
|
// they may not belong to the same group the limit/offset value is not valid in this case.
|
||||||
if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 || pProjectInfo->slimit.limit != -1) {
|
if (pRes->info.rows >= pOperator->resultInfo.threshold || pProjectInfo->slimit.offset != -1 ||
|
||||||
|
pProjectInfo->slimit.limit != -1) {
|
||||||
return PROJECT_RETRIEVE_DONE;
|
return PROJECT_RETRIEVE_DONE;
|
||||||
} else { // not full enough, continue to accumulate the output data in the buffer.
|
} else { // not full enough, continue to accumulate the output data in the buffer.
|
||||||
return PROJECT_RETRIEVE_CONTINUE;
|
return PROJECT_RETRIEVE_CONTINUE;
|
||||||
|
@ -5725,6 +5726,10 @@ void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
|
static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInfo) {
|
||||||
|
if (pTableGroupInfo->numOfTables == 0) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
STableQueryInfo* pTableQueryInfo = taosMemoryCalloc(pTableGroupInfo->numOfTables, sizeof(STableQueryInfo));
|
||||||
if (pTableQueryInfo == NULL) {
|
if (pTableQueryInfo == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5750,7 +5755,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf
|
||||||
|
|
||||||
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo,
|
||||||
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) {
|
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
|
||||||
|
const STableGroupInfo* pTableGroupInfo) {
|
||||||
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -5764,7 +5770,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
int32_t code =
|
int32_t code =
|
||||||
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
|
initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, pResultBlock, keyBufSize, pTaskInfo->id.str);
|
||||||
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS || pInfo->pTableQueryInfo == NULL) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5919,8 +5925,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* p
|
||||||
pOperator->pExpr = pExprInfo;
|
pOperator->pExpr = pExprInfo;
|
||||||
pOperator->numOfOutput = num;
|
pOperator->numOfOutput = num;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL, destroyProjectOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
||||||
NULL, NULL, NULL);
|
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
||||||
|
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -5977,8 +5983,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, doStreamIntervalAgg, NULL,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -5997,7 +6003,8 @@ _error:
|
||||||
|
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
STimeWindowAggSupp* pTwAggSupp, const STableGroupInfo* pTableGroupInfo,
|
||||||
|
SExecTaskInfo* pTaskInfo) {
|
||||||
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
STableIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STableIntervalOperatorInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -6035,8 +6042,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL, destroyIntervalOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doStreamIntervalAgg, doStreamIntervalAgg, NULL,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -6051,7 +6058,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExpr
|
||||||
taosMemoryFreeClear(pOperator);
|
taosMemoryFreeClear(pOperator);
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
@ -6115,8 +6121,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL, destroyStateWindowOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStateWindowAgg, NULL, NULL,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -6161,8 +6167,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL, destroySWindowOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -6250,8 +6256,8 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
pOperator->numOfOutput = numOfCols;
|
pOperator->numOfOutput = numOfCols;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo,
|
pOperator->fpSet =
|
||||||
NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroySFillOperatorInfo, NULL, NULL, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -6262,7 +6268,6 @@ _error:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
|
static int32_t getColumnIndexInSource(SQueriedTableInfo* pTableInfo, SExprBasicInfo* pExpr, SColumnInfo* pTagCols) {
|
||||||
int32_t j = 0;
|
int32_t j = 0;
|
||||||
|
|
||||||
|
@ -6496,8 +6501,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
SInterval interval = extractIntervalInfo(pTableScanNode);
|
SInterval interval = extractIntervalInfo(pTableScanNode);
|
||||||
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired, pTableScanNode->scanSeq, pColList,
|
return createTableScanOperatorInfo(pDataReader, &cond, numOfCols, pTableScanNode->dataRequired,
|
||||||
pResBlock, pScanPhyNode->node.pConditions, &interval, pTableScanNode->ratio, pTaskInfo);
|
pTableScanNode->scanSeq, pColList, pResBlock, pScanPhyNode->node.pConditions,
|
||||||
|
&interval, pTableScanNode->ratio, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pExchange->node.pOutputDataBlockDesc);
|
||||||
|
@ -6505,14 +6511,16 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||||
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
|
||||||
|
|
||||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
||||||
|
queryId, taskId);
|
||||||
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
SArray* tableIdList = extractTableIdList(pTableGroupInfo);
|
||||||
|
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanPhyNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
SArray* pCols = extractColMatchInfo(pScanPhyNode->pScanCols, pScanPhyNode->node.pOutputDataBlockDesc, &numOfCols);
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo);
|
SOperatorInfo* pOperator =
|
||||||
|
createStreamScanOperatorInfo(pHandle->reader, pResBlock, pCols, tableIdList, pTaskInfo);
|
||||||
taosArrayDestroy(tableIdList);
|
taosArrayDestroy(tableIdList);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
|
@ -6522,10 +6530,11 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pScanNode->node.pOutputDataBlockDesc);
|
||||||
|
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* colList = extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
SArray* colList =
|
||||||
|
extractColMatchInfo(pScanNode->pScanCols, pScanNode->node.pOutputDataBlockDesc, &numOfOutputCols);
|
||||||
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
SOperatorInfo* pOperator = createSysTableScanOperatorInfo(
|
||||||
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet,
|
pHandle, pResBlock, &pScanNode->tableName, pScanNode->node.pConditions, pSysScanPhyNode->mgmtEpSet, colList,
|
||||||
colList, pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
pTaskInfo, pSysScanPhyNode->showRewrite, pSysScanPhyNode->accountId);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
@ -6642,7 +6651,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return pOptr;
|
return pOptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
|
static int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
|
||||||
pCond->loadExternalRows = false;
|
pCond->loadExternalRows = false;
|
||||||
|
|
||||||
|
@ -6918,7 +6926,8 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
||||||
goto _complete;
|
goto _complete;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pTaskInfo)->pRoot = createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo);
|
(*pTaskInfo)->pRoot =
|
||||||
|
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo);
|
||||||
if (NULL == (*pTaskInfo)->pRoot) {
|
if (NULL == (*pTaskInfo)->pRoot) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _complete;
|
goto _complete;
|
||||||
|
@ -7256,8 +7265,8 @@ SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOf
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo,
|
pOperator->fpSet =
|
||||||
NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyBasicOperatorInfo, NULL, NULL, NULL);
|
||||||
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
int32_t code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue