fix(sma): drop stream when drop sma
This commit is contained in:
parent
48718a1b91
commit
996f995644
|
@ -33,6 +33,10 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
||||||
|
|
||||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
// for sma
|
||||||
|
// TODO refactor
|
||||||
|
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,11 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndSma.h"
|
#include "mndSma.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndInfoSchema.h"
|
#include "mndInfoSchema.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndScheduler.h"
|
#include "mndScheduler.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
@ -857,6 +857,23 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
||||||
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
|
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
|
||||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||||
|
|
||||||
|
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
|
||||||
|
if (pStream == NULL || pStream->smaId != pSma->uid) {
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
goto _OVER;
|
||||||
|
} else {
|
||||||
|
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||||
|
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
|
||||||
|
// drop stream
|
||||||
|
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||||
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||||
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||||
|
|
|
@ -14,10 +14,10 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "mndStream.h"
|
#include "mndStream.h"
|
||||||
#include "mndPrivilege.h"
|
|
||||||
#include "mndDb.h"
|
#include "mndDb.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
#include "mndPrivilege.h"
|
||||||
#include "mndScheduler.h"
|
#include "mndScheduler.h"
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndStb.h"
|
#include "mndStb.h"
|
||||||
|
@ -490,7 +490,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||||
for (int32_t i = 0; i < lv; i++) {
|
for (int32_t i = 0; i < lv; i++) {
|
||||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||||
|
|
|
@ -43,7 +43,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
taosArrayPush(tagArray, &tagVal);
|
taosArrayPush(tagArray, &tagVal);
|
||||||
tTagNew(tagArray, 1, false, &pTag);
|
tTagNew(tagArray, 1, false, &pTag);
|
||||||
if (pTag == NULL) {
|
if (pTag == NULL) {
|
||||||
taosArrayDestroy(schemaReqs);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
taosArrayDestroy(tagArray);
|
taosArrayDestroy(tagArray);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -104,6 +104,8 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pRead->pReadLogTFile = pLogTFile;
|
||||||
|
|
||||||
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
|
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
|
||||||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||||
if (pIdxTFile == NULL) {
|
if (pIdxTFile == NULL) {
|
||||||
|
@ -112,7 +114,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRead->pReadLogTFile = pLogTFile;
|
|
||||||
pRead->pReadIdxTFile = pIdxTFile;
|
pRead->pReadIdxTFile = pIdxTFile;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue