Merge pull request #29900 from taosdata/fix/checkfh
refactor(stream): scan wal is driven by time, instead of insert events.
This commit is contained in:
commit
24fae6adbb
|
@ -484,8 +484,10 @@ typedef struct STaskUpdateInfo {
|
||||||
} STaskUpdateInfo;
|
} STaskUpdateInfo;
|
||||||
|
|
||||||
typedef struct SScanWalInfo {
|
typedef struct SScanWalInfo {
|
||||||
int32_t scanCounter;
|
int32_t scanSentinel;
|
||||||
tmr_h scanTimer;
|
tmr_h scanTimer;
|
||||||
|
int64_t lastScanTs;
|
||||||
|
int32_t tickCounter;
|
||||||
} SScanWalInfo;
|
} SScanWalInfo;
|
||||||
|
|
||||||
typedef struct SFatalErrInfo {
|
typedef struct SFatalErrInfo {
|
||||||
|
|
|
@ -2663,7 +2663,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
bool allReady = true;
|
bool allReady = true;
|
||||||
SArray *pNodeSnapshot = NULL;
|
SArray *pNodeSnapshot = NULL;
|
||||||
int32_t maxAllowedTrans = 50;
|
int32_t maxAllowedTrans = 20;
|
||||||
int32_t numOfTrans = 0;
|
int32_t numOfTrans = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -2750,6 +2750,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// todo: check for redundant consensus-checkpoint trans, if this kinds of trans repeatly failed.
|
||||||
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
|
code = mndCreateSetConsensusChkptIdTrans(pMnode, pStream, pe->req.taskId, chkId, pe->req.startTs);
|
||||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
|
mError("failed to create consensus-checkpoint trans, stream:0x%" PRIx64, pStream->uid);
|
||||||
|
|
|
@ -239,7 +239,7 @@ void tqClose(STQ*);
|
||||||
int tqPushMsg(STQ*, tmsg_t msgType);
|
int tqPushMsg(STQ*, tmsg_t msgType);
|
||||||
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
|
||||||
void tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
void tqUnregisterPushHandle(STQ* pTq, void* pHandle);
|
||||||
int tqScanWalAsync(STQ* pTq, bool ckPause);
|
void tqScanWalAsync(STQ* pTq);
|
||||||
int32_t tqStopStreamTasksAsync(STQ* pTq);
|
int32_t tqStopStreamTasksAsync(STQ* pTq);
|
||||||
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
|
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
|
||||||
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
|
|
@ -920,12 +920,12 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
|
||||||
|
|
||||||
// now the fill-history task starts to scan data from wal files.
|
// now the fill-history task starts to scan data from wal files.
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
// if (code == TSDB_CODE_SUCCESS) {
|
||||||
code = tqScanWalAsync(pTq, false);
|
// code = tqScanWalAsync(pTq, false);
|
||||||
if (code) {
|
// if (code) {
|
||||||
tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
|
// tqError("vgId:%d failed to start scan wal file, code:%s", vgId, tstrerror(code));
|
||||||
}
|
// }
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1113,23 +1113,14 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// extracted submit data from wal files for all tasks
|
// extracted submit data from wal files for all tasks
|
||||||
if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
|
if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
|
||||||
return tqScanWal(pTq);
|
return tqScanWal(pTq);
|
||||||
}
|
} else {
|
||||||
|
code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
||||||
|
if (code) {
|
||||||
|
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
|
|
||||||
if (code) {
|
|
||||||
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// let's continue scan data in the wal files
|
|
||||||
if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
|
|
||||||
code = tqScanWalAsync(pTq, false); // it's ok to failed
|
|
||||||
if (code) {
|
|
||||||
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
|
@ -49,20 +49,6 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaRLock(pTq->pStreamMeta);
|
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
|
|
||||||
streamMetaRUnLock(pTq->pStreamMeta);
|
|
||||||
|
|
||||||
// tqTrace("vgId:%d handle submit, restore:%d, numOfTasks:%d", TD_VID(pTq->pVnode), pTq->pVnode->restored, numOfTasks);
|
|
||||||
|
|
||||||
// push data for stream processing:
|
|
||||||
// 1. the vnode has already been restored.
|
|
||||||
// 2. the vnode should be the leader.
|
|
||||||
// 3. the stream is not suspended yet.
|
|
||||||
if ((!tsDisableStream) && (numOfTasks > 0)) {
|
|
||||||
code = tqScanWalAsync(pTq, true);
|
|
||||||
}
|
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,23 +16,19 @@
|
||||||
#include "tq.h"
|
#include "tq.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
#define SCAN_WAL_IDLE_DURATION 250 // idle for 500ms to do next wal scan
|
||||||
#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan
|
#define SCAN_WAL_WAIT_COUNT 2
|
||||||
|
|
||||||
typedef struct SBuildScanWalMsgParam {
|
typedef struct SBuildScanWalMsgParam {
|
||||||
int64_t metaId;
|
int64_t metaId;
|
||||||
int32_t numOfTasks;
|
|
||||||
int8_t restored;
|
|
||||||
SMsgCb msgCb;
|
SMsgCb msgCb;
|
||||||
} SBuildScanWalMsgParam;
|
} SBuildScanWalMsgParam;
|
||||||
|
|
||||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
|
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks);
|
||||||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||||
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
||||||
static bool taskReadyForDataFromWal(SStreamTask* pTask);
|
static bool taskReadyForDataFromWal(SStreamTask* pTask);
|
||||||
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
|
static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc);
|
||||||
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
|
|
||||||
static int32_t doScanWalAsync(STQ* pTq, bool ckPause);
|
|
||||||
|
|
||||||
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
||||||
int32_t tqScanWal(STQ* pTq) {
|
int32_t tqScanWal(STQ* pTq) {
|
||||||
|
@ -40,44 +36,55 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
int32_t numOfTasks = 0;
|
int32_t numOfTasks = 0;
|
||||||
|
int64_t el = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter);
|
int32_t old = atomic_val_compare_exchange_32(&pMeta->scanInfo.scanSentinel, 0, 1);
|
||||||
|
if (old == 0) {
|
||||||
// check all tasks
|
tqDebug("vgId:%d try to scan wal to extract data", vgId);
|
||||||
int32_t code = doScanWalForAllTasks(pMeta);
|
} else {
|
||||||
if (code) {
|
tqDebug("vgId:%d already in wal scan, abort", vgId);
|
||||||
tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code));
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
// the scan wal interval less than 200, not scan, actually.
|
||||||
int32_t times = (--pMeta->scanInfo.scanCounter);
|
if ((pMeta->scanInfo.lastScanTs > 0) && (st - pMeta->scanInfo.lastScanTs < 200)) {
|
||||||
if (times < 0) {
|
tqDebug("vgId:%d scan wal less than 200ms, do nothing", vgId);
|
||||||
tqError("vgId:%d invalid scan counter:%d, reset to 0", vgId, times);
|
atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
|
||||||
times = 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
// check all tasks
|
||||||
streamMetaWUnLock(pMeta);
|
code = doScanWalForAllTasks(pMeta, &numOfTasks);
|
||||||
|
|
||||||
int64_t el = (taosGetTimestampMs() - st);
|
pMeta->scanInfo.lastScanTs = taosGetTimestampMs();
|
||||||
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 " ms", vgId, el);
|
el = (pMeta->scanInfo.lastScanTs - st);
|
||||||
|
|
||||||
if (times > 0) {
|
if (code) {
|
||||||
tqDebug("vgId:%d scan wal for stream tasks for %d times in %dms", vgId, times, SCAN_WAL_IDLE_DURATION);
|
tqError("vgId:%d failed to scan wal for all tasks, try next time, elapsed time:%" PRId64 "ms code:%s", vgId, el,
|
||||||
code = tqScanWalInFuture(pTq, numOfTasks, SCAN_WAL_IDLE_DURATION);
|
tstrerror(code));
|
||||||
if (code) {
|
} else {
|
||||||
tqError("vgId:%d sched scan wal in %dms failed, ignore this failure", vgId, SCAN_WAL_IDLE_DURATION);
|
tqDebug("vgId:%d scan wal for stream tasks completed, elapsed time:%" PRId64 "ms", vgId, el);
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
atomic_store_32(&pMeta->scanInfo.scanSentinel, 0);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doStartScanWal(void* param, void* tmrId) {
|
static bool waitEnoughDuration(SStreamMeta* pMeta) {
|
||||||
int32_t vgId = 0;
|
if ((++pMeta->scanInfo.tickCounter) >= SCAN_WAL_WAIT_COUNT) {
|
||||||
int32_t code = 0;
|
pMeta->scanInfo.tickCounter = 0;
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doStartScanWal(void* param, void* tmrId) {
|
||||||
|
int32_t vgId = 0;
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t numOfTasks = 0;
|
||||||
|
tmr_h pTimer = NULL;
|
||||||
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
||||||
|
|
||||||
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
|
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
|
||||||
|
@ -87,10 +94,18 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vgId = pMeta->vgId;
|
||||||
|
code = streamTimerGetInstance(&pTimer);
|
||||||
|
if (code) {
|
||||||
|
tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal, not scan wal, code:%s", vgId, tstrerror(code));
|
||||||
|
taosMemoryFree(pParam);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMeta->closeFlag) {
|
if (pMeta->closeFlag) {
|
||||||
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
tqDebug("vgId:%d jump out of scan wal timer since closed", vgId);
|
tqInfo("vgId:%d jump out of scan wal timer since closed", vgId);
|
||||||
} else {
|
} else {
|
||||||
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
|
@ -100,71 +115,100 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
vgId = pMeta->vgId;
|
if (pMeta->role != NODE_ROLE_LEADER) {
|
||||||
|
tqDebug("vgId:%d not leader, role:%d not scan wal anymore", vgId, pMeta->role);
|
||||||
|
|
||||||
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
|
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
||||||
pParam->restored);
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
#if 0
|
tqDebug("vgId:%d jump out of scan wal timer since not leader", vgId);
|
||||||
// wait for the vnode is freed, and invalid read may occur.
|
} else {
|
||||||
|
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemFree(pParam);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMeta->startInfo.startAllTasks) {
|
||||||
|
tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId);
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!waitEnoughDuration(pMeta)) {
|
||||||
|
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
|
||||||
|
"scan-wal");
|
||||||
|
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
||||||
|
if (code) {
|
||||||
|
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMetaRLock(pMeta);
|
||||||
|
numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
|
if (numOfTasks == 0) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// wait for the vnode is freed, and invalid read may occur.
|
||||||
taosMsleep(10000);
|
taosMsleep(10000);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
_end:
|
||||||
|
|
||||||
|
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal");
|
||||||
|
tqDebug("vgId:%d scan-wal will start in %dms", vgId, SCAN_WAL_IDLE_DURATION*SCAN_WAL_WAIT_COUNT);
|
||||||
|
|
||||||
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pParam);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
void tqScanWalAsync(STQ* pTq) {
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
tmr_h pTimer = NULL;
|
tmr_h pTimer = NULL;
|
||||||
SBuildScanWalMsgParam* pParam = NULL;
|
SBuildScanWalMsgParam* pParam = NULL;
|
||||||
|
|
||||||
|
// 1. the vnode should be the leader.
|
||||||
|
// 2. the stream isn't disabled
|
||||||
|
if ((pMeta->role == NODE_ROLE_FOLLOWER) || tsDisableStream) {
|
||||||
|
tqInfo("vgId:%d follower node or stream disabled, not scan wal", vgId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
|
pParam = taosMemoryMalloc(sizeof(SBuildScanWalMsgParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
return terrno;
|
tqError("vgId:%d failed to start scan wal, stream not executes, code:%s", vgId, tstrerror(code));
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pParam->metaId = pMeta->rid;
|
pParam->metaId = pMeta->rid;
|
||||||
pParam->numOfTasks = numOfTasks;
|
|
||||||
pParam->restored = pTq->pVnode->restored;
|
|
||||||
pParam->msgCb = pTq->pVnode->msgCb;
|
pParam->msgCb = pTq->pVnode->msgCb;
|
||||||
|
|
||||||
code = streamTimerGetInstance(&pTimer);
|
code = streamTimerGetInstance(&pTimer);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
|
tqFatal("vgId:%d failed to get tmr ctrl during sched scan wal", vgId);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
} else {
|
} else {
|
||||||
streamTmrStart(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId, "scan-wal-fut");
|
streamTmrStart(doStartScanWal, SCAN_WAL_IDLE_DURATION, pParam, pTimer, &pMeta->scanInfo.scanTimer, vgId,
|
||||||
|
"scan-wal");
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
|
||||||
bool alreadyRestored = pTq->pVnode->restored;
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
// do not launch the stream tasks, if it is a follower or not restored vnode.
|
|
||||||
if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
|
||||||
code = doScanWalAsync(pTq, ckPause);
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStopStreamTasksAsync(STQ* pTq) {
|
int32_t tqStopStreamTasksAsync(STQ* pTq) {
|
||||||
|
@ -347,13 +391,10 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
|
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, int32_t* pNumOfTasks) {
|
||||||
int32_t vgId = pStreamMeta->vgId;
|
int32_t vgId = pStreamMeta->vgId;
|
||||||
SArray* pTaskList = NULL;
|
SArray* pTaskList = NULL;
|
||||||
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
|
int32_t numOfTasks = 0;
|
||||||
if (numOfTasks == 0) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// clone the task list, to avoid the task update during scan wal files
|
// clone the task list, to avoid the task update during scan wal files
|
||||||
streamMetaWLock(pStreamMeta);
|
streamMetaWLock(pStreamMeta);
|
||||||
|
@ -364,10 +405,13 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
|
||||||
|
|
||||||
// update the new task number
|
// update the new task number
|
||||||
numOfTasks = taosArrayGetSize(pTaskList);
|
numOfTasks = taosArrayGetSize(pTaskList);
|
||||||
|
if (pNumOfTasks != NULL) {
|
||||||
|
*pNumOfTasks = numOfTasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
tqDebug("vgId:%d start to check wal to extract new submit block for %d tasks", vgId, numOfTasks);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
STaskId* pTaskId = taosArrayGet(pTaskList, i);
|
STaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
|
@ -426,51 +470,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
|
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
|
||||||
bool alreadyRestored = pTq->pVnode->restored;
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
|
|
||||||
if (numOfTasks == 0) {
|
|
||||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMeta->startInfo.startAllTasks) {
|
|
||||||
tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMeta->scanInfo.scanCounter += 1;
|
|
||||||
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
|
|
||||||
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pMeta->scanInfo.scanCounter > 1) {
|
|
||||||
tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfPauseTasks = pMeta->numOfPausedTasks;
|
|
||||||
if (ckPause && numOfTasks == numOfPauseTasks) {
|
|
||||||
tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId);
|
|
||||||
|
|
||||||
// reset the counter value, since we do not launch the scan wal operation.
|
|
||||||
pMeta->scanInfo.scanCounter = 0;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId,
|
|
||||||
numOfTasks, alreadyRestored);
|
|
||||||
|
|
||||||
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
|
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
|
||||||
SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
|
SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
|
||||||
p->metaId = pTq->pStreamMeta->rid;
|
p->metaId = pTq->pStreamMeta->rid;
|
||||||
p->numOfTasks = 0;
|
|
||||||
|
|
||||||
doStartScanWal(p, 0);
|
doStartScanWal(p, 0);
|
||||||
}
|
}
|
|
@ -47,6 +47,10 @@ END:
|
||||||
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
||||||
SSyncState state = syncGetState(pTq->pVnode->sync);
|
SSyncState state = syncGetState(pTq->pVnode->sync);
|
||||||
streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
|
streamMetaUpdateStageRole(pTq->pStreamMeta, state.term, isLeader);
|
||||||
|
|
||||||
|
if (isLeader) {
|
||||||
|
tqScanWalAsync(pTq);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
|
|
|
@ -943,10 +943,10 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
if (scanWal && (vgId != SNODE_HANDLE)) {
|
// if (scanWal && (vgId != SNODE_HANDLE)) {
|
||||||
tqDebug("vgId:%d start scan wal for executing tasks", vgId);
|
// tqDebug("vgId:%d start scan wal for executing tasks", vgId);
|
||||||
code = tqScanWalAsync(pMeta->ahandle, true);
|
// code = tqScanWalAsync(pMeta->ahandle, true);
|
||||||
}
|
// }
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1175,7 +1175,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
||||||
pTask->hTaskInfo.operatorOpen = false;
|
pTask->hTaskInfo.operatorOpen = false;
|
||||||
code = streamStartScanHistoryAsync(pTask, igUntreated);
|
code = streamStartScanHistoryAsync(pTask, igUntreated);
|
||||||
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
|
} else if (level == TASK_LEVEL__SOURCE && (streamQueueGetNumOfItems(pTask->inputq.queue) == 0)) {
|
||||||
code = tqScanWalAsync((STQ*)handle, false);
|
// code = tqScanWalAsync((STQ*)handle, false);
|
||||||
} else {
|
} else {
|
||||||
code = streamTrySchedExec(pTask);
|
code = streamTrySchedExec(pTask);
|
||||||
}
|
}
|
||||||
|
|
|
@ -327,7 +327,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
||||||
pMeta->pHbInfo->hbStart = 0;
|
pMeta->pHbInfo->hbStart = 0;
|
||||||
code = taosReleaseRef(streamMetaRefPool, rid);
|
code = taosReleaseRef(streamMetaRefPool, rid);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
stDebug("vgId:%d jump out of meta timer", vgId);
|
stInfo("vgId:%d jump out of meta timer since closed", vgId);
|
||||||
} else {
|
} else {
|
||||||
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
|
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
|
||||||
}
|
}
|
||||||
|
@ -341,7 +341,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
|
stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
|
||||||
} else {
|
} else {
|
||||||
stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid);
|
stError("vgId:%d role:%d not leader not send hb to mnode, failed to release meta rid:%" PRId64, vgId, role, rid);
|
||||||
}
|
}
|
||||||
// taosMemoryFree(param);
|
// taosMemoryFree(param);
|
||||||
return;
|
return;
|
||||||
|
@ -413,7 +413,7 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
|
||||||
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
|
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
|
||||||
// wait for the stream meta hb function stopping
|
// wait for the stream meta hb function stopping
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
taosMsleep(2 * META_HB_CHECK_INTERVAL);
|
taosMsleep(3 * META_HB_CHECK_INTERVAL);
|
||||||
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
|
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -427,7 +427,10 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
|
||||||
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
|
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
|
||||||
TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
|
TSDB_CHECK_NULL(pMeta->pTaskList, code, lino, _err, terrno);
|
||||||
|
|
||||||
pMeta->scanInfo.scanCounter = 0;
|
pMeta->scanInfo.scanSentinel = 0;
|
||||||
|
pMeta->scanInfo.lastScanTs = 0;
|
||||||
|
pMeta->scanInfo.tickCounter = 0;
|
||||||
|
|
||||||
pMeta->vgId = vgId;
|
pMeta->vgId = vgId;
|
||||||
pMeta->ahandle = ahandle;
|
pMeta->ahandle = ahandle;
|
||||||
pMeta->buildTaskFn = buildTaskFn;
|
pMeta->buildTaskFn = buildTaskFn;
|
||||||
|
@ -1240,8 +1243,8 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
|
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
|
||||||
|
|
||||||
// wait for the stream meta hb function stopping
|
// wait for the stream meta hb function stopping
|
||||||
streamMetaWaitForHbTmrQuit(pMeta);
|
|
||||||
pMeta->closeFlag = true;
|
pMeta->closeFlag = true;
|
||||||
|
streamMetaWaitForHbTmrQuit(pMeta);
|
||||||
|
|
||||||
stDebug("vgId:%d start to check all tasks for closing", vgId);
|
stDebug("vgId:%d start to check all tasks for closing", vgId);
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
@ -1280,6 +1283,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
double el = (taosGetTimestampMs() - st) / 1000.0;
|
||||||
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
|
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
|
||||||
|
|
||||||
|
if (pMeta->scanInfo.scanTimer != NULL) {
|
||||||
|
streamTmrStop(pMeta->scanInfo.scanTimer);
|
||||||
|
pMeta->scanInfo.scanTimer = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaRUnLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1347,7 +1356,7 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
||||||
|
|
||||||
// mark the sign to send msg before close all tasks
|
// mark the sign to send msg before close all tasks
|
||||||
// 1. for leader vnode, always send msg before closing
|
// 1. for leader vnode, always send msg before closing
|
||||||
// 2. for follower vnode, if it's is changed from leader, also sending msg before closing.
|
// 2. for follower vnode, if it's changed from leader, also sending msg before closing.
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
pMeta->sendMsgBeforeClosing = true;
|
pMeta->sendMsgBeforeClosing = true;
|
||||||
}
|
}
|
||||||
|
@ -1357,11 +1366,11 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
||||||
|
|
||||||
if (isLeader) {
|
if (isLeader) {
|
||||||
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
|
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
|
||||||
pMeta->vgId, prevStage, stage, isLeader, pMeta->rid);
|
pMeta->vgId, stage, prevStage, isLeader, pMeta->rid);
|
||||||
streamMetaStartHb(pMeta);
|
streamMetaStartHb(pMeta);
|
||||||
} else {
|
} else {
|
||||||
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
|
||||||
prevStage, stage, isLeader, pMeta->sendMsgBeforeClosing);
|
stage, prevStage, isLeader, pMeta->sendMsgBeforeClosing);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,7 @@ sql insert into t1 values(1648791223000,0,1,1,1.0);
|
||||||
sql insert into t1 values(1648791223001,9,2,2,1.1);
|
sql insert into t1 values(1648791223001,9,2,2,1.1);
|
||||||
sql insert into t1 values(1648791223009,0,3,3,1.0);
|
sql insert into t1 values(1648791223009,0,3,3,1.0);
|
||||||
|
|
||||||
sleep 300
|
sleep 1000
|
||||||
|
|
||||||
sql select * from streamt;
|
sql select * from streamt;
|
||||||
if $data01 != 3 then
|
if $data01 != 3 then
|
||||||
|
|
Loading…
Reference in New Issue