fix(stream): clear htask info when unregistering the task.

This commit is contained in:
Haojun Liao 2024-01-24 16:23:47 +08:00
parent 1a45d40607
commit dc1ea9f9a1
4 changed files with 22 additions and 10 deletions

View File

@ -85,6 +85,7 @@ static void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbNam
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
static void freeCheckpointCandEntry(void *);
static void freeTaskList(void *param);
static SSdbRaw *mndStreamActionEncode(SStreamObj *pStream);
static SSdbRow *mndStreamActionDecode(SSdbRaw *pRaw);
@ -154,6 +155,7 @@ int32_t mndInitStream(SMnode *pMnode) {
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
taosHashSetFreeFp(execInfo.transMgmt.pWaitingList, freeCheckpointCandEntry);
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
if (sdbSetTable(pMnode->pSdb, table) != 0) {
return -1;
@ -3036,6 +3038,11 @@ void freeCheckpointCandEntry(void *param) {
taosMemoryFreeClear(pEntry->pName);
}
void freeTaskList(void* param) {
SArray** pList = (SArray **)param;
taosArrayDestroy(*pList);
}
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb;
@ -3111,7 +3118,6 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
// remove this entry
taosArrayDestroy(*pReqTaskList);
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);

View File

@ -467,7 +467,6 @@ void streamMetaClear(SStreamMeta* pMeta) {
}
taosRemoveRef(streamBackendId, pMeta->streamBackendRid);
taosHashClear(pMeta->pTasksMap);
taosArrayClear(pMeta->pTaskList);
@ -505,7 +504,9 @@ void streamMetaCloseImpl(void* arg) {
return;
}
streamMetaWLock(pMeta);
streamMetaClear(pMeta);
streamMetaWUnLock(pMeta);
tdbAbort(pMeta->db, pMeta->txn);
tdbTbClose(pMeta->pTaskDb);
@ -519,7 +520,6 @@ void streamMetaCloseImpl(void* arg) {
taosHashCleanup(pMeta->pTasksMap);
taosHashCleanup(pMeta->pTaskDbUnique);
taosHashCleanup(pMeta->pUpdateTaskSet);
// taosHashCleanup(pMeta->pTaskBackendUnique);
taosHashCleanup(pMeta->updateInfo.pTasks);
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
@ -534,6 +534,8 @@ void streamMetaCloseImpl(void* arg) {
bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT;
taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta);
stDebug("end to close stream meta");
}
@ -731,6 +733,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
// it is an fill-history task, remove the related stream task's id that points to it
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
if (pTask->info.fillHistory == 1) {
streamTaskClearHTaskAttr(pTask, false);
}
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
doRemoveIdFromList(pMeta, (int32_t)taosArrayGetSize(pMeta->pTaskList), &pTask->id);

View File

@ -346,9 +346,9 @@ void tFreeStreamTask(SStreamTask* pTask, bool metaLock) {
STaskExecStatisInfo* pStatis = &pTask->execInfo;
// check for mnode
if (pTask->pMeta != NULL) {
streamTaskClearHTaskAttr(pTask, metaLock);
}
// if (pTask->pMeta != NULL) {
// streamTaskClearHTaskAttr(pTask, metaLock);
// }
ETaskStatus status1 = TASK_STATUS__UNINIT;
taosThreadMutexLock(&pTask->lock);
@ -751,13 +751,13 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, bool metaLock) {
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
if (ppStreamTask != NULL) {
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
(int32_t)sTaskId.taskId);
taosThreadMutexLock(&(*ppStreamTask)->lock);
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
streamMetaSaveTask(pMeta, *ppStreamTask);
taosThreadMutexUnlock(&(*ppStreamTask)->lock);
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
(int32_t)sTaskId.taskId);
}
if (metaLock) {

View File

@ -24,7 +24,8 @@ from util.dnodes import tdDnodes
from util.dnodes import *
class TDTestCase:
updatecfgDict = {'debugflag':0,'stdebugFlag': 143 ,"tqDebugflag":135}
def init(self, conn, logSql, replicaVar):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), logSql)