refactor: do some internal refactor.
This commit is contained in:
parent
c4a49a7bd9
commit
3332a0b822
|
@ -494,6 +494,14 @@ typedef struct SScanWalInfo {
|
|||
tmr_h scanTimer;
|
||||
} SScanWalInfo;
|
||||
|
||||
typedef struct SFatalErrInfo {
|
||||
int32_t code;
|
||||
int64_t ts;
|
||||
int32_t threadId;
|
||||
int32_t line;
|
||||
char func[128];
|
||||
} SFatalErrInfo;
|
||||
|
||||
// meta
|
||||
typedef struct SStreamMeta {
|
||||
char* path;
|
||||
|
@ -523,14 +531,13 @@ typedef struct SStreamMeta {
|
|||
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
|
||||
int32_t numOfPausedTasks;
|
||||
int64_t rid;
|
||||
|
||||
int64_t chkpId;
|
||||
int32_t chkpCap;
|
||||
SArray* chkpSaved;
|
||||
SArray* chkpInUse;
|
||||
SRWLatch chkpDirLock;
|
||||
void* qHandle; // todo remove it
|
||||
void* bkdChkptMgt;
|
||||
SFatalErrInfo fatalInfo; // fatal error occurs, stream stop to execute
|
||||
int64_t chkpId;
|
||||
int32_t chkpCap;
|
||||
SArray* chkpSaved;
|
||||
SArray* chkpInUse;
|
||||
SRWLatch chkpDirLock;
|
||||
void* bkdChkptMgt;
|
||||
} SStreamMeta;
|
||||
|
||||
typedef struct STaskUpdateEntry {
|
||||
|
@ -776,6 +783,9 @@ void streamMetaRLock(SStreamMeta* pMeta);
|
|||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||
void streamMetaWLock(SStreamMeta* pMeta);
|
||||
void streamMetaWUnLock(SStreamMeta* pMeta);
|
||||
void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino);
|
||||
int32_t streamGetFatalError(const SStreamMeta* pMeta);
|
||||
|
||||
void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId);
|
||||
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pTaskList);
|
||||
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
|
||||
|
|
|
@ -1143,18 +1143,16 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
|
|||
return taosArrayGetSize(execInfo.pNodeList);
|
||||
}
|
||||
|
||||
static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||
bool allReady = true;
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
|
||||
// check if the node update happens or not
|
||||
streamMutexLock(&execInfo.lock);
|
||||
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
|
||||
bool allReady = false;
|
||||
bool nodeUpdated = false;
|
||||
SVgroupChangeInfo changeInfo = {0};
|
||||
|
||||
int32_t numOfNodes = extractStreamNodeList(pMnode);
|
||||
|
||||
if (numOfNodes == 0) {
|
||||
mDebug("stream task node change checking done, no vgroups exist, do nothing");
|
||||
execInfo.ts = taosGetTimestampSec();
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -1166,43 +1164,46 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
|
|||
|
||||
if (pNodeEntry->stageUpdated) {
|
||||
mDebug("stream task not ready due to node update detected, checkpoint not issued");
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
|
||||
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
|
||||
if (code) {
|
||||
mError("failed to get the vgroup snapshot, ignore it and continue");
|
||||
}
|
||||
|
||||
if (!allReady) {
|
||||
mWarn("not all vnodes ready, quit from vnodes status check");
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return true;
|
||||
}
|
||||
|
||||
SVgroupChangeInfo changeInfo = {0};
|
||||
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
|
||||
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
|
||||
if (code) {
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return false;
|
||||
nodeUpdated = false;
|
||||
} else {
|
||||
nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||
if (nodeUpdated) {
|
||||
mDebug("stream tasks not ready due to node update");
|
||||
}
|
||||
}
|
||||
|
||||
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
|
||||
|
||||
mndDestroyVgroupChangeInfo(&changeInfo);
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
|
||||
if (nodeUpdated) {
|
||||
mDebug("stream tasks not ready due to node update");
|
||||
}
|
||||
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
return nodeUpdated;
|
||||
}
|
||||
|
||||
// check if the node update happens or not
|
||||
static bool taskNodeIsUpdated(SMnode *pMnode) {
|
||||
SArray *pNodeSnapshot = NULL;
|
||||
|
||||
streamMutexLock(&execInfo.lock);
|
||||
bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
|
||||
streamMutexUnlock(&execInfo.lock);
|
||||
|
||||
taosArrayDestroy(pNodeSnapshot);
|
||||
return updated;
|
||||
}
|
||||
|
||||
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
|
||||
bool ready = true;
|
||||
if (taskNodeIsUpdated(pMnode)) {
|
||||
|
@ -1993,7 +1994,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis
|
|||
|
||||
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
|
||||
mndDestroyVgroupChangeInfo(pInfo);
|
||||
return terrno;
|
||||
TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
|
||||
}
|
||||
|
||||
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
|
||||
|
@ -2048,6 +2049,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis
|
|||
return code;
|
||||
|
||||
_err:
|
||||
mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
|
||||
mndDestroyVgroupChangeInfo(pInfo);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -160,6 +160,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t gError = streamGetFatalError(pMeta);
|
||||
if (gError != 0) {
|
||||
tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
|
||||
pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// update the nodeEpset when it exists
|
||||
streamMetaWLock(pMeta);
|
||||
|
||||
|
@ -290,8 +297,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
||||
updateTasks, (numOfTasks - updateTasks));
|
||||
} else {
|
||||
if (streamMetaCommit(pMeta) < 0) {
|
||||
// persist to disk
|
||||
if ((code = streamMetaCommit(pMeta)) < 0) {
|
||||
// always return true
|
||||
streamMetaWUnLock(pMeta);
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
streamMetaClearSetUpdateTaskListComplete(pMeta);
|
||||
|
@ -754,8 +764,9 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
|
|||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
// always return success when handling the requirement issued by mnode during transaction.
|
||||
return code;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||
|
@ -1197,10 +1208,6 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret
|
|||
|
||||
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
|
||||
|
||||
int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
return doProcessDummyRspMsg(pMeta, pMsg);
|
||||
}
|
||||
|
||||
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
|
||||
|
||||
|
@ -1221,14 +1228,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
SStreamTask* pTask = NULL;
|
||||
SRestoreCheckpointInfo req = {0};
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
int64_t now = taosGetTimestampMs();
|
||||
SDecoder decoder;
|
||||
SRestoreCheckpointInfo req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||
|
||||
if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) {
|
||||
tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -1239,16 +1245,15 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||
if (pTask == NULL || (code != 0)) {
|
||||
tqError(
|
||||
"vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, req.taskId);
|
||||
tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
|
||||
pMeta->vgId, req.taskId);
|
||||
// ignore this code to avoid error code over write
|
||||
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
|
||||
if (ret) {
|
||||
tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
|
||||
}
|
||||
|
||||
return code;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// discard the rsp, since it is expired.
|
||||
|
@ -1272,7 +1277,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
streamMutexUnlock(&pTask->lock);
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return TSDB_CODE_STREAM_INTERNAL_ERROR;
|
||||
return 0;
|
||||
}
|
||||
|
||||
SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
|
||||
|
@ -1299,10 +1304,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
|||
|
||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||
code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
|
||||
if (code) {
|
||||
tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
|
||||
}
|
||||
} else {
|
||||
tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
return 0;
|
||||
}
|
|
@ -923,32 +923,38 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
|||
streamMetaWLock(pMeta);
|
||||
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||
if (code) {
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
}
|
||||
streamMetaWUnLock(pMeta);
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta) {
|
||||
int32_t code = 0;
|
||||
code = tdbCommit(pMeta->db, pMeta->txn);
|
||||
int32_t code = tdbCommit(pMeta->db, pMeta->txn);
|
||||
if (code != 0) {
|
||||
stError("vgId:%d failed to commit stream meta", pMeta->vgId);
|
||||
return code;
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
|
||||
pMeta->fatalInfo.line);
|
||||
}
|
||||
|
||||
code = tdbPostCommit(pMeta->db, pMeta->txn);
|
||||
if (code != 0) {
|
||||
stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId);
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
|
||||
pMeta->fatalInfo.line);
|
||||
return code;
|
||||
}
|
||||
|
||||
code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||
if (code != 0) {
|
||||
stError("vgId:%d failed to begin trans", pMeta->vgId);
|
||||
return code;
|
||||
streamSetFatalError(pMeta, code, __func__, __LINE__);
|
||||
stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
|
||||
} else {
|
||||
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
|
||||
}
|
||||
|
||||
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
|
@ -77,9 +77,10 @@ void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName,
|
|||
pMeta->fatalInfo.threadId = taosGetSelfPthreadId();
|
||||
tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func));
|
||||
pMeta->fatalInfo.line = lino;
|
||||
stInfo("vgId:%d set global fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino);
|
||||
stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino);
|
||||
} else {
|
||||
stFatal("vgId:%d existed global fatal eror:%s, failed to set new fatal error code:%s", pMeta->vgId, code);
|
||||
stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId,
|
||||
pMeta->fatalInfo.ts, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue