feat(stream): add num of children
This commit is contained in:
parent
f33c15fb8b
commit
8e41fc912b
|
@ -40,6 +40,7 @@ typedef struct SReadHandle {
|
|||
bool initMetaReader;
|
||||
bool initTableReader;
|
||||
bool initTqReader;
|
||||
int32_t numOfVgroups;
|
||||
} SReadHandle;
|
||||
|
||||
// in queue mode, data streams are seperated by msg
|
||||
|
|
|
@ -262,6 +262,7 @@ typedef struct SStreamTask {
|
|||
int64_t startVer;
|
||||
int64_t checkpointVer;
|
||||
int64_t processedVer;
|
||||
int32_t numOfVgroups;
|
||||
|
||||
// children info
|
||||
SArray* childEpInfo; // SArray<SStreamChildEpInfo*>
|
||||
|
|
|
@ -383,6 +383,11 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
|||
// exec
|
||||
pInnerTask->execType = TASK_EXEC__PIPE;
|
||||
|
||||
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
|
||||
ASSERT(pDbObj != NULL);
|
||||
sdbRelease(pSdb, pSourceDb);
|
||||
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
|
||||
|
||||
if (tsSchedStreamToSnode) {
|
||||
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
|
||||
if (pSnode == NULL) {
|
||||
|
|
|
@ -591,7 +591,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
} else {
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
SReadHandle mgHandle = {
|
||||
.vnode = NULL,
|
||||
.numOfVgroups = pTask->numOfVgroups,
|
||||
};
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle);
|
||||
}
|
||||
ASSERT(pTask->exec.executor);
|
||||
}
|
||||
|
|
|
@ -4424,7 +4424,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
.calTrigger = pTableScanNode->triggerType,
|
||||
.maxTs = INT64_MIN,
|
||||
};
|
||||
if (pHandle) {
|
||||
if (pHandle->vnode) {
|
||||
int32_t code = createScanTableListInfo(&pTableScanNode->scan, pTableScanNode->pGroupTags,
|
||||
pTableScanNode->groupSort, pHandle, pTableListInfo, pTagCond, pTagIndexCond, GET_TASKID(pTaskInfo));
|
||||
if (code) {
|
||||
|
@ -4590,7 +4590,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
|||
int32_t children = 0;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) {
|
||||
int32_t children = 1;
|
||||
int32_t children = pHandle->numOfVgroups;
|
||||
pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children);
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == type) {
|
||||
pOptr = createSortOperatorInfo(ops[0], (SSortPhysiNode*)pPhyNode, pTaskInfo);
|
||||
|
|
|
@ -1539,7 +1539,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
goto _error;
|
||||
}
|
||||
|
||||
if (pHandle) {
|
||||
if (pHandle->vnode) {
|
||||
SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||
STableScanInfo* pTSInfo = (STableScanInfo*)pTableScanOp->info;
|
||||
if (pHandle->version > 0) {
|
||||
|
|
|
@ -64,6 +64,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|||
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
|
||||
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;
|
||||
|
||||
int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
|
||||
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
||||
|
@ -118,6 +119,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|||
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
|
||||
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;
|
||||
|
||||
int32_t epSz;
|
||||
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
||||
|
|
Loading…
Reference in New Issue