fix(stream): remove stream in buf.

This commit is contained in:
Haojun Liao 2023-09-18 20:25:24 +08:00
parent 08b37dfc13
commit 38bf2d24e7
1 changed files with 25 additions and 5 deletions

View File

@ -83,6 +83,9 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
.sdbType = SDB_STREAM,
@ -1280,7 +1283,6 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mndTransDrop(pTrans);
return -1;
}
// mndTransSetSerial(pTrans);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
@ -1304,13 +1306,13 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
removeStreamTasksInBuf(pStream, &execNodeList);
char detail[100] = {0};
sprintf(detail, "igNotExists:%d", dropReq.igNotExists);
SName name = {0};
tNameFromString(&name, dropReq.name, T_NAME_ACCT | T_NAME_DB);
//reuse this function for stream
auditRecord(pReq, pMnode->clusterId, "dropStream", name.dbname, "", detail);
sdbRelease(pMnode->pSdb, pStream);
@ -2238,7 +2240,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
return 0;
}
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
@ -2261,6 +2263,25 @@ static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *p
}
}
void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) {
int32_t level = taosArrayGetSize(pStream->tasks);
for (int32_t i = 0; i < level; i++) {
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
int32_t numOfTasks = taosArrayGetSize(pLevel);
for (int32_t j = 0; j < numOfTasks; j++) {
SStreamTask *pTask = taosArrayGetP(pLevel, j);
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id));
if (p != NULL) {
taosArrayRemove(pExecNode->pTaskList, *(int32_t*)p);
taosHashRemove(pExecNode->pTaskMap, &id, sizeof(id));
}
}
}
}
// todo: this process should be executed by the write queue worker of the mnode
int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
@ -2277,7 +2298,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
}
tDecoderClear(&decoder);
// int64_t now = taosGetTimestampSec();
mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks);
taosThreadMutexLock(&execNodeList.lock);