fix(tsdb): check the
This commit is contained in:
parent
09da6c6840
commit
afe9b848a5
|
@ -574,8 +574,6 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||||
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
|
||||||
|
|
||||||
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
|
|
||||||
int64_t dstTaskId);
|
|
||||||
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
|
||||||
|
|
||||||
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
|
||||||
|
|
|
@ -25,6 +25,9 @@ typedef struct SBlockName {
|
||||||
char parTbName[TSDB_TABLE_NAME_LEN];
|
char parTbName[TSDB_TABLE_NAME_LEN];
|
||||||
} SBlockName;
|
} SBlockName;
|
||||||
|
|
||||||
|
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
|
||||||
|
int32_t numOfBlocks, int64_t dstTaskId, int32_t type);
|
||||||
|
|
||||||
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
|
static void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
|
||||||
pMsg->msgType = msgType;
|
pMsg->msgType = msgType;
|
||||||
pMsg->pCont = pCont;
|
pMsg->pCont = pCont;
|
||||||
|
@ -112,8 +115,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
|
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
|
||||||
int64_t dstTaskId) {
|
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
|
||||||
pReq->streamId = pTask->id.streamId;
|
pReq->streamId = pTask->id.streamId;
|
||||||
pReq->srcVgId = vgId;
|
pReq->srcVgId = vgId;
|
||||||
pReq->upstreamTaskId = pTask->id.taskId;
|
pReq->upstreamTaskId = pTask->id.taskId;
|
||||||
|
@ -121,6 +124,7 @@ int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTas
|
||||||
pReq->upstreamNodeId = pTask->info.nodeId;
|
pReq->upstreamNodeId = pTask->info.nodeId;
|
||||||
pReq->blockNum = numOfBlocks;
|
pReq->blockNum = numOfBlocks;
|
||||||
pReq->taskId = dstTaskId;
|
pReq->taskId = dstTaskId;
|
||||||
|
pReq->type = type;
|
||||||
|
|
||||||
pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
|
pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
|
||||||
pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
|
pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
|
||||||
|
@ -446,7 +450,7 @@ int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
|
||||||
SStreamDispatchReq req = {0};
|
SStreamDispatchReq req = {0};
|
||||||
|
|
||||||
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||||
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId);
|
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, );
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue