Merge pull request #27906 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2024-09-18 10:27:17 +08:00 committed by GitHub
commit 334b1c55c6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 506 additions and 409 deletions

View File

@ -494,6 +494,14 @@ typedef struct SScanWalInfo {
tmr_h scanTimer;
} SScanWalInfo;
typedef struct SFatalErrInfo {
int32_t code;
int64_t ts;
int32_t threadId;
int32_t line;
char func[128];
} SFatalErrInfo;
// meta
typedef struct SStreamMeta {
char* path;
@ -523,13 +531,12 @@ typedef struct SStreamMeta {
int32_t numOfStreamTasks; // this value should be increased when a new task is added into the meta
int32_t numOfPausedTasks;
int64_t rid;
SFatalErrInfo fatalInfo; // fatal error occurs, stream stop to execute
int64_t chkpId;
int32_t chkpCap;
SArray* chkpSaved;
SArray* chkpInUse;
SRWLatch chkpDirLock;
void* qHandle; // todo remove it
void* bkdChkptMgt;
} SStreamMeta;
@ -776,6 +783,9 @@ void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta);
void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino);
int32_t streamGetFatalError(const SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId);
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pTaskList);
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
@ -791,7 +801,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
// timer
int32_t streamTimerGetInstance(tmr_h* pTmr);
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg);
void streamTmrStop(tmr_h tmrId);

View File

@ -1143,18 +1143,16 @@ int32_t extractStreamNodeList(SMnode *pMnode) {
return taosArrayGetSize(execInfo.pNodeList);
}
static bool taskNodeIsUpdated(SMnode *pMnode) {
bool allReady = true;
SArray *pNodeSnapshot = NULL;
// check if the node update happens or not
streamMutexLock(&execInfo.lock);
static int32_t doCheckForUpdated(SMnode *pMnode, SArray **ppNodeSnapshot) {
bool allReady = false;
bool nodeUpdated = false;
SVgroupChangeInfo changeInfo = {0};
int32_t numOfNodes = extractStreamNodeList(pMnode);
if (numOfNodes == 0) {
mDebug("stream task node change checking done, no vgroups exist, do nothing");
execInfo.ts = taosGetTimestampSec();
streamMutexUnlock(&execInfo.lock);
return false;
}
@ -1166,43 +1164,46 @@ static bool taskNodeIsUpdated(SMnode *pMnode) {
if (pNodeEntry->stageUpdated) {
mDebug("stream task not ready due to node update detected, checkpoint not issued");
streamMutexUnlock(&execInfo.lock);
return true;
}
}
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, &pNodeSnapshot);
int32_t code = mndTakeVgroupSnapshot(pMnode, &allReady, ppNodeSnapshot);
if (code) {
mError("failed to get the vgroup snapshot, ignore it and continue");
}
if (!allReady) {
mWarn("not all vnodes ready, quit from vnodes status check");
taosArrayDestroy(pNodeSnapshot);
streamMutexUnlock(&execInfo.lock);
return true;
}
SVgroupChangeInfo changeInfo = {0};
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot, &changeInfo);
code = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, *ppNodeSnapshot, &changeInfo);
if (code) {
streamMutexUnlock(&execInfo.lock);
return false;
}
bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
mndDestroyVgroupChangeInfo(&changeInfo);
taosArrayDestroy(pNodeSnapshot);
nodeUpdated = false;
} else {
nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0);
if (nodeUpdated) {
mDebug("stream tasks not ready due to node update");
}
}
streamMutexUnlock(&execInfo.lock);
mndDestroyVgroupChangeInfo(&changeInfo);
return nodeUpdated;
}
// check if the node update happens or not
static bool taskNodeIsUpdated(SMnode *pMnode) {
SArray *pNodeSnapshot = NULL;
streamMutexLock(&execInfo.lock);
bool updated = doCheckForUpdated(pMnode, &pNodeSnapshot);
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pNodeSnapshot);
return updated;
}
static int32_t mndCheckTaskAndNodeStatus(SMnode *pMnode) {
bool ready = true;
if (taskNodeIsUpdated(pMnode)) {
@ -1993,7 +1994,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis
if (pInfo->pUpdateNodeList == NULL || pInfo->pDBMap == NULL) {
mndDestroyVgroupChangeInfo(pInfo);
return terrno;
TSDB_CHECK_NULL(NULL, code, lino, _err, terrno);
}
int32_t numOfNodes = taosArrayGetSize(pPrevNodeList);
@ -2048,6 +2049,7 @@ static int32_t mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeLis
return code;
_err:
mError("failed to find node change info, code:%s at %s line:%d", tstrerror(code), __func__, lino);
mndDestroyVgroupChangeInfo(pInfo);
return code;
}

View File

@ -160,6 +160,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tDecoderClear(&decoder);
int32_t gError = streamGetFatalError(pMeta);
if (gError != 0) {
tqError("vgId:%d global fatal occurs, code:%s, ts:%" PRId64 " func:%s", pMeta->vgId, tstrerror(gError),
pMeta->fatalInfo.ts, pMeta->fatalInfo.func);
return 0;
}
// update the nodeEpset when it exists
streamMetaWLock(pMeta);
@ -290,8 +297,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
updateTasks, (numOfTasks - updateTasks));
} else {
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
if ((code = streamMetaCommit(pMeta)) < 0) {
// always return true
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
return TSDB_CODE_SUCCESS;
}
streamMetaClearSetUpdateTaskListComplete(pMeta);
@ -754,8 +764,9 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
}
streamMetaWUnLock(pMeta);
// always return success when handling the requirement issued by mnode during transaction.
return code;
return TSDB_CODE_SUCCESS;
}
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
@ -1197,10 +1208,6 @@ int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { ret
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessConsensusChkptRsp2(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return doProcessDummyRspMsg(pMeta, pMsg);
}
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
@ -1221,14 +1228,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId;
int32_t code = 0;
SStreamTask* pTask = NULL;
SRestoreCheckpointInfo req = {0};
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int64_t now = taosGetTimestampMs();
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
SRestoreCheckpointInfo req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) {
tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
tDecoderClear(&decoder);
@ -1239,8 +1245,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL || (code != 0)) {
tqError(
"vgId:%d process set consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
tqError("vgId:%d process consensus checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, req.taskId);
// ignore this code to avoid error code over write
int32_t ret = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
@ -1248,7 +1253,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqError("s-task:0x%x failed add check downstream failed, core:%s", req.taskId, tstrerror(ret));
}
return code;
return 0;
}
// discard the rsp, since it is expired.
@ -1272,7 +1277,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
return 0;
}
SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
@ -1299,10 +1304,13 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
if (pMeta->role == NODE_ROLE_LEADER) {
code = tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
if (code) {
tqError("s-task:0x%x vgId:%d failed start task async, code:%s", req.taskId, vgId, tstrerror(code));
}
} else {
tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pTask);
return code;
return 0;
}

View File

@ -303,13 +303,8 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref);
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
} else {
streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
}
streamMutexUnlock(&pInfo->checkInfoLock);
}
@ -860,7 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) {
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
}
streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
streamMutexUnlock(&pInfo->checkInfoLock);

View File

@ -209,29 +209,18 @@ int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStream
return code;
}
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int64_t checkpointId = pDataBlock->info.version;
int32_t transId = pDataBlock->info.window.skey;
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
int32_t transId) {
int32_t code = 0;
int32_t vgId = pTask->pMeta->vgId;
int32_t taskLevel = pTask->info.taskLevel;
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId > checkpointId) {
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -239,9 +228,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
" discard the checkpoint-trigger block",
id, vgId, checkpointId, transId, pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -251,25 +237,24 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
SStreamUpstreamEpInfo* pInfo = NULL;
streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
if (pInfo == NULL) {
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
if (code == TSDB_CODE_SUCCESS) {
code = tmsgSendReq(&pInfo->epSet, &msg);
if (code) {
stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code));
}
}
}
stWarn(
"s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the "
"interrupted checkpoint",
"s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
"the interrupted checkpoint",
id, vgId, pBlock->srcTaskId);
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -278,9 +263,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard",
id, vgId, pActiveInfo->activeId, checkpointId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
} else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
@ -288,8 +270,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
"checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -298,7 +278,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p == NULL) {
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_INVALID_PARA;
}
@ -306,9 +285,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
streamMutexUnlock(&pTask->lock);
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
}
@ -316,7 +292,33 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
}
}
return 0;
}
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int64_t checkpointId = 0;
int32_t transId = 0;
const char* id = pTask->id.idStr;
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pTask->pMeta->vgId;
int32_t taskLevel = pTask->info.taskLevel;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
if (pDataBlock == NULL) {
return TSDB_CODE_INVALID_PARA;
}
checkpointId = pDataBlock->info.version;
transId = pDataBlock->info.window.skey;
streamMutexLock(&pTask->lock);
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
streamMutexUnlock(&pTask->lock);
if (code) {
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
", transId:%d current active checkpointId:%" PRId64,
@ -345,12 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pTmrInfo->tmrHandle == NULL) {
pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer);
} else {
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
}
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId;
} else { // already launched, do nothing
stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
@ -367,6 +365,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// before the next checkpoint.
code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
if (code) {
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -675,10 +674,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
}
streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
code = streamMetaCommit(pMeta);
return TSDB_CODE_SUCCESS;
}
@ -893,97 +889,56 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
static int32_t doChkptStatusCheck(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return;
}
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pActiveInfo->lock);
// send msg to retrieve checkpoint trigger msg
SArray* pList = pTask->upstreamInfo.pList;
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
if (pNotSendList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
streamMutexUnlock(&pActiveInfo->lock);
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return;
// streamMutexUnlock(&pTask->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
// streamMutexUnlock(&pActiveInfo->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
// streamMutexUnlock(&pActiveInfo->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
return 0;
}
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
if (pNotSendList == NULL) {
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
return terrno;
}
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
@ -1007,13 +962,87 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
void* px = taosArrayPush(pNotSendList, pInfo);
if (px == NULL) {
stError("s-task:%s failed to record not send info, code: out of memory", id);
taosArrayDestroy(pNotSendList);
return terrno;
}
}
}
*ppNotSendList = pNotSendList;
return 0;
}
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return;
}
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
streamMutexLock(&pTask->lock);
SStreamTaskState state = streamTaskGetStatus(pTask);
streamMutexUnlock(&pTask->lock);
if (state.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id,
vgId, state.name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexLock(&pActiveInfo->lock);
int32_t code = doChkptStatusCheck(pTask);
if (code) {
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// send msg to retrieve checkpoint trigger msg
SArray* pList = pTask->upstreamInfo.pList;
SArray* pNotSendList = NULL;
code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref);
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotSendList);
return;
}
// do send retrieve checkpoint trigger msg to upstream
int32_t size = taosArrayGetSize(pNotSendList);
int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
if (code) {
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
}
@ -1023,7 +1052,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// check every 100ms
if (size > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);

View File

@ -634,12 +634,8 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
int32_t vgId = pTask->pMeta->vgId;
if (pTask->msgInfo.pRetryTmr != NULL) {
streamTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
"dispatch-monitor-tmr");
} else {
pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer);
}
streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
"dispatch-monitor");
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId,
@ -888,77 +884,42 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
return TSDB_CODE_SUCCESS;
}
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
return;
}
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
pState.name, ref);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
return -1;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
return -1;
}
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref);
return -1;
}
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
return 0;
}
static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level,
const char* id) {
SArray* pTmp = taosArrayInit(4, sizeof(int32_t));
if (pTmp == NULL) {
return terrno;
}
for (int32_t i = 0; i < num; ++i) {
@ -971,19 +932,25 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
continue;
}
void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId);
void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId);
if (p == NULL) {
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
return terrno;
} else {
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
pTask->info.taskLevel, pInfo->upstreamTaskId);
level, pInfo->upstreamTaskId);
}
}
int32_t checkpointId = pActiveInfo->activeId;
*ppNotRspList = pTmp;
return 0;
}
static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t checkpointId, SArray* pReadyList) {
int32_t code = 0;
int32_t num = taosArrayGetSize(pReadyList);
const char* id = pTask->id.idStr;
int32_t notRsp = taosArrayGetSize(pNotRspList);
if (notRsp > 0) { // send checkpoint-ready msg again
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
int32_t* pTaskId = taosArrayGet(pNotRspList, i);
if (pTaskId == NULL) {
@ -991,7 +958,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
}
for (int32_t j = 0; j < num; ++j) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j);
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pReadyList, j);
if (pReadyInfo == NULL) {
continue;
}
@ -999,7 +966,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again
SRpcMsg msg = {0};
int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId,
code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId,
pReadyInfo->childId, checkpointId, &msg);
if (code == TSDB_CODE_SUCCESS) {
code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg);
@ -1015,19 +982,88 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
}
}
}
}
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pNotRspList = NULL;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
return;
}
// reset tmr
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
streamMutexLock(&pTask->lock);
SStreamTaskState state = streamTaskGetStatus(pTask);
streamMutexUnlock(&pTask->lock);
// 1. check status in the first place
if (state.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId,
state.name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
int32_t code = doTaskChkptStatusCheck(pTask, num);
if (code) {
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
tstrerror(code), ref);
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
}
int32_t checkpointId = pActiveInfo->activeId;
int32_t notRsp = taosArrayGetSize(pNotRspList);
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
if (notRsp > 0) { // send checkpoint-ready msg again
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
streamMutexUnlock(&pActiveInfo->lock);
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
"and quit from timer, ref:%d",
"s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit "
"from timer, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pActiveInfo);
streamMutexUnlock(&pActiveInfo->lock);
// release should be the last execution, since pTask may be destroy after it immidiately.
streamMetaReleaseTask(pTask->pMeta, pTask);
}
@ -1085,12 +1121,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pTmrInfo->tmrHandle == NULL) {
pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer);
} else {
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
}
// mark the timer monitor checkpointId
pTmrInfo->launchChkptId = pActiveInfo->activeId;

View File

@ -279,7 +279,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
@ -301,7 +301,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
}
streamMetaRUnLock(pMeta);
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
@ -317,7 +317,7 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
return terrno;
}
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb");
pInfo->tickCounter = 0;
pInfo->msgSendTs = -1;
pInfo->hbCount = 0;

View File

@ -458,9 +458,6 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
code = createMetaHbInfo(pRid, &pMeta->pHbInfo);
TSDB_CHECK_CODE(code, lino, _err);
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
TSDB_CHECK_NULL(pMeta->qHandle, code, lino, _err, terrno);
code = bkdMgtCreate(tpath, (SBkdMgt**)&pMeta->bkdChkptMgt);
TSDB_CHECK_CODE(code, lino, _err);
@ -629,9 +626,6 @@ void streamMetaCloseImpl(void* arg) {
taosMemoryFree(pMeta->path);
streamMutexDestroy(&pMeta->backendMutex);
taosCleanUpScheduler(pMeta->qHandle);
taosMemoryFree(pMeta->qHandle);
bkdMgtDestroy(pMeta->bkdChkptMgt);
pMeta->role = NODE_ROLE_UNINIT;
@ -929,32 +923,38 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
streamMetaWLock(pMeta);
int32_t code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (code) {
streamSetFatalError(pMeta, code, __func__, __LINE__);
}
streamMetaWUnLock(pMeta);
return code;
}
int32_t streamMetaCommit(SStreamMeta* pMeta) {
int32_t code = 0;
code = tdbCommit(pMeta->db, pMeta->txn);
int32_t code = tdbCommit(pMeta->db, pMeta->txn);
if (code != 0) {
stError("vgId:%d failed to commit stream meta", pMeta->vgId);
return code;
streamSetFatalError(pMeta, code, __func__, __LINE__);
stFatal("vgId:%d failed to commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
pMeta->fatalInfo.line);
}
code = tdbPostCommit(pMeta->db, pMeta->txn);
if (code != 0) {
stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId);
streamSetFatalError(pMeta, code, __func__, __LINE__);
stFatal("vgId:%d failed to do post-commit stream meta, code:%s, line:%d", pMeta->vgId, tstrerror(code),
pMeta->fatalInfo.line);
return code;
}
code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (code != 0) {
stError("vgId:%d failed to begin trans", pMeta->vgId);
return code;
streamSetFatalError(pMeta, code, __func__, __LINE__);
stFatal("vgId:%d failed to begin trans, code:%s, line:%d", pMeta->vgId, tstrerror(code), pMeta->fatalInfo.line);
} else {
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
}
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
return code;
}
@ -1261,40 +1261,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
streamMetaHbToMnode(pRid, NULL);
}
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
if (code) {
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
} else {
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
int32_t streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta, SArray** pList) {
QRY_PARAM_CHECK(pList);
@ -1398,60 +1364,6 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
return 0;
}
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list.
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
streamMetaRUnLock(pMeta);
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
streamId, taskId, pMeta->vgId);
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
return code;
}
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
int32_t startTs = pTask->execInfo.checkTs;
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
if (code) {
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
}
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
if (code) {
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
}
}
}
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs) {
const char* id = pTask->id.idStr;

View File

@ -20,13 +20,14 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
int64_t delaySchema = pTask->info.delaySchedParam;
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
pTask->info.delaySchedParam);
pTask->schedInfo.pDelayTimer =
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer,
pTask->pMeta->vgId, "sched-tmr");
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
}
@ -76,13 +77,8 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
// add one ref count for task
streamMetaAcquireOneTask(pTask);
if (pTask->schedInfo.pIdleTimer == NULL) {
pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer);
} else {
streamTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer,
&pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr");
}
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
pTask->pMeta->vgId, "resume-task-tmr");
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -125,7 +121,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
stDebug("s-task:%s jump out of schedTimer", id);
stDebug("s-task:%s should stop, jump out of schedTimer", id);
return;
}
@ -139,9 +135,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (code) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
terrno = code;
return;
goto _end;
}
pTrigger->type = STREAM_INPUT__GET_RES;
@ -149,10 +144,9 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (pTrigger->pBlock == NULL) {
taosFreeQitem(pTrigger);
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
stError("s-task:%s failed to build retrieve data trigger, code:out of memory, try again in %dms", id,
nextTrigger);
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
return;
goto _end;
}
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
@ -160,8 +154,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
return;
stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code));
goto _end;
}
code = streamTrySchedExec(pTask);
@ -171,5 +165,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
}
}
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
_end:
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
"sched-run-tmr");
}

View File

@ -80,6 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
}
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t vgId = pTask->pMeta->vgId;
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
numOfTicks = 1;
@ -100,14 +101,8 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer =
taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
} else {
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
}
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, vgId, "history-task");
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
@ -337,7 +332,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
}
}
@ -391,7 +386,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
streamMetaReleaseTask(pMeta, pTask);
return;
@ -519,7 +514,7 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
}
stDebug("s-task:%s set timer active flag, task timer not null", idStr);
streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
}
@ -621,7 +616,7 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
}
}

View File

@ -505,3 +505,57 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
}
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
streamMetaRUnLock(pMeta);
// add the failed task info, along with the related fill-history task info into tasks list.
code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
if (hasFillhistoryTask) {
code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
}
} else {
streamMetaRUnLock(pMeta);
stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
streamId, taskId, pMeta->vgId);
code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
return code;
}
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
int32_t startTs = pTask->execInfo.checkTs;
int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
if (code) {
stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
}
// automatically set the related fill-history task to be failed.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pId = &pTask->hTaskInfo.id;
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
if (code) {
stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
}
}
}

View File

@ -801,8 +801,8 @@ bool streamTaskSetSchedStatusWait(SStreamTask* pTask) {
pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
ret = true;
}
streamMutexUnlock(&pTask->lock);
streamMutexUnlock(&pTask->lock);
return ret;
}

View File

@ -40,11 +40,23 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) {
return TSDB_CODE_SUCCESS;
}
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg) {
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
if (ret) {
if (*pTmrId == NULL) {
*pTmrId = taosTmrStart(fp, mseconds, pParam, pHandle);
if (*pTmrId == NULL) {
stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno));
return;
}
} else {
bool ret = taosTmrReset(fp, mseconds, pParam, pHandle, pTmrId);
if (ret) {
stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno));
return;
}
}
stDebug("vgId:%d start %s tmr succ", vgId, pMsg);
}
void streamTmrStop(tmr_h tmrId) {

View File

@ -35,3 +35,55 @@ void streamMutexDestroy(TdThreadMutex *pMutex) {
stError("%p mutex destroy, code:%s", pMutex, tstrerror(code));
}
}
void streamMetaRLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
int32_t code = taosThreadRwlockRdlock(&pMeta->lock);
if (code) {
stError("vgId:%d meta-rlock failed, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaRUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-runlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d meta-runlock failed, code:%s", pMeta->vgId, tstrerror(code));
} else {
// stTrace("vgId:%d meta-runlock completed", pMeta->vgId);
}
}
void streamMetaWLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wlock", pMeta->vgId);
int32_t code = taosThreadRwlockWrlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamMetaWUnLock(SStreamMeta* pMeta) {
// stTrace("vgId:%d meta-wunlock", pMeta->vgId);
int32_t code = taosThreadRwlockUnlock(&pMeta->lock);
if (code) {
stError("vgId:%d failed to apply wunlock, code:%s", pMeta->vgId, tstrerror(code));
}
}
void streamSetFatalError(SStreamMeta* pMeta, int32_t code, const char* funcName, int32_t lino) {
int32_t oldCode = atomic_val_compare_exchange_32(&pMeta->fatalInfo.code, 0, code);
if (oldCode == 0) {
pMeta->fatalInfo.ts = taosGetTimestampMs();
pMeta->fatalInfo.threadId = taosGetSelfPthreadId();
tstrncpy(pMeta->fatalInfo.func, funcName, tListLen(pMeta->fatalInfo.func));
pMeta->fatalInfo.line = lino;
stInfo("vgId:%d set fatal error, code:%s %s line:%d", pMeta->vgId, tstrerror(code), funcName, lino);
} else {
stFatal("vgId:%d existed fatal error:%s, ts:%" PRId64 " failed to set new fatal error code:%s", pMeta->vgId,
tstrerror(pMeta->fatalInfo.code), pMeta->fatalInfo.ts, tstrerror(code));
}
}
int32_t streamGetFatalError(const SStreamMeta* pMeta) {
return atomic_load_32((volatile int32_t*) &pMeta->fatalInfo.code);
}