fix(stream): use the refId in stream meta list, in order to avoid access already freed stream tasks.
This commit is contained in:
parent
cbb3572fdd
commit
380f433499
|
@ -70,7 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
|
|||
#define SSTREAM_TASK_NEED_CONVERT_VER 2
|
||||
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
|
||||
|
||||
extern int32_t streamMetaId;
|
||||
extern int32_t streamMetaRefPool;
|
||||
extern int32_t streamTaskRefPool;
|
||||
|
||||
enum {
|
||||
STREAM_STATUS__NORMAL = 0,
|
||||
|
@ -258,6 +259,7 @@ typedef struct STaskId {
|
|||
typedef struct SStreamTaskId {
|
||||
int64_t streamId;
|
||||
int32_t taskId;
|
||||
int64_t refId;
|
||||
const char* idStr;
|
||||
} SStreamTaskId;
|
||||
|
||||
|
@ -291,7 +293,6 @@ typedef struct SStreamStatus {
|
|||
int8_t schedStatus;
|
||||
int8_t statusBackup;
|
||||
int32_t schedIdleTime; // idle time before invoke again
|
||||
int32_t timerActive; // timer is active
|
||||
int64_t lastExecTs; // last exec time stamp
|
||||
int32_t inScanHistorySentinel;
|
||||
bool appendTranstateBlock; // has append the transfer state data block already
|
||||
|
@ -546,7 +547,7 @@ typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
|
|||
|
||||
int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
|
||||
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** pTask);
|
||||
void tFreeStreamTask(SStreamTask* pTask);
|
||||
void tFreeStreamTask(void* pTask);
|
||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
|
||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
|
||||
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver);
|
||||
|
@ -664,6 +665,8 @@ void streamTaskResetStatus(SStreamTask* pTask);
|
|||
void streamTaskSetStatusReady(SStreamTask* pTask);
|
||||
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
|
||||
const char* streamTaskGetExecType(int32_t type);
|
||||
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId);
|
||||
void streamTaskFreeRefId(int64_t* pRefId);
|
||||
|
||||
bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
|
||||
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask);
|
||||
|
@ -752,16 +755,15 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||
int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
||||
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask);
|
||||
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
int32_t streamMetaAcquireOneTask(SStreamTask* pTask);
|
||||
void streamMetaClear(SStreamMeta* pMeta);
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta);
|
||||
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||
int64_t endTs, bool ready);
|
||||
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo);
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "streamBackendRocksdb.h"
|
||||
#include "trpc.h"
|
||||
#include "tstream.h"
|
||||
#include "tref.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -70,7 +71,7 @@ struct SActiveCheckpointInfo {
|
|||
SStreamTmrInfo chkptReadyMsgTmr;
|
||||
};
|
||||
|
||||
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask);
|
||||
void streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, void* param);
|
||||
|
||||
typedef struct {
|
||||
int8_t type;
|
||||
|
@ -225,6 +226,8 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo);
|
|||
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta);
|
||||
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount);
|
||||
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);
|
||||
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
|
||||
void metaRefMgtRemove(int64_t* pRefId);
|
||||
|
||||
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
|
||||
|
||||
|
|
|
@ -211,18 +211,19 @@ int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void tFreeStreamTask(SStreamTask* pTask) {
|
||||
char* p = NULL;
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
void tFreeStreamTask(void* pParam) {
|
||||
char* p = NULL;
|
||||
SStreamTask* pTask = pParam;
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
|
||||
STaskExecStatisInfo* pStatis = &pTask->execInfo;
|
||||
|
||||
ETaskStatus status1 = TASK_STATUS__UNINIT;
|
||||
streamMutexLock(&pTask->lock);
|
||||
if (pTask->status.pSM != NULL) {
|
||||
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
|
||||
p = pStatus.name;
|
||||
status1 = pStatus.state;
|
||||
SStreamTaskState status = streamTaskGetStatus(pTask);
|
||||
p = status.name;
|
||||
status1 = status.state;
|
||||
}
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
|
||||
|
@ -235,12 +236,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount, pStatis->latestUpdateTs,
|
||||
pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
|
||||
|
||||
// remove the ref by timer
|
||||
while (pTask->status.timerActive > 0) {
|
||||
stDebug("s-task:%s wait for task stop timer activities, ref:%d", pTask->id.idStr, pTask->status.timerActive);
|
||||
taosMsleep(100);
|
||||
}
|
||||
|
||||
if (pTask->schedInfo.pDelayTimer != NULL) {
|
||||
streamTmrStop(pTask->schedInfo.pDelayTimer);
|
||||
pTask->schedInfo.pDelayTimer = NULL;
|
||||
|
@ -429,6 +424,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
}
|
||||
|
||||
pTask->refCnt = 1;
|
||||
pTask->id.refId = 0;
|
||||
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||
|
@ -441,7 +437,6 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
|
|||
}
|
||||
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
pTask->status.timerActive = 0;
|
||||
|
||||
code = streamCreateStateMachine(pTask);
|
||||
if (pTask->status.pSM == NULL || code != TSDB_CODE_SUCCESS) {
|
||||
|
@ -837,28 +832,31 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) {
|
|||
int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) {
|
||||
int32_t code = 0;
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId};
|
||||
SStreamTask* pStreamTask = NULL;
|
||||
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId));
|
||||
if (ppStreamTask != NULL) {
|
||||
code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->streamTaskId, &pStreamTask);
|
||||
if (code == 0) {
|
||||
stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr,
|
||||
(int32_t)sTaskId.taskId);
|
||||
(int32_t)pTask->streamTaskId.taskId);
|
||||
|
||||
streamMutexLock(&(*ppStreamTask)->lock);
|
||||
CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask));
|
||||
streamMutexLock(&(pStreamTask->lock));
|
||||
CLEAR_RELATED_FILLHISTORY_TASK(pStreamTask);
|
||||
|
||||
if (resetRelHalt) {
|
||||
stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s",
|
||||
sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus),
|
||||
streamTaskGetStatus(*ppStreamTask).name);
|
||||
(*ppStreamTask)->status.taskStatus = TASK_STATUS__READY;
|
||||
pTask->streamTaskId.taskId, streamTaskGetStatusStr(pStreamTask->status.taskStatus),
|
||||
streamTaskGetStatus(pStreamTask).name);
|
||||
pStreamTask->status.taskStatus = TASK_STATUS__READY;
|
||||
}
|
||||
|
||||
code = streamMetaSaveTask(pMeta, *ppStreamTask);
|
||||
streamMutexUnlock(&(*ppStreamTask)->lock);
|
||||
code = streamMetaSaveTask(pMeta, pStreamTask);
|
||||
streamMutexUnlock(&(pStreamTask->lock));
|
||||
|
||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1282,3 +1280,23 @@ const char* streamTaskGetExecType(int32_t type) {
|
|||
return "invalid-exec-type";
|
||||
}
|
||||
}
|
||||
|
||||
int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
|
||||
*pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
if (*pRefId != NULL) {
|
||||
**pRefId = pTask->id.refId;
|
||||
metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
|
||||
return 0;
|
||||
} else {
|
||||
stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
void streamTaskFreeRefId(int64_t* pRefId) {
|
||||
if (pRefId == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
metaRefMgtRemove(pRefId);
|
||||
}
|
Loading…
Reference in New Issue