fix(stream):fix unrelease stream obj.
This commit is contained in:
parent
f3c306d582
commit
e776cde461
|
@ -2188,10 +2188,8 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||||
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
|
mInfo("stream:0x%" PRIx64 " all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId);
|
||||||
|
|
||||||
if (pStream != NULL) {
|
if (pStream != NULL) { // TODO:handle error
|
||||||
// TODO:handle error
|
|
||||||
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||||
mndReleaseStream(pMnode, pStream);
|
|
||||||
} else {
|
} else {
|
||||||
// todo: wait for the create stream trans completed, and launch the checkpoint trans
|
// todo: wait for the create stream trans completed, and launch the checkpoint trans
|
||||||
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
// SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||||
|
@ -2205,6 +2203,10 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
|
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", req.streamId, numOfStreams);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pStream != NULL) {
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue