refactor: add logs for drop streams.
This commit is contained in:
parent
62a1277801
commit
9ba8e6bd34
|
@ -1213,27 +1213,31 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SMDropStreamReq dropReq = {0};
|
SMDropStreamReq dropReq = {0};
|
||||||
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
|
if (tDeserializeSMDropStreamReq(pReq->pCont, pReq->contLen, &dropReq) < 0) {
|
||||||
|
mError("invalid drop stream msg recv, discarded");
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pStream = mndAcquireStream(pMnode, dropReq.name);
|
mDebug("recv drop stream:%s msg", dropReq.name);
|
||||||
|
|
||||||
|
pStream = mndAcquireStream(pMnode, dropReq.name);
|
||||||
if (pStream == NULL) {
|
if (pStream == NULL) {
|
||||||
if (dropReq.igNotExists) {
|
if (dropReq.igNotExists) {
|
||||||
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
|
mInfo("stream:%s not exist, ignore not exist is set, drop stream exec done with success", dropReq.name);
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
tFreeMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
mError("stream:%s not exist failed to drop", dropReq.name);
|
mError("stream:%s not exist failed to drop it", dropReq.name);
|
||||||
tFreeMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->smaId != 0) {
|
if (pStream->smaId != 0) {
|
||||||
|
mDebug("stream:%s try to drop sma related stream", dropReq.name);
|
||||||
|
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SSmaObj *pSma = NULL;
|
SSmaObj *pSma = NULL;
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
|
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
|
||||||
|
@ -1241,13 +1245,21 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
if (pSma && pSma->uid == pStream->smaId) {
|
if (pSma && pSma->uid == pStream->smaId) {
|
||||||
sdbRelease(pMnode->pSdb, pSma);
|
sdbRelease(pMnode->pSdb, pSma);
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
tFreeMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED;
|
terrno = TSDB_CODE_TSMA_MUST_BE_DROPPED;
|
||||||
|
|
||||||
|
mError("try to drop sma-related stream:%s, code:%s only allowed to be dropped along with sma", dropReq.name,
|
||||||
|
tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pSma) sdbRelease(pMnode->pSdb, pSma);
|
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void**)&pSma);
|
if (pSma) {
|
||||||
|
sdbRelease(pMnode->pSdb, pSma);
|
||||||
|
}
|
||||||
|
|
||||||
|
pIter = sdbFetch(pMnode->pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1307,6 +1319,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
mndKillTransImpl(pMnode, transId, pStream->sourceDb);
|
mndKillTransImpl(pMnode, transId, pStream->sourceDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mDebug("stream:%s transId:%d start to drop related task when dropping stream", dropReq.name, transId);
|
||||||
removeStreamTasksInBuf(pStream, &execInfo);
|
removeStreamTasksInBuf(pStream, &execInfo);
|
||||||
|
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
|
|
Loading…
Reference in New Issue