fix(stream):set num of stream session child
This commit is contained in:
parent
bd1b78234f
commit
214d3609b2
|
@ -107,6 +107,7 @@ typedef struct SDataBlockInfo {
|
|||
int32_t childId; // used for stream, do not serialize
|
||||
EStreamType type; // used for stream, do not serialize
|
||||
STimeWindow calWin; // used for stream, do not serialize
|
||||
TSKEY watermark;// used for stream
|
||||
} SDataBlockInfo;
|
||||
|
||||
typedef struct SSDataBlock {
|
||||
|
|
|
@ -1369,6 +1369,7 @@ typedef struct {
|
|||
int64_t skey;
|
||||
int64_t ekey;
|
||||
int64_t version; // for stream
|
||||
TSKEY watermark;// for stream
|
||||
char data[];
|
||||
} SRetrieveTableRsp;
|
||||
|
||||
|
|
|
@ -1272,8 +1272,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
|||
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
||||
}
|
||||
|
||||
dst->info.rows = src->info.rows;
|
||||
dst->info.capacity = src->info.rows;
|
||||
dst->info = src->info;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -191,6 +191,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
|||
pBlock->info.blockId = pNode->dataBlockId;
|
||||
pBlock->info.type = STREAM_INVALID;
|
||||
pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||
pBlock->info.watermark = INT64_MIN;
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
|
||||
|
|
|
@ -4180,7 +4180,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
int32_t children = 0;
|
||||
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
|
||||
int32_t children = 1;
|
||||
int32_t children = pHandle->numOfVgroups;
|
||||
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
|
||||
|
|
|
@ -1752,30 +1752,11 @@ void increaseTs(SqlFunctionCtx* pCtx) {
|
|||
}
|
||||
}
|
||||
|
||||
SSDataBlock* createDeleteBlock() {
|
||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
||||
pBlock->info.hasVarCol = false;
|
||||
pBlock->info.groupId = 0;
|
||||
pBlock->info.rows = 0;
|
||||
pBlock->info.type = STREAM_DELETE_RESULT;
|
||||
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(uint64_t);
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(2, sizeof(SColumnInfoData));
|
||||
SColumnInfoData infoData = {0};
|
||||
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||
infoData.info.bytes = sizeof(TSKEY);
|
||||
// window start ts
|
||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||
|
||||
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
|
||||
infoData.info.bytes = sizeof(uint64_t);
|
||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
||||
|
||||
return pBlock;
|
||||
}
|
||||
|
||||
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) {
|
||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
||||
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
// Todo(liuyao) support partition by column
|
||||
return;
|
||||
}
|
||||
SStreamScanInfo* pScanInfo = downstream->info;
|
||||
pScanInfo->sessionSup.parentType = type;
|
||||
pScanInfo->sessionSup.pIntervalAggSup = pSup;
|
||||
|
@ -2872,13 +2853,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
// process the rest of the data
|
||||
return pInfo->pUpdateRes;
|
||||
}
|
||||
// doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
||||
// if (pInfo->pPullDataRes->info.rows != 0) {
|
||||
// // process the rest of the data
|
||||
// ASSERT(IS_FINAL_OP(pInfo));
|
||||
// printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
// return pInfo->pPullDataRes;
|
||||
// }
|
||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||
if (pInfo->pDelRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
|
@ -2898,6 +2872,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||
|
||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
|
||||
pBlock->info.type == STREAM_INVALID) {
|
||||
|
@ -2986,6 +2961,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
|
||||
pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
||||
} else {
|
||||
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||
}
|
||||
|
||||
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
|
||||
|
@ -3020,7 +2997,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
return pInfo->pDelRes;
|
||||
}
|
||||
// ASSERT(false);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -3032,6 +3008,7 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
|
|||
pBlock->info.type = type;
|
||||
pBlock->info.rowSize =
|
||||
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
|
||||
pBlock->info.watermark = INT64_MIN;
|
||||
|
||||
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
|
||||
SColumnInfoData infoData = {0};
|
||||
|
@ -3221,7 +3198,6 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
|
|||
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
||||
destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput);
|
||||
taosMemoryFreeClear(pChild);
|
||||
taosMemoryFreeClear(pChInfo);
|
||||
}
|
||||
}
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
|
@ -3986,6 +3962,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
|
||||
}
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||
}
|
||||
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
|
@ -4109,6 +4086,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
// semi operator
|
||||
|
|
|
@ -35,6 +35,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
|
|||
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
||||
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
||||
pDataBlock->info.version = be64toh(pRetrieve->version);
|
||||
pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
|
||||
|
||||
pDataBlock->info.type = pRetrieve->streamBlockType;
|
||||
pDataBlock->info.childId = pReq->upstreamChildId;
|
||||
|
|
|
@ -184,6 +184,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
|||
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
||||
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||
pRetrieve->version = htobe64(pBlock->info.version);
|
||||
pRetrieve->watermark = htobe64(pBlock->info.watermark);
|
||||
|
||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||
pRetrieve->numOfCols = htonl(numOfCols);
|
||||
|
|
Loading…
Reference in New Issue