refactor: do some internal refactor.
This commit is contained in:
parent
a0534ee715
commit
075b5e9481
|
@ -253,6 +253,25 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
|
|||
return status;
|
||||
}
|
||||
|
||||
static int32_t buildDispatchRsp(const SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t status, void** pBuf) {
|
||||
*pBuf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||
if (*pBuf == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
((SMsgHead*)(*pBuf))->vgId = htonl(pReq->upstreamNodeId);
|
||||
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT((*pBuf), sizeof(SMsgHead));
|
||||
|
||||
pDispatchRsp->inputStatus = status;
|
||||
pDispatchRsp->streamId = htobe64(pReq->streamId);
|
||||
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
||||
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
||||
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
|
||||
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) {
|
||||
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
||||
|
|
Loading…
Reference in New Issue