From 380f43349972ff44794b1c7e05bf5ad48ebc8b22 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 27 Oct 2024 15:40:17 +0800 Subject: [PATCH] fix(stream): use the refId in stream meta list, in order to avoid access already freed stream tasks. --- include/libs/stream/tstream.h | 12 +++--- source/libs/stream/inc/streamInt.h | 5 ++- source/libs/stream/src/streamTask.c | 66 ++++++++++++++++++----------- 3 files changed, 53 insertions(+), 30 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index a189cee0bb..449df5207f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -70,7 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo; #define SSTREAM_TASK_NEED_CONVERT_VER 2 #define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3 -extern int32_t streamMetaId; +extern int32_t streamMetaRefPool; +extern int32_t streamTaskRefPool; enum { STREAM_STATUS__NORMAL = 0, @@ -258,6 +259,7 @@ typedef struct STaskId { typedef struct SStreamTaskId { int64_t streamId; int32_t taskId; + int64_t refId; const char* idStr; } SStreamTaskId; @@ -291,7 +293,6 @@ typedef struct SStreamStatus { int8_t schedStatus; int8_t statusBackup; int32_t schedIdleTime; // idle time before invoke again - int32_t timerActive; // timer is active int64_t lastExecTs; // last exec time stamp int32_t inScanHistorySentinel; bool appendTranstateBlock; // has append the transfer state data block already @@ -546,7 +547,7 @@ typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** pTask); -void tFreeStreamTask(SStreamTask* pTask); +void tFreeStreamTask(void* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver); @@ -664,6 +665,8 @@ void streamTaskResetStatus(SStreamTask* pTask); void streamTaskSetStatusReady(SStreamTask* pTask); ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); const char* streamTaskGetExecType(int32_t type); +int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId); +void streamTaskFreeRefId(int64_t* pRefId); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); @@ -752,16 +755,15 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); +int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaAcquireOneTask(SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); void streamMetaStartHb(SStreamMeta* pMeta); -bool streamMetaTaskInTimer(SStreamMeta* pMeta); int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int64_t endTs, bool ready); int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 94c196d280..93c8dc3b40 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -21,6 +21,7 @@ #include "streamBackendRocksdb.h" #include "trpc.h" #include "tstream.h" +#include "tref.h" #ifdef __cplusplus extern "C" { @@ -70,7 +71,7 @@ struct SActiveCheckpointInfo { SStreamTmrInfo chkptReadyMsgTmr; }; -int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask); +void streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, void* param); typedef struct { int8_t type; @@ -225,6 +226,8 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo); void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta); void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount); int32_t streamMetaSendHbHelper(SStreamMeta* pMeta); +int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid); +void metaRefMgtRemove(int64_t* pRefId); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b359cdfc81..e00984ea9b 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -211,18 +211,19 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) { return 0; } -void tFreeStreamTask(SStreamTask* pTask) { - char* p = NULL; - int32_t taskId = pTask->id.taskId; +void tFreeStreamTask(void* pParam) { + char* p = NULL; + SStreamTask* pTask = pParam; + int32_t taskId = pTask->id.taskId; STaskExecStatisInfo* pStatis = &pTask->execInfo; ETaskStatus status1 = TASK_STATUS__UNINIT; streamMutexLock(&pTask->lock); if (pTask->status.pSM != NULL) { - SStreamTaskState pStatus = streamTaskGetStatus(pTask); - p = pStatus.name; - status1 = pStatus.state; + SStreamTaskState status = streamTaskGetStatus(pTask); + p = status.name; + status1 = status.state; } streamMutexUnlock(&pTask->lock); @@ -235,12 +236,6 @@ void tFreeStreamTask(SStreamTask* pTask) { taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs, pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint); - // remove the ref by timer - while (pTask->status.timerActive > 0) { - stDebug("s-task:%s wait for task stop timer activities, ref:%d", pTask->id.idStr, pTask->status.timerActive); - taosMsleep(100); - } - if (pTask->schedInfo.pDelayTimer != NULL) { streamTmrStop(pTask->schedInfo.pDelayTimer); pTask->schedInfo.pDelayTimer = NULL; @@ -429,6 +424,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } pTask->refCnt = 1; + pTask->id.refId = 0; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; @@ -441,7 +437,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; - pTask->status.timerActive = 0; code = streamCreateStateMachine(pTask); if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) { @@ -837,28 +832,31 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) { int32_t code = 0; SStreamMeta* pMeta = pTask->pMeta; - STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; + SStreamTask* pStreamTask = NULL; + if (pTask->info.fillHistory == 0) { return code; } - SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); - if (ppStreamTask != NULL) { + code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask); + if (code == 0) { stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr, - (int32_t)sTaskId.taskId); + (int32_t)pTask->streamTaskId.taskId); - streamMutexLock(&(*ppStreamTask)->lock); - CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); + streamMutexLock(&(pStreamTask->lock)); + CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask); if (resetRelHalt) { stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s", - sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus), - streamTaskGetStatus(*ppStreamTask).name); - (*ppStreamTask)->status.taskStatus = TASK_STATUS__READY; + pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus), + streamTaskGetStatus(pStreamTask).name); + pStreamTask->status.taskStatus = TASK_STATUS__READY; } - code = streamMetaSaveTask(pMeta, *ppStreamTask); - streamMutexUnlock(&(*ppStreamTask)->lock); + code = streamMetaSaveTask(pMeta, pStreamTask); + streamMutexUnlock(&(pStreamTask->lock)); + + streamMetaReleaseTask(pMeta, pStreamTask); } return code; @@ -1282,3 +1280,23 @@ const char* streamTaskGetExecType(int32_t type) { return "invalid-exec-type"; } } + +int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) { + *pRefId = taosMemoryMalloc(sizeof(int64_t)); + if (*pRefId != NULL) { + **pRefId = pTask->id.refId; + metaRefMgtAdd(pTask->pMeta->vgId, *pRefId); + return 0; + } else { + stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno)); + return terrno; + } +} + +void streamTaskFreeRefId(int64_t* pRefId) { + if (pRefId == NULL) { + return; + } + + metaRefMgtRemove(pRefId); +} \ No newline at end of file