refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-08-12 18:47:01 +08:00
parent 075b5e9481
commit 75e6fb0f16
3 changed files with 13 additions and 3 deletions

View File

@ -403,7 +403,7 @@ typedef struct {
int64_t streamId; int64_t streamId;
int32_t type; int32_t type;
int32_t taskId; int32_t taskId;
int32_t dataSrcVgId; int32_t srcVgId;
int32_t upstreamTaskId; int32_t upstreamTaskId;
int32_t upstreamChildId; int32_t upstreamChildId;
int32_t upstreamNodeId; int32_t upstreamNodeId;
@ -582,6 +582,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
int32_t streamProcessRunReq(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask);
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);

View File

@ -145,7 +145,7 @@ int32_t streamSchedExec(SStreamTask* pTask) {
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
int8_t status = 0; int8_t status = 0;
SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->srcVgId);
if (pBlock == NULL) { if (pBlock == NULL) {
streamTaskInputFail(pTask); streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED; status = TASK_INPUT_STATUS__FAILED;
@ -235,6 +235,8 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
return 0; return 0;
} }
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) { static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
int8_t status = 0; int8_t status = 0;
@ -272,6 +274,13 @@ static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchR
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = false;
}
}
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);

View File

@ -36,7 +36,7 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;