fix(stream): use meta id instead of ptr.

This commit is contained in:
Haojun Liao 2024-09-29 16:12:46 +08:00
parent 09600132ec
commit 4c98786352
3 changed files with 35 additions and 39 deletions

View File

@ -70,6 +70,8 @@ typedef struct SActiveCheckpointInfo SActiveCheckpointInfo;
#define SSTREAM_TASK_NEED_CONVERT_VER 2
#define SSTREAM_TASK_SUBTABLE_CHANGED_VER 3
extern int32_t streamMetaId;
enum {
STREAM_STATUS__NORMAL = 0,
STREAM_STATUS__STOP,
@ -135,11 +137,6 @@ enum {
STREAM_QUEUE__PROCESSING,
};
enum {
STREAM_META_WILL_STOP = 1,
STREAM_META_OK_TO_STOP = 2,
};
typedef enum EStreamTaskEvent {
TASK_EVENT_INIT = 0x1,
TASK_EVENT_INIT_SCANHIST = 0x2,
@ -282,7 +279,6 @@ typedef enum {
} EConsenChkptStatus;
typedef struct SConsenChkptInfo {
// bool alreadySendChkptId;
EConsenChkptStatus status;
int64_t statusTs;
int32_t consenChkptTransId;

View File

@ -16,8 +16,13 @@
#include "tq.h"
#include "vnd.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 100
typedef struct SBuildScanWalMsgParam {
int64_t metaId;
int32_t numOfTasks;
} SBuildScanWalMsgParam;
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
@ -31,13 +36,12 @@ int32_t tqScanWal(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
int64_t st = taosGetTimestampMs();
int32_t numOfTasks = 0;
bool shouldIdle = true;
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
// check all tasks
int32_t numOfTasks = 0;
bool shouldIdle = true;
int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle);
if (code) {
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
@ -68,16 +72,19 @@ int32_t tqScanWal(STQ* pTq) {
return code;
}
typedef struct SBuildScanWalMsgParam {
STQ* pTq;
int32_t numOfTasks;
} SBuildScanWalMsgParam;
static void doStartScanWal(void* param, void* tmrId) {
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
STQ* pTq = pParam->pTq;
int32_t vgId = pTq->pStreamMeta->vgId;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId);
if (pMeta == NULL) {
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
taosMemoryFree(pParam);
return;
}
int32_t vgId = pMeta->vgId;
STQ* pTq = pMeta->ahandle;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pTq->pVnode->restored);
@ -90,42 +97,36 @@ static void doStartScanWal(void* param, void* tmrId) {
}
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = NULL;
SBuildScanWalMsgParam* pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
if (pParam == NULL) {
return terrno;
}
pParam->pTq = pTq;
pParam->metaId = pMeta->rid;
pParam->numOfTasks = numOfTasks;
tmr_h pTimer = NULL;
code = streamTimerGetInstance(&pTimer);
if (code) {
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
taosMemoryFree(pParam);
return code;
}
if (pMeta->scanInfo.scanTimer == NULL) {
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
} else {
bool ret = taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
if (!ret) {
// tqError("vgId:%d failed to start scan wal in:%dms", vgId, idleDuration);
}
}
streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
return code;
}
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
bool alreadyRestored = pTq->pVnode->restored;
bool alreadyRestored = pTq->pVnode->restored;
int32_t numOfTasks = 0;
// do not launch the stream tasks, if it is a follower or not restored vnode.
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
@ -134,7 +135,7 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
streamMetaWLock(pMeta);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
streamMetaWUnLock(pMeta);
@ -378,13 +379,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
STaskId* pTaskId = taosArrayGet(pTaskList, i);
STaskId* pTaskId = taosArrayGet(pTaskList, i);
if (pTaskId == NULL) {
continue;
}
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
int32_t code = streamMetaAcquireTask(pStreamMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (pTask == NULL || code != 0) {
continue;
}

View File

@ -164,7 +164,6 @@ extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
extern int32_t streamMetaId;
int32_t streamTimerInit();
void streamTimerCleanUp();