fix(query): remove redundant ts cols in cache scan.
This commit is contained in:
parent
6ebf6b29bd
commit
2f5ed16cb8
|
@ -37,6 +37,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
|
||||||
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||||
|
bool allNullRow = true;
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
|
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||||
|
@ -46,12 +48,15 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
p->ts = pColVal->ts;
|
p->ts = pColVal->ts;
|
||||||
p->bytes = TSDB_KEYSIZE;
|
p->bytes = TSDB_KEYSIZE;
|
||||||
*(int64_t*)p->buf = pColVal->ts;
|
*(int64_t*)p->buf = pColVal->ts;
|
||||||
|
allNullRow = false;
|
||||||
} else {
|
} else {
|
||||||
int32_t slotId = slotIds[i];
|
int32_t slotId = slotIds[i];
|
||||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||||
|
|
||||||
p->ts = pColVal->ts;
|
p->ts = pColVal->ts;
|
||||||
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||||
|
allNullRow = p->isNull & allNullRow;
|
||||||
|
|
||||||
if (!p->isNull) {
|
if (!p->isNull) {
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||||
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
||||||
|
@ -69,6 +74,8 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
|
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
|
||||||
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pBlock->info.rows += allNullRow? 0:1;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
|
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
|
||||||
|
|
||||||
|
@ -96,10 +103,10 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
|
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t numOfCols, void** pReader) {
|
||||||
*pReader = NULL;
|
*pReader = NULL;
|
||||||
|
|
|
@ -889,8 +889,7 @@ typedef struct SJoinOperatorInfo {
|
||||||
void doDestroyExchangeOperatorInfo(void* param);
|
void doDestroyExchangeOperatorInfo(void* param);
|
||||||
|
|
||||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
||||||
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
|
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain);
|
||||||
__optr_decode_fn_t decode, __optr_explain_fn_t explain);
|
|
||||||
|
|
||||||
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
int32_t operatorDummyOpenFn(SOperatorInfo* pOperator);
|
||||||
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num);
|
||||||
|
|
|
@ -27,7 +27,8 @@
|
||||||
|
|
||||||
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
|
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
|
||||||
static void destroyLastrowScanOperator(void* param);
|
static void destroyLastrowScanOperator(void* param);
|
||||||
static int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
|
static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds);
|
||||||
|
static SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo);
|
||||||
|
|
||||||
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SReadHandle* readHandle,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
|
@ -40,12 +41,15 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->readHandle = *readHandle;
|
pInfo->readHandle = *readHandle;
|
||||||
pInfo->pRes = createResDataBlock(pScanNode->scan.node.pOutputDataBlockDesc);
|
|
||||||
|
SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc;
|
||||||
|
pInfo->pRes = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t numOfCols = 0;
|
int32_t numOfCols = 0;
|
||||||
pInfo->pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pScanNode->scan.node.pOutputDataBlockDesc,
|
SArray* pColMatchInfo = extractColMatchInfo(pScanNode->scan.pScanCols, pDescNode, &numOfCols, COL_MATCH_FROM_COL_ID);
|
||||||
&numOfCols, COL_MATCH_FROM_COL_ID);
|
pInfo->pColMatchInfo = removeRedundantTsCol(pScanNode, pColMatchInfo);
|
||||||
code = extractTargetSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
|
|
||||||
|
code = extractCacheScanSlotId(pInfo->pColMatchInfo, pTaskInfo, &pInfo->pSlotIds);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -72,11 +76,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pScanNode->scan.pScanPseudoCols != NULL) {
|
if (pScanNode->scan.pScanPseudoCols != NULL) {
|
||||||
SExprSupp* pPseudoExpr = &pInfo->pseudoExprSup;
|
SExprSupp* p = &pInfo->pseudoExprSup;
|
||||||
|
p->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &p->numOfExprs);
|
||||||
pPseudoExpr->pExprInfo = createExprInfo(pScanNode->scan.pScanPseudoCols, NULL, &pPseudoExpr->numOfExprs);
|
p->pCtx = createSqlFunctionCtx(p->pExprInfo, p->numOfExprs, &p->rowEntryInfoOffset);
|
||||||
pPseudoExpr->pCtx =
|
|
||||||
createSqlFunctionCtx(pPseudoExpr->pExprInfo, pPseudoExpr->numOfExprs, &pPseudoExpr->rowEntryInfoOffset);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->name = "LastrowScanOperator";
|
pOperator->name = "LastrowScanOperator";
|
||||||
|
@ -88,7 +90,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
|
||||||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, NULL, destroyLastrowScanOperator, NULL);
|
||||||
|
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -130,7 +132,9 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// check for tag values
|
// check for tag values
|
||||||
int32_t resultRows = pInfo->pBufferredRes->info.rows;
|
int32_t resultRows = pInfo->pBufferredRes->info.rows;
|
||||||
ASSERT(resultRows == taosArrayGetSize(pInfo->pUidList));
|
|
||||||
|
// the results may be null, if last values are all null
|
||||||
|
ASSERT(resultRows == 0 || resultRows == taosArrayGetSize(pInfo->pUidList));
|
||||||
pInfo->indexOfBufferedRes = 0;
|
pInfo->indexOfBufferedRes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -240,7 +244,7 @@ void destroyLastrowScanOperator(void* param) {
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
|
int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds) {
|
||||||
size_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
size_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
|
|
||||||
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
*pSlotIds = taosMemoryMalloc(numOfCols * sizeof(int32_t));
|
||||||
|
@ -248,16 +252,18 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSchemaWrapper* pWrapper = pTaskInfo->schemaInfo.sw;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i);
|
SColMatchInfo* pColMatch = taosArrayGet(pColMatchInfo, i);
|
||||||
for (int32_t j = 0; j < pTaskInfo->schemaInfo.sw->nCols; ++j) {
|
for (int32_t j = 0; j < pWrapper->nCols; ++j) {
|
||||||
if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId &&
|
if (pColMatch->colId == pWrapper->pSchema[j].colId &&
|
||||||
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
pColMatch->colId == PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||||
(*pSlotIds)[pColMatch->targetSlotId] = -1;
|
(*pSlotIds)[pColMatch->targetSlotId] = -1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pColMatch->colId == pTaskInfo->schemaInfo.sw->pSchema[j].colId) {
|
if (pColMatch->colId == pWrapper->pSchema[j].colId) {
|
||||||
(*pSlotIds)[pColMatch->targetSlotId] = j;
|
(*pSlotIds)[pColMatch->targetSlotId] = j;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -266,3 +272,26 @@ int32_t extractTargetSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInf
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray* removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SArray* pColMatchInfo) {
|
||||||
|
if (!pScanNode->ignoreNull) { // retrieve cached last value
|
||||||
|
return pColMatchInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pMatchInfo = taosArrayInit(taosArrayGetSize(pColMatchInfo), sizeof(SColMatchInfo));
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pColMatchInfo); ++i) {
|
||||||
|
SColMatchInfo* pColInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
|
|
||||||
|
int32_t slotId = pColInfo->targetSlotId;
|
||||||
|
SNodeList* pList = pScanNode->scan.node.pOutputDataBlockDesc->pSlots;
|
||||||
|
|
||||||
|
SSlotDescNode* pDesc = (SSlotDescNode*)nodesListGetNode(pList, slotId);
|
||||||
|
if (pDesc->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
|
taosArrayPush(pMatchInfo, pColInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pColMatchInfo);
|
||||||
|
return pMatchInfo;
|
||||||
|
}
|
|
@ -110,16 +110,13 @@ int32_t operatorDummyOpenFn(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
||||||
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
|
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain) {
|
||||||
__optr_decode_fn_t decode, __optr_explain_fn_t explain) {
|
|
||||||
SOperatorFpSet fpSet = {
|
SOperatorFpSet fpSet = {
|
||||||
._openFn = openFn,
|
._openFn = openFn,
|
||||||
.getNextFn = nextFn,
|
.getNextFn = nextFn,
|
||||||
.getStreamResFn = streamFn,
|
.getStreamResFn = streamFn,
|
||||||
.cleanupFn = cleanup,
|
.cleanupFn = cleanup,
|
||||||
.closeFn = closeFn,
|
.closeFn = closeFn,
|
||||||
.encodeResultRow = encode,
|
|
||||||
.decodeResultRow = decode,
|
|
||||||
.getExplainFn = explain,
|
.getExplainFn = explain,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -2305,7 +2302,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, doLoadRemoteData, NULL, NULL,
|
||||||
destroyExchangeOperatorInfo, NULL, NULL, NULL);
|
destroyExchangeOperatorInfo, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
@ -3081,7 +3078,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, NULL, destroyAggOperatorInfo,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
NULL);
|
||||||
|
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||||
STableScanInfo* pTableScanInfo = downstream->info;
|
STableScanInfo* pTableScanInfo = downstream->info;
|
||||||
|
@ -3306,7 +3303,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doFill, NULL, NULL, destroyFillOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
|
@ -443,7 +443,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashGroupbyAggregate, NULL, NULL,
|
||||||
destroyGroupOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyGroupOperatorInfo, NULL);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -820,7 +820,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, hashPartition, NULL, NULL, destroyPartitionOperatorInfo,
|
||||||
NULL, NULL, NULL);
|
NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
@ -1095,7 +1095,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamHashPartition, NULL, NULL,
|
||||||
destroyStreamPartitionOperatorInfo, NULL, NULL, NULL);
|
destroyStreamPartitionOperatorInfo, NULL);
|
||||||
|
|
||||||
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
|
@ -116,7 +116,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
}
|
}
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL);
|
||||||
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
code = appendDownstream(pOperator, pDownstream, numOfDownstream);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
|
|
@ -105,7 +105,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doProjectOperation, NULL, NULL,
|
||||||
destroyProjectOperatorInfo, NULL, NULL, NULL);
|
destroyProjectOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -405,7 +405,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doApplyIndefinitFunction, NULL, NULL,
|
||||||
destroyIndefinitOperatorInfo, NULL, NULL, NULL);
|
destroyIndefinitOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -781,7 +781,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScan, NULL, NULL, destroyTableScanOperatorInfo,
|
||||||
NULL, NULL, getTableScannerExecInfo);
|
getTableScannerExecInfo);
|
||||||
|
|
||||||
// for non-blocking operator, the open cost is always 0
|
// for non-blocking operator, the open cost is always 0
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
|
@ -809,7 +809,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo*
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL, NULL, NULL);
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableScanImpl, NULL, NULL, NULL, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -930,7 +930,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(void* dataReader, SReadHandle* re
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doBlockInfoScan, NULL, NULL,
|
||||||
destroyBlockDistScanOperatorInfo, NULL, NULL, NULL);
|
destroyBlockDistScanOperatorInfo, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
|
@ -2115,7 +2115,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL, NULL, NULL);
|
pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, NULL, destroyRawScanOperatorInfo, NULL);
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
_end:
|
_end:
|
||||||
|
@ -2295,7 +2295,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
||||||
|
|
||||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, nextFn, NULL, NULL, destroyStreamScanOperatorInfo, NULL);
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -3201,7 +3201,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doSysTableScan, NULL, NULL, destroySysScanOperator, NULL);
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -3338,7 +3338,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doTagScan, NULL, NULL, destroyTagScanOperatorInfo, NULL);
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
@ -3878,9 +3878,8 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
initResultSizeInfo(&pOperator->resultInfo, 1024);
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL,
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doTableMergeScan, NULL, NULL, destroyTableMergeScanOperatorInfo, NULL,
|
destroyTableMergeScanOperatorInfo, getTableMergeScanExplainExecInfo);
|
||||||
NULL, getTableMergeScanExplainExecInfo);
|
|
||||||
pOperator->cost.openCost = 0;
|
pOperator->cost.openCost = 0;
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
|
@ -64,7 +64,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
// there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
// there are headers, so pageSize = rowSize + header pInfo->sortBufSize = pInfo->bufPageSize * 16;
|
||||||
// TODO dynamic set the available sort buffer
|
// TODO dynamic set the available sort buffer
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(doOpenSortOperator, doSort, NULL, NULL, destroyOrderOperatorInfo,
|
||||||
getExplainExecInfo);
|
getExplainExecInfo);
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -515,7 +515,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doGroupSort, NULL, NULL, destroyGroupSortOperatorInfo,
|
||||||
NULL, NULL, getGroupSortExplainExecInfo);
|
getGroupSortExplainExecInfo);
|
||||||
|
|
||||||
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
int32_t code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -752,7 +752,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(doOpenMultiwayMergeOperator, doMultiwayMerge, NULL, NULL,
|
||||||
destroyMultiwayMergeOperatorInfo, NULL, NULL, getMultiwayMergeExplainExecInfo);
|
destroyMultiwayMergeOperatorInfo, getMultiwayMergeExplainExecInfo);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, downStreams, numStreams);
|
code = appendDownstream(pOperator, downStreams, numStreams);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -1694,7 +1694,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamFill, NULL, NULL, destroyStreamFillOperatorInfo,
|
||||||
NULL, NULL, NULL);
|
NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
|
@ -1831,8 +1831,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
||||||
pOperator->status = OP_NOT_OPENED;
|
pOperator->status = OP_NOT_OPENED;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, NULL, destroyIntervalOperatorInfo, NULL);
|
||||||
destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2638,8 +2637,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL);
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, NULL, destroyTimeSliceOperatorInfo, NULL, NULL, NULL);
|
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
@ -2710,7 +2708,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, NULL,
|
||||||
destroyStateWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyStateWindowOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2784,7 +2782,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doSessionWindowAgg, NULL, NULL,
|
||||||
destroySWindowOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroySWindowOperatorInfo, NULL);
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -3465,8 +3463,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo, NULL);
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
|
||||||
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
|
||||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
||||||
}
|
}
|
||||||
|
@ -4261,7 +4258,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
|
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionAgg, NULL, NULL, destroyStreamSessionAggOperatorInfo,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
NULL);
|
||||||
if (downstream) {
|
if (downstream) {
|
||||||
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
||||||
pInfo->primaryTsIndex);
|
pInfo->primaryTsIndex);
|
||||||
|
@ -4408,7 +4405,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
pOperator->name = "StreamSessionSemiAggOperator";
|
pOperator->name = "StreamSessionSemiAggOperator";
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
||||||
destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyStreamSessionAggOperatorInfo, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->pGroupIdTbNameMap =
|
pInfo->pGroupIdTbNameMap =
|
||||||
|
@ -4777,7 +4774,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doStreamStateAgg, NULL, NULL,
|
||||||
destroyStreamStateOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyStreamStateOperatorInfo, NULL);
|
||||||
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
initDownStream(downstream, &pInfo->streamAggSup, pInfo->twAggSup.waterMark, pOperator->operatorType,
|
||||||
pInfo->primaryTsIndex);
|
pInfo->primaryTsIndex);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
@ -5054,7 +5051,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
|
||||||
pOperator->info = miaInfo;
|
pOperator->info = miaInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, mergeAlignedIntervalAgg, NULL, NULL,
|
||||||
destroyMAIOperatorInfo, NULL, NULL, NULL);
|
destroyMAIOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -5366,7 +5363,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
|
||||||
pOperator->info = pMergeIntervalInfo;
|
pOperator->info = pMergeIntervalInfo;
|
||||||
|
|
||||||
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doMergeIntervalAgg, NULL, NULL,
|
||||||
destroyMergeIntervalOperatorInfo, NULL, NULL, NULL);
|
destroyMergeIntervalOperatorInfo, NULL);
|
||||||
|
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -5599,7 +5596,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
createOperatorFpSet(operatorDummyOpenFn, doStreamIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
|
||||||
aggEncodeResultRow, aggDecodeResultRow, NULL);
|
NULL);
|
||||||
|
|
||||||
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup);
|
||||||
code = appendDownstream(pOperator, &downstream, 1);
|
code = appendDownstream(pOperator, &downstream, 1);
|
||||||
|
|
Loading…
Reference in New Issue