fix:send stream retrieve msg to source task
This commit is contained in:
parent
e16000a50c
commit
de8ce3f7ec
|
@ -743,7 +743,6 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
|
|||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||
|
||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
||||
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
||||
|
||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
||||
|
@ -751,7 +750,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
|||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq);
|
||||
SStreamChildEpInfo* streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
|
||||
|
||||
void streamTaskInputFail(SStreamTask* pTask);
|
||||
|
@ -868,6 +867,9 @@ int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInf
|
|||
|
||||
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
|
||||
void* streamDestroyStateMachine(SStreamTaskSM* pSM);
|
||||
|
||||
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req);
|
||||
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -331,15 +331,25 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
if (pTask == NULL) {
|
||||
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||
req.dstTaskId);
|
||||
taosMemoryFree(req.pRetrieve);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
if(pTask->info.taskLevel == TASK_LEVEL__SOURCE){
|
||||
code = streamProcessRetrieveReq(pTask, &req);
|
||||
}else{
|
||||
req.srcNodeId = pTask->info.nodeId;
|
||||
req.srcTaskId = pTask->id.taskId;
|
||||
code = broadcastRetrieveMsg(pTask, &req);
|
||||
}
|
||||
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||
streamProcessRetrieveReq(pTask, &req, &rsp);
|
||||
sendRetrieveRsp(&req, &rsp);
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
tDeleteStreamRetrieveReq(&req);
|
||||
return 0;
|
||||
taosMemoryFree(req.pRetrieve);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tqStreamTaskProcessScanHistoryFinishReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
|
|
|
@ -104,7 +104,7 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
|
|||
void destroyStreamDataBlock(SStreamDataBlock* pBlock);
|
||||
|
||||
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
|
||||
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock);
|
||||
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
|
||||
|
||||
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
|
||||
|
||||
|
|
|
@ -172,7 +172,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
|
|||
return status;
|
||||
}
|
||||
|
||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock));
|
||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||
|
||||
|
@ -194,17 +194,6 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
|||
/*status = TASK_INPUT_STATUS__FAILED;*/
|
||||
}
|
||||
|
||||
// rsp by input status
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
||||
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
||||
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
pCont->streamId = pReq->streamId;
|
||||
pCont->rspToTaskId = pReq->srcTaskId;
|
||||
pCont->rspFromTaskId = pReq->dstTaskId;
|
||||
pRsp->pCont = buf;
|
||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
|
||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
}
|
||||
|
||||
|
@ -269,11 +258,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
streamTaskEnqueueRetrieve(pTask, pReq, pRsp);
|
||||
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SINK);
|
||||
streamSchedExec(pTask);
|
||||
return 0;
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq) {
|
||||
int32_t code = streamTaskEnqueueRetrieve(pTask, pReq);
|
||||
if(code != 0){
|
||||
return code;
|
||||
}
|
||||
return streamSchedExec(pTask);
|
||||
}
|
||||
|
||||
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputq.status, TASK_INPUT_STATUS__FAILED); }
|
||||
|
|
|
@ -162,16 +162,71 @@ int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq) { taosMemoryFree(pReq->pRetrieve); }
|
||||
void sendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp){
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp));
|
||||
((SMsgHead*)buf)->vgId = htonl(pReq->srcNodeId);
|
||||
SStreamRetrieveRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
pCont->streamId = pReq->streamId;
|
||||
pCont->rspToTaskId = pReq->srcTaskId;
|
||||
pCont->rspFromTaskId = pReq->dstTaskId;
|
||||
pRsp->pCont = buf;
|
||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamRetrieveRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
}
|
||||
|
||||
int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) {
|
||||
int32_t code = -1;
|
||||
int32_t broadcastRetrieveMsg(SStreamTask* pTask, SStreamRetrieveReq *req){
|
||||
int32_t code = 0;
|
||||
void* buf = NULL;
|
||||
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
ASSERT(sz > 0);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
req->reqId = tGenIdPI64();
|
||||
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
req->dstNodeId = pEpInfo->nodeId;
|
||||
req->dstTaskId = pEpInfo->taskId;
|
||||
int32_t len;
|
||||
tEncodeSize(tEncodeStreamRetrieveReq, req, len, code);
|
||||
if (code != 0) {
|
||||
ASSERT(0);
|
||||
return code;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
if (buf == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeStreamRetrieveReq(&encoder, req);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead));
|
||||
|
||||
code = tmsgSendReq(&pEpInfo->epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
ASSERT(0);
|
||||
rpcFreeCont(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
buf = NULL;
|
||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req->reqId);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t buildStreamRetrieveReq(SStreamTask* pTask, const SSDataBlock* pBlock, SStreamRetrieveReq* req){
|
||||
SRetrieveTableRsp* pRetrieve = NULL;
|
||||
void* buf = NULL;
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock);
|
||||
|
||||
pRetrieve = taosMemoryCalloc(1, dataStrLen);
|
||||
if (pRetrieve == NULL) return -1;
|
||||
if (pRetrieve == NULL) return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||
pRetrieve->useconds = 0;
|
||||
|
@ -187,57 +242,24 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
|||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||
|
||||
SStreamRetrieveReq req = {
|
||||
.streamId = pTask->id.streamId,
|
||||
.srcNodeId = pTask->info.nodeId,
|
||||
.srcTaskId = pTask->id.taskId,
|
||||
.pRetrieve = pRetrieve,
|
||||
.retrieveLen = dataStrLen,
|
||||
};
|
||||
req->streamId = pTask->id.streamId;
|
||||
req->srcNodeId = pTask->info.nodeId;
|
||||
req->srcTaskId = pTask->id.taskId;
|
||||
req->pRetrieve = pRetrieve;
|
||||
req->retrieveLen = dataStrLen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t sz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||
ASSERT(sz > 0);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
req.reqId = tGenIdPI64();
|
||||
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||
req.dstNodeId = pEpInfo->nodeId;
|
||||
req.dstTaskId = pEpInfo->taskId;
|
||||
int32_t len;
|
||||
tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code);
|
||||
if (code < 0) {
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + len);
|
||||
if (buf == NULL) {
|
||||
goto CLEAR;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pEpInfo->nodeId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, len);
|
||||
tEncodeStreamRetrieveReq(&encoder, &req);
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE, buf, len + sizeof(SMsgHead));
|
||||
|
||||
if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) {
|
||||
ASSERT(0);
|
||||
goto CLEAR;
|
||||
}
|
||||
|
||||
buf = NULL;
|
||||
stDebug("s-task:%s (child %d) send retrieve req to task:0x%x (vgId:%d), reqId:0x%" PRIx64, pTask->id.idStr,
|
||||
pTask->info.selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId);
|
||||
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock) {
|
||||
SStreamRetrieveReq req;
|
||||
int32_t code = buildStreamRetrieveReq(pTask, pBlock, &req);
|
||||
if(code != 0){
|
||||
return code;
|
||||
}
|
||||
code = 0;
|
||||
|
||||
CLEAR:
|
||||
taosMemoryFree(pRetrieve);
|
||||
rpcFreeCont(buf);
|
||||
code = broadcastRetrieveMsg(pTask, &req);
|
||||
taosMemoryFree(req.pRetrieve);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -144,7 +144,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
|
|||
}
|
||||
|
||||
if (output->info.type == STREAM_RETRIEVE) {
|
||||
if (streamBroadcastToChildren(pTask, output) < 0) {
|
||||
if (streamBroadcastToUpTasks(pTask, output) < 0) {
|
||||
// TODO
|
||||
}
|
||||
continue;
|
||||
|
|
Loading…
Reference in New Issue