refactor: do some internal refactor.
This commit is contained in:
parent
75e6fb0f16
commit
09da6c6840
|
@ -122,6 +122,7 @@ typedef struct {
|
|||
int8_t type;
|
||||
|
||||
int32_t srcVgId;
|
||||
int32_t srcTaskId;
|
||||
int32_t childId;
|
||||
int64_t sourceVer;
|
||||
int64_t reqId;
|
||||
|
|
|
@ -142,40 +142,6 @@ int32_t streamSchedExec(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||
int8_t status = 0;
|
||||
|
||||
SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->srcVgId);
|
||||
if (pBlock == NULL) {
|
||||
streamTaskInputFail(pTask);
|
||||
status = TASK_INPUT_STATUS__FAILED;
|
||||
qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||
pTask->id.idStr);
|
||||
} else {
|
||||
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
||||
// input queue is full, upstream is blocked now
|
||||
status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED;
|
||||
}
|
||||
|
||||
// rsp by input status
|
||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp));
|
||||
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
|
||||
SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
pDispatchRsp->inputStatus = status;
|
||||
pDispatchRsp->streamId = htobe64(pReq->streamId);
|
||||
pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId);
|
||||
pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId);
|
||||
pDispatchRsp->downstreamNodeId = htonl(pTask->info.nodeId);
|
||||
pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId);
|
||||
|
||||
pRsp->pCont = buf;
|
||||
pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp);
|
||||
tmsgSendRsp(pRsp);
|
||||
|
||||
return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1;
|
||||
}
|
||||
|
||||
int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0);
|
||||
int8_t status = TASK_INPUT_STATUS__NORMAL;
|
||||
|
@ -240,7 +206,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq) {
|
||||
int8_t status = 0;
|
||||
|
||||
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
|
||||
SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
|
||||
if (pBlock == NULL) {
|
||||
streamTaskInputFail(pTask);
|
||||
status = TASK_INPUT_STATUS__FAILED;
|
||||
|
|
|
@ -15,6 +15,45 @@
|
|||
|
||||
#include "streamInt.h"
|
||||
|
||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
|
||||
if (pData == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pData->type = blockType;
|
||||
pData->srcVgId = srcVg;
|
||||
pData->srcTaskId = pReq->upstreamTaskId;
|
||||
|
||||
int32_t blockNum = pReq->blockNum;
|
||||
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
|
||||
if (pArray == NULL) {
|
||||
taosFreeQitem(pData);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
|
||||
|
||||
for (int32_t i = 0; i < blockNum; i++) {
|
||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
||||
blockDecode(pDataBlock, pRetrieve->data);
|
||||
|
||||
// TODO: refactor
|
||||
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
||||
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
||||
pDataBlock->info.version = be64toh(pRetrieve->version);
|
||||
pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
|
||||
memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
|
||||
|
||||
pDataBlock->info.type = pRetrieve->streamBlockType;
|
||||
pDataBlock->info.childId = pReq->upstreamChildId;
|
||||
}
|
||||
|
||||
pData->blocks = pArray;
|
||||
return pData;
|
||||
}
|
||||
|
||||
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
|
||||
if (pData == NULL) {
|
||||
|
|
|
@ -89,7 +89,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
|||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
|
||||
|
@ -115,7 +115,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
|||
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
|
||||
int64_t dstTaskId) {
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->dataSrcVgId = vgId;
|
||||
pReq->srcVgId = vgId;
|
||||
pReq->upstreamTaskId = pTask->id.taskId;
|
||||
pReq->upstreamChildId = pTask->info.selfChildId;
|
||||
pReq->upstreamNodeId = pTask->info.nodeId;
|
||||
|
|
Loading…
Reference in New Issue