refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2025-02-23 01:38:05 +08:00
parent a8d50f28a6
commit 034c907ee1
5 changed files with 68 additions and 35 deletions

View File

@ -483,8 +483,10 @@ typedef struct STaskUpdateInfo {
} STaskUpdateInfo;
typedef struct SScanWalInfo {
int32_t scanCounter;
int32_t scanSentinel;
tmr_h scanTimer;
int64_t lastScanTs;
int32_t tickCounter;
} SScanWalInfo;
typedef struct SFatalErrInfo {

View File

@ -1113,23 +1113,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// extracted submit data from wal files for all tasks
if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
return tqScanWal(pTq);
}
} else {
code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if (code) {
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
}
code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if (code) {
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
// let's continue scan data in the wal files
// if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
// code = tqScanWalAsync(pTq, false); // it's ok to failed
// if (code) {
// tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
// }
// }
return code;
}
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -16,12 +16,11 @@
#include "tq.h"
#include "vnd.h"
#define MAX_REPEAT_SCAN_THRESHOLD 3
#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan
#define SCAN_WAL_IDLE_DURATION 250 // idle for 500ms to do next wal scan
#define SCAN_WAL_WAIT_COUNT 2
typedef struct SBuildScanWalMsgParam {
int64_t metaId;
// int8_t restored;
SMsgCb msgCb;
} SBuildScanWalMsgParam;
@ -30,7 +29,6 @@ static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
static bool taskReadyForDataFromWal(SStreamTask* pTask);
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
static int32_t tqScanWalInFuture(STQ* pTq);
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
int32_t tqScanWal(STQ* pTq) {
@ -41,7 +39,7 @@ int32_t tqScanWal(STQ* pTq) {
int64_t el = 0;
int32_t code = 0;
int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanCounter, 0, 1);
int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1);
if (old == 0) {
tqDebug("vgId:%d try to scan wal to extract data", vgId);
} else {
@ -49,27 +47,44 @@ int32_t tqScanWal(STQ* pTq) {
return code;
}
// the scan wal interval less than 200, not scan, actually.
if ((pMeta->scanInfo.lastScanTs > st) && (pMeta->scanInfo.lastScanTs - st < 200)) {
tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
return code;
}
// check all tasks
code = doScanWalForAllTasks(pMeta, &numOfTasks);
el = (taosGetTimestampMs() - st);
pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
el = (pMeta->scanInfo.lastScanTs - st);
if (code) {
tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
tstrerror(code));
} else {
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms, next scan start in %dms", vgId,
el, SCAN_WAL_IDLE_DURATION);
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el);
}
atomic_store_32(&pMeta->scanInfo.scanCounter, 0);
atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
return code;
}
static bool waitEnoughDuration(SStreamMeta* pMeta) {
if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) {
pMeta->scanInfo.tickCounter = 0;
return true;
}
return false;
}
static void doStartScanWal(void* param, void* tmrId) {
int32_t vgId = 0;
int32_t code = 0;
SVnode* pVnode = NULL;
int32_t numOfTasks = 0;
tmr_h pTimer = NULL;
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
@ -79,10 +94,12 @@ static void doStartScanWal(void* param, void* tmrId) {
return;
}
vgId = pMeta->vgId;
if (pMeta->closeFlag) {
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("vgId:%d jump out of scan wal timer since closed", vgId);
tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
} else {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
@ -92,9 +109,7 @@ static void doStartScanWal(void* param, void* tmrId) {
return;
}
vgId = pMeta->vgId;
if (pMeta->role == NODE_ROLE_FOLLOWER) {
if (pMeta->role != NODE_ROLE_LEADER) {
tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
@ -114,6 +129,24 @@ static void doStartScanWal(void* param, void* tmrId) {
goto _end;
}
code = streamTimerGetInstance(&pTimer);
if (code) {
tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
taosMemoryFree(pParam);
return;
}
if (!waitEnoughDuration(pMeta)) {
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
"scan-wal");
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code) {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
}
return;
}
streamMetaRLock(pMeta);
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
streamMetaRUnLock(pMeta);
@ -135,8 +168,9 @@ static void doStartScanWal(void* param, void* tmrId) {
}
_end:
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, &pMeta->scanInfo.scanTimer, &pMeta->scanInfo.scanTimer,
vgId, "scan-wal");
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code) {

View File

@ -327,7 +327,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaRefPool, rid);
if (code == TSDB_CODE_SUCCESS) {
stDebug("vgId:%d jump out of meta timer", vgId);
stInfo("vgId:%d jump out of meta timer since closed", vgId);
} else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}
@ -413,7 +413,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
taosMsleep(2 * META_HB_CHECK_INTERVAL);
taosMsleep(3 * META_HB_CHECK_INTERVAL);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
}

View File

@ -427,7 +427,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
pMeta->scanInfo.scanCounter = 0;
pMeta->scanInfo.scanSentinel = 0;
pMeta->vgId = vgId;
pMeta->ahandle = ahandle;
pMeta->buildTaskFn = buildTaskFn;
@ -1213,8 +1213,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
// wait for the stream meta hb function stopping
streamMetaWaitForHbTmrQuit(pMeta);
pMeta->closeFlag = true;
streamMetaWaitForHbTmrQuit(pMeta);
stDebug("vgId:%d start to check all tasks for closing", vgId);
int64_t st = taosGetTimestampMs();
@ -1253,6 +1253,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
if (pMeta->scanInfo.scanTimer != NULL) {
streamTmrStop(pMeta->scanInfo.scanTimer);
pMeta->scanInfo.scanTimer = NULL;
}
streamMetaRUnLock(pMeta);
}