Merge pull request #14250 from taosdata/feature/stream
fix(sma): drop stream when drop sma
This commit is contained in:
commit
e571567ec2
|
@ -34,6 +34,11 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
|||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
// for sma
|
||||
// TODO refactor
|
||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -857,6 +857,24 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
|
|||
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
|
||||
SStreamObj *pStream = mndAcquireStream(pMnode, pSma->name);
|
||||
if (pStream == NULL || pStream->smaId != pSma->uid) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
goto _OVER;
|
||||
} else {
|
||||
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
|
||||
mError("stream:%s, failed to drop task since %s", pStream->name, terrstr());
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// drop stream
|
||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
|
||||
if (mndSetDropSmaCommitLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
|
||||
|
|
|
@ -490,7 +490,7 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
|
||||
int32_t lv = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < lv; i++) {
|
||||
SArray *pTasks = taosArrayGetP(pStream->tasks, i);
|
||||
|
|
|
@ -43,7 +43,7 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
|||
taosArrayPush(tagArray, &tagVal);
|
||||
tTagNew(tagArray, 1, false, &pTag);
|
||||
if (pTag == NULL) {
|
||||
taosArrayDestroy(schemaReqs);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
taosArrayDestroy(tagArray);
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -348,7 +348,6 @@ typedef struct SStreamBlockScanInfo {
|
|||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here.
|
||||
SArray* childIds;
|
||||
SessionWindowSupporter sessionSup;
|
||||
bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA.
|
||||
int32_t scanWinIndex; // for state operator
|
||||
int32_t pullDataResIndex;
|
||||
SSDataBlock* pPullDataRes; // pull data SSDataBlock
|
||||
|
|
|
@ -19,8 +19,7 @@
|
|||
#include "tdatablock.h"
|
||||
#include "vnode.h"
|
||||
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, bool assignUid,
|
||||
char* id) {
|
||||
static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t numOfBlocks, int32_t type, char* id) {
|
||||
ASSERT(pOperator != NULL);
|
||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
|
||||
if (pOperator->numOfDownstream == 0) {
|
||||
|
@ -33,12 +32,11 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
|
|||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, assignUid, id);
|
||||
return doSetStreamBlock(pOperator->pDownstream[0], input, numOfBlocks, type, id);
|
||||
} else {
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
pInfo->assignBlockUid = assignUid;
|
||||
|
||||
// TODO: if a block was set but not consumed,
|
||||
// prevent setting a different type of block
|
||||
|
@ -76,7 +74,7 @@ int32_t qStreamScanSnapshot(qTaskInfo_t tinfo) {
|
|||
return TSDB_CODE_QRY_APP_ERROR;
|
||||
}
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, 0, NULL);
|
||||
return doSetStreamBlock(pTaskInfo->pRoot, NULL, 0, STREAM_DATA_TYPE_FROM_SNAPSHOT, NULL);
|
||||
}
|
||||
|
||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input, int32_t type, bool assignUid) {
|
||||
|
@ -94,8 +92,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO
|
|||
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
|
||||
int32_t code =
|
||||
doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, assignUid, GET_TASKID(pTaskInfo));
|
||||
int32_t code = doSetStreamBlock(pTaskInfo->pRoot, (void**)pBlocks, numOfBlocks, type, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
qError("%s failed to set the stream block data", GET_TASKID(pTaskInfo));
|
||||
} else {
|
||||
|
|
|
@ -1051,14 +1051,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
pInfo->pRes->info.type = STREAM_NORMAL;
|
||||
pInfo->pRes->info.capacity = numOfRows;
|
||||
|
||||
// for generating rollup SMA result, each time is an independent time serie.
|
||||
// TODO temporarily used, when the statement of "partition by tbname" is ready, remove this
|
||||
if (pInfo->assignBlockUid) {
|
||||
pInfo->pRes->info.groupId = uid;
|
||||
} else {
|
||||
pInfo->pRes->info.groupId = groupId;
|
||||
}
|
||||
|
||||
uint64_t* groupIdPre = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &uid, sizeof(int64_t));
|
||||
if (groupIdPre) {
|
||||
pInfo->pRes->info.groupId = *groupIdPre;
|
||||
|
|
|
@ -103,6 +103,7 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
|||
wError("cannot open file %s, since %s", fnameStr, terrstr());
|
||||
return -1;
|
||||
}
|
||||
pRead->pReadLogTFile = pLogTFile;
|
||||
|
||||
walBuildIdxName(pRead->pWal, fileFirstVer, fnameStr);
|
||||
TdFilePtr pIdxTFile = taosOpenFile(fnameStr, TD_FILE_READ);
|
||||
|
@ -112,7 +113,6 @@ static int32_t walReadChangeFile(SWalReadHandle *pRead, int64_t fileFirstVer) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
pRead->pReadLogTFile = pLogTFile;
|
||||
pRead->pReadIdxTFile = pIdxTFile;
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue