fix(stream): remove invalid code.
This commit is contained in:
parent
f24b22000f
commit
1f792f09c4
|
@ -33,7 +33,6 @@ SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
|
|||
|
||||
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
|
||||
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
|
||||
int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans);
|
||||
|
||||
// for sma
|
||||
// TODO refactor
|
||||
|
|
|
@ -2090,76 +2090,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t mndBuildUpdateTaskStatusTrans(SStreamObj* pStream, STrans* pTrans) {
|
||||
pStream->status = STREAM_STATUS__STOP;
|
||||
|
||||
int32_t size = taosArrayGetSize(pStream->tasks);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pLevel);
|
||||
for (int32_t j = 0; j < numOfTasks; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
|
||||
SVPauseStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVPauseStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
mError("failed to malloc in pause stream, size:%" PRIzu ", code:%s", sizeof(SVPauseStreamTaskReq),
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pReq->head.vgId = htonl(pTask->info.nodeId);
|
||||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
STransAction action = {0};
|
||||
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_STOP, &pTask->info.epSet);
|
||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (atomic_load_8(&pTask->status.taskStatus) != TASK_STATUS__STOP) {
|
||||
atomic_store_8(&pTask->status.keepTaskStatus, pTask->status.taskStatus);
|
||||
atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__STOP);
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndStopInvolvedStreamTasks(SMnode *pMnode, int32_t vgId, STrans *pTrans) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SVgObj *pVgroup = mndAcquireVgroup(pMnode, vgId);
|
||||
|
||||
const char *p = strdup(pVgroup->dbName);
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
|
||||
SStreamObj *pStream = NULL;
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (strcmp(pStream->targetDb, p) == 0 || strcmp(pStream->sourceDb, p) == 0) {
|
||||
int32_t code = mndBuildUpdateTaskStatusTrans(pStream, pTrans);
|
||||
// mDebug("stream:0x%"PRIx64" involved node changed, create update trans", pStream->uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// todo
|
||||
}
|
||||
}
|
||||
|
||||
mndReleaseStream(pMnode, pStream);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// todo: this process should be executed by the write queue worker of the mnode
|
||||
//int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||
// SMnode *pMnode = pReq->info.node;
|
||||
|
|
|
@ -1680,8 +1680,6 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb,
|
|||
pNew1->memUsed += vgMem;
|
||||
}
|
||||
|
||||
// mndStopInvolvedStreamTasks(pMnode, pVgroup->vgId, pTrans);
|
||||
|
||||
if (mndAddIncVgroupReplicaToTrans(pMnode, pTrans, pDb, &newVg, pNew1->id) != 0) goto _OVER;
|
||||
if (mndAddDecVgroupReplicaFromTrans(pMnode, pTrans, pDb, &newVg, pOld1->id) != 0) goto _OVER;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue