fix(stream): memory error
This commit is contained in:
parent
ad032a0a90
commit
d9a4ea305d
|
@ -42,8 +42,8 @@ enum {
|
||||||
TASK_STATUS__DROPPING,
|
TASK_STATUS__DROPPING,
|
||||||
TASK_STATUS__FAIL,
|
TASK_STATUS__FAIL,
|
||||||
TASK_STATUS__STOP,
|
TASK_STATUS__STOP,
|
||||||
TASK_STATUS__PREPARE_RECOVER,
|
TASK_STATUS__RECOVER_DOWNSTREAM,
|
||||||
TASK_STATUS__RECOVERING,
|
TASK_STATUS__RECOVER_SELF,
|
||||||
};
|
};
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -232,8 +232,8 @@ typedef struct {
|
||||||
} SStreamChildEpInfo;
|
} SStreamChildEpInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t nodeId;
|
int32_t srcNodeId;
|
||||||
int32_t childId;
|
int32_t srcChildId;
|
||||||
int64_t stateSaveVer;
|
int64_t stateSaveVer;
|
||||||
int64_t stateProcessedVer;
|
int64_t stateProcessedVer;
|
||||||
} SStreamCheckpointInfo;
|
} SStreamCheckpointInfo;
|
||||||
|
@ -394,9 +394,6 @@ typedef struct {
|
||||||
int32_t upstreamTaskId;
|
int32_t upstreamTaskId;
|
||||||
int32_t upstreamChildId;
|
int32_t upstreamChildId;
|
||||||
int32_t upstreamNodeId;
|
int32_t upstreamNodeId;
|
||||||
#if 0
|
|
||||||
int64_t sourceVer;
|
|
||||||
#endif
|
|
||||||
int32_t blockNum;
|
int32_t blockNum;
|
||||||
SArray* dataLen; // SArray<int32_t>
|
SArray* dataLen; // SArray<int32_t>
|
||||||
SArray* data; // SArray<SRetrieveTableRsp*>
|
SArray* data; // SArray<SRetrieveTableRsp*>
|
||||||
|
@ -426,6 +423,7 @@ typedef struct {
|
||||||
int32_t rspToTaskId;
|
int32_t rspToTaskId;
|
||||||
} SStreamRetrieveRsp;
|
} SStreamRetrieveRsp;
|
||||||
|
|
||||||
|
#if 0
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t streamId;
|
int64_t streamId;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
|
@ -462,13 +460,25 @@ int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq
|
||||||
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
|
int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp);
|
||||||
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
|
int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
||||||
int64_t streamId;
|
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
||||||
} SPStreamTaskRecoverReq;
|
#endif
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t reserved;
|
int64_t streamId;
|
||||||
} SPStreamTaskRecoverRsp;
|
int32_t downstreamTaskId;
|
||||||
|
int32_t taskId;
|
||||||
|
} SStreamRecoverDownstreamReq;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t streamId;
|
||||||
|
int32_t downstreamTaskId;
|
||||||
|
int32_t taskId;
|
||||||
|
SArray* checkpointVer; // SArray<SStreamCheckpointInfo>
|
||||||
|
} SStreamRecoverDownstreamRsp;
|
||||||
|
|
||||||
|
int32_t tEncodeSStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamRecoverDownstreamReq* pReq);
|
||||||
|
int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, const SStreamRecoverDownstreamRsp* pRsp);
|
||||||
|
|
||||||
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
|
||||||
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
|
||||||
|
@ -479,8 +489,6 @@ int32_t streamSetupTrigger(SStreamTask* pTask);
|
||||||
int32_t streamProcessRunReq(SStreamTask* pTask);
|
int32_t streamProcessRunReq(SStreamTask* pTask);
|
||||||
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
||||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
|
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp);
|
||||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pMsg);
|
|
||||||
int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp);
|
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||||
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
|
||||||
|
|
|
@ -504,6 +504,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
|
static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
|
SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq));
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
|
@ -540,7 +541,6 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
if (pStream->isDistributed) {
|
if (pStream->isDistributed) {
|
||||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||||
|
|
|
@ -777,6 +777,7 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
SStreamTaskRecoverReq* pReq = pMsg->pCont;
|
||||||
int32_t taskId = pReq->taskId;
|
int32_t taskId = pReq->taskId;
|
||||||
|
@ -789,18 +790,6 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|
||||||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
|
||||||
int32_t taskId = pRsp->taskId;
|
|
||||||
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
|
||||||
if (pTask) {
|
|
||||||
streamProcessDispatchRsp(pTask, pRsp);
|
|
||||||
return 0;
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
|
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
|
||||||
int32_t taskId = pRsp->rspTaskId;
|
int32_t taskId = pRsp->rspTaskId;
|
||||||
|
@ -813,6 +802,19 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
|
int32_t taskId = pRsp->taskId;
|
||||||
|
SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
|
||||||
|
if (pTask) {
|
||||||
|
streamProcessDispatchRsp(pTask, pRsp);
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
|
||||||
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
|
||||||
|
@ -873,6 +875,8 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
streamProcessDispatchReq(pTask, &req, &rsp, false);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -883,4 +887,6 @@ FAIL:
|
||||||
.info = pMsg->info,
|
.info = pMsg->info,
|
||||||
};
|
};
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -332,14 +332,14 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
case TDMT_STREAM_TASK_DISPATCH:
|
case TDMT_STREAM_TASK_DISPATCH:
|
||||||
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
|
// return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, pInfo->workerId != 0);
|
||||||
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
|
return tqProcessTaskDispatchReq(pVnode->pTq, pMsg, true);
|
||||||
case TDMT_STREAM_TASK_RECOVER:
|
/*case TDMT_STREAM_TASK_RECOVER:*/
|
||||||
return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);
|
/*return tqProcessTaskRecoverReq(pVnode->pTq, pMsg);*/
|
||||||
case TDMT_STREAM_RETRIEVE:
|
case TDMT_STREAM_RETRIEVE:
|
||||||
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
return tqProcessTaskRetrieveReq(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
case TDMT_STREAM_TASK_DISPATCH_RSP:
|
||||||
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskDispatchRsp(pVnode->pTq, pMsg);
|
||||||
case TDMT_STREAM_TASK_RECOVER_RSP:
|
/*case TDMT_STREAM_TASK_RECOVER_RSP:*/
|
||||||
return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);
|
/*return tqProcessTaskRecoverRsp(pVnode->pTq, pMsg);*/
|
||||||
case TDMT_STREAM_RETRIEVE_RSP:
|
case TDMT_STREAM_RETRIEVE_RSP:
|
||||||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -229,6 +229,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamProcessRecoverReq(SStreamTask* pTask, SStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) {
|
||||||
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
|
void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamTaskRecoverRsp));
|
||||||
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
|
((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId);
|
||||||
|
@ -267,6 +268,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp)
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) {
|
||||||
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
qDebug("task %d receive retrieve req from node %d task %d", pTask->taskId, pReq->srcNodeId, pReq->srcTaskId);
|
||||||
|
|
|
@ -200,14 +200,13 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
streamTaskExecImpl(pTask, data, pRes);
|
streamTaskExecImpl(pTask, data, pRes);
|
||||||
qDebug("stream task %d exec end", pTask->taskId);
|
qDebug("stream task %d exec end", pTask->taskId);
|
||||||
|
|
||||||
streamFreeQitem(data);
|
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) != 0) {
|
if (taosArrayGetSize(pRes) != 0) {
|
||||||
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM);
|
||||||
if (qRes == NULL) {
|
if (qRes == NULL) {
|
||||||
// TODO log failed ver
|
// TODO log failed ver
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
taosArrayDestroy(pRes);
|
taosArrayDestroy(pRes);
|
||||||
|
streamFreeQitem(data);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
qRes->type = STREAM_INPUT__DATA_BLOCK;
|
||||||
|
@ -224,10 +223,14 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
||||||
/*streamQueueProcessFail(pTask->inputQueue);*/
|
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||||
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
|
streamFreeQitem(data);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
/*streamQueueProcessSuccess(pTask->inputQueue);*/
|
||||||
|
} else {
|
||||||
|
taosArrayDestroy(pRes);
|
||||||
}
|
}
|
||||||
|
streamFreeQitem(data);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "streamInc.h"
|
#include "streamInc.h"
|
||||||
|
|
||||||
|
#if 0
|
||||||
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
|
int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) {
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
|
@ -86,17 +87,18 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
|
int32_t tEncodeSStreamCheckpointInfo(SEncoder* pEncoder, const SStreamCheckpointInfo* pCheckpoint) {
|
||||||
if (tEncodeI32(pEncoder, pCheckpoint->nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pCheckpoint->srcNodeId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pCheckpoint->srcChildId) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
|
if (tEncodeI64(pEncoder, pCheckpoint->stateProcessedVer) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
|
int32_t tDecodeSStreamCheckpointInfo(SDecoder* pDecoder, SStreamCheckpointInfo* pCheckpoint) {
|
||||||
if (tDecodeI32(pDecoder, &pCheckpoint->nodeId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pCheckpoint->srcNodeId) < 0) return -1;
|
||||||
if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pCheckpoint->srcChildId) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pCheckpoint->stateProcessedVer) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -221,11 +223,17 @@ int32_t streamSaveAggLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamFetchSinkStatus(SStreamTask* pTask) {
|
int32_t streamFetchDownstreamStatus(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->taskLevel != TASK_LEVEL__SINK);
|
|
||||||
// set self status to recover_phase1
|
// set self status to recover_phase1
|
||||||
// build fetch status msg
|
// build fetch status msg
|
||||||
// send fetch msg
|
// send fetch msg
|
||||||
|
atomic_store_8(&pTask->taskStatus, TASK_STATUS__RECOVER_DOWNSTREAM);
|
||||||
|
|
||||||
|
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue