Merge pull request #15792 from taosdata/feature/TD-18180
fix(stream):set num of stream session child
This commit is contained in:
commit
73aa96e167
|
@ -107,6 +107,7 @@ typedef struct SDataBlockInfo {
|
||||||
int32_t childId; // used for stream, do not serialize
|
int32_t childId; // used for stream, do not serialize
|
||||||
EStreamType type; // used for stream, do not serialize
|
EStreamType type; // used for stream, do not serialize
|
||||||
STimeWindow calWin; // used for stream, do not serialize
|
STimeWindow calWin; // used for stream, do not serialize
|
||||||
|
TSKEY watermark;// used for stream
|
||||||
} SDataBlockInfo;
|
} SDataBlockInfo;
|
||||||
|
|
||||||
typedef struct SSDataBlock {
|
typedef struct SSDataBlock {
|
||||||
|
|
|
@ -1369,6 +1369,7 @@ typedef struct {
|
||||||
int64_t skey;
|
int64_t skey;
|
||||||
int64_t ekey;
|
int64_t ekey;
|
||||||
int64_t version; // for stream
|
int64_t version; // for stream
|
||||||
|
TSKEY watermark;// for stream
|
||||||
char data[];
|
char data[];
|
||||||
} SRetrieveTableRsp;
|
} SRetrieveTableRsp;
|
||||||
|
|
||||||
|
|
|
@ -1272,8 +1272,7 @@ int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||||
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
colDataAssign(pDst, pSrc, src->info.rows, &src->info);
|
||||||
}
|
}
|
||||||
|
|
||||||
dst->info.rows = src->info.rows;
|
dst->info = src->info;
|
||||||
dst->info.capacity = src->info.rows;
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -191,6 +191,7 @@ SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) {
|
||||||
pBlock->info.blockId = pNode->dataBlockId;
|
pBlock->info.blockId = pNode->dataBlockId;
|
||||||
pBlock->info.type = STREAM_INVALID;
|
pBlock->info.type = STREAM_INVALID;
|
||||||
pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
pBlock->info.calWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
pBlock->info.watermark = INT64_MIN;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
|
SSlotDescNode* pDescNode = (SSlotDescNode*)nodesListGetNode(pNode->pSlots, i);
|
||||||
|
|
|
@ -4180,7 +4180,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t children = 0;
|
int32_t children = 0;
|
||||||
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) {
|
||||||
int32_t children = 1;
|
int32_t children = pHandle->numOfVgroups;
|
||||||
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) {
|
||||||
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
|
pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo);
|
||||||
|
|
|
@ -1752,30 +1752,11 @@ void increaseTs(SqlFunctionCtx* pCtx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* createDeleteBlock() {
|
|
||||||
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
|
|
||||||
pBlock->info.hasVarCol = false;
|
|
||||||
pBlock->info.groupId = 0;
|
|
||||||
pBlock->info.rows = 0;
|
|
||||||
pBlock->info.type = STREAM_DELETE_RESULT;
|
|
||||||
pBlock->info.rowSize = sizeof(TSKEY) + sizeof(uint64_t);
|
|
||||||
|
|
||||||
pBlock->pDataBlock = taosArrayInit(2, sizeof(SColumnInfoData));
|
|
||||||
SColumnInfoData infoData = {0};
|
|
||||||
infoData.info.type = TSDB_DATA_TYPE_TIMESTAMP;
|
|
||||||
infoData.info.bytes = sizeof(TSKEY);
|
|
||||||
// window start ts
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
|
||||||
|
|
||||||
infoData.info.type = TSDB_DATA_TYPE_UBIGINT;
|
|
||||||
infoData.info.bytes = sizeof(uint64_t);
|
|
||||||
taosArrayPush(pBlock->pDataBlock, &infoData);
|
|
||||||
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) {
|
void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) {
|
||||||
ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
|
if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||||
|
// Todo(liuyao) support partition by column
|
||||||
|
return;
|
||||||
|
}
|
||||||
SStreamScanInfo* pScanInfo = downstream->info;
|
SStreamScanInfo* pScanInfo = downstream->info;
|
||||||
pScanInfo->sessionSup.parentType = type;
|
pScanInfo->sessionSup.parentType = type;
|
||||||
pScanInfo->sessionSup.pIntervalAggSup = pSup;
|
pScanInfo->sessionSup.pIntervalAggSup = pSup;
|
||||||
|
@ -2872,13 +2853,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
// doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
|
||||||
// if (pInfo->pPullDataRes->info.rows != 0) {
|
|
||||||
// // process the rest of the data
|
|
||||||
// ASSERT(IS_FINAL_OP(pInfo));
|
|
||||||
// printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
|
||||||
// return pInfo->pPullDataRes;
|
|
||||||
// }
|
|
||||||
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
doBuildDeleteResult(pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes);
|
||||||
if (pInfo->pDelRes->info.rows != 0) {
|
if (pInfo->pDelRes->info.rows != 0) {
|
||||||
// process the rest of the data
|
// process the rest of the data
|
||||||
|
@ -2898,6 +2872,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
|
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
|
||||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||||
|
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
|
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA ||
|
||||||
pBlock->info.type == STREAM_INVALID) {
|
pBlock->info.type == STREAM_INVALID) {
|
||||||
|
@ -2986,6 +2961,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
|
closeIntervalWindow(pInfo->aggSup.pResultRowHashTable, &pInfo->twAggSup, &pInfo->interval, pInfo->pPullDataMap,
|
||||||
pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
pUpdated, pInfo->pRecycledPages, pInfo->aggSup.pResultBuf);
|
||||||
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
closeChildIntervalWindow(pInfo->pChildren, pInfo->twAggSup.maxTs);
|
||||||
|
} else {
|
||||||
|
pInfo->binfo.pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||||
}
|
}
|
||||||
|
|
||||||
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
|
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->aggSup.pResultBuf, pUpdated, pSup->rowEntryInfoOffset);
|
||||||
|
@ -3020,7 +2997,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
// ASSERT(false);
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3032,6 +3008,7 @@ SSDataBlock* createSpecialDataBlock(EStreamType type) {
|
||||||
pBlock->info.type = type;
|
pBlock->info.type = type;
|
||||||
pBlock->info.rowSize =
|
pBlock->info.rowSize =
|
||||||
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
|
sizeof(TSKEY) + sizeof(TSKEY) + sizeof(uint64_t) + sizeof(uint64_t) + sizeof(TSKEY) + sizeof(TSKEY);
|
||||||
|
pBlock->info.watermark = INT64_MIN;
|
||||||
|
|
||||||
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
|
pBlock->pDataBlock = taosArrayInit(6, sizeof(SColumnInfoData));
|
||||||
SColumnInfoData infoData = {0};
|
SColumnInfoData infoData = {0};
|
||||||
|
@ -3221,7 +3198,6 @@ void destroyStreamSessionAggOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
|
||||||
destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput);
|
destroyStreamSessionAggOperatorInfo(pChInfo, numOfOutput);
|
||||||
taosMemoryFreeClear(pChild);
|
taosMemoryFreeClear(pChild);
|
||||||
taosMemoryFreeClear(pChInfo);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||||
|
@ -3986,6 +3962,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
|
doStreamSessionAggImpl(pChildOp, pBlock, NULL, NULL, true);
|
||||||
}
|
}
|
||||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||||
|
maxTs = TMAX(maxTs, pBlock->info.watermark);
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||||
|
@ -4109,6 +4086,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||||
|
pBInfo->pRes->info.watermark = pInfo->twAggSup.maxTs;
|
||||||
// restore the value
|
// restore the value
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
// semi operator
|
// semi operator
|
||||||
|
|
|
@ -35,6 +35,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock
|
||||||
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
||||||
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
||||||
pDataBlock->info.version = be64toh(pRetrieve->version);
|
pDataBlock->info.version = be64toh(pRetrieve->version);
|
||||||
|
pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
|
||||||
|
|
||||||
pDataBlock->info.type = pRetrieve->streamBlockType;
|
pDataBlock->info.type = pRetrieve->streamBlockType;
|
||||||
pDataBlock->info.childId = pReq->upstreamChildId;
|
pDataBlock->info.childId = pReq->upstreamChildId;
|
||||||
|
|
|
@ -184,6 +184,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis
|
||||||
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
pRetrieve->skey = htobe64(pBlock->info.window.skey);
|
||||||
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||||
pRetrieve->version = htobe64(pBlock->info.version);
|
pRetrieve->version = htobe64(pBlock->info.version);
|
||||||
|
pRetrieve->watermark = htobe64(pBlock->info.watermark);
|
||||||
|
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||||
pRetrieve->numOfCols = htonl(numOfCols);
|
pRetrieve->numOfCols = htonl(numOfCols);
|
||||||
|
|
Loading…
Reference in New Issue