refactor:do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-15 16:15:11 +08:00
parent 1d2c00a4ff
commit b4277e0e65
9 changed files with 274 additions and 215 deletions

View File

@ -801,7 +801,7 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
// timer
int32_t streamTimerGetInstance(tmr_h* pTmr);
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg);
void streamTmrStop(tmr_h tmrId);

View File

@ -303,13 +303,8 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref);
if (pInfo->checkRspTmr == NULL) {
pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer);
} else {
streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
}
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
streamMutexUnlock(&pInfo->checkInfoLock);
}
@ -860,7 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) {
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
}
streamTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
streamMutexUnlock(&pInfo->checkInfoLock);

View File

@ -347,12 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pTmrInfo->tmrHandle == NULL) {
pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer);
} else {
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
}
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId;
} else { // already launched, do nothing
stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
@ -893,48 +889,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
static int32_t doChkptStatusCheck(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return;
}
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
// checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) {
@ -942,48 +901,44 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pActiveInfo->lock);
// send msg to retrieve checkpoint trigger msg
SArray* pList = pTask->upstreamInfo.pList;
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
if (pNotSendList == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
streamMutexUnlock(&pActiveInfo->lock);
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return;
// streamMutexUnlock(&pTask->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
// streamMutexUnlock(&pActiveInfo->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
// streamMutexUnlock(&pActiveInfo->lock);
// streamMetaReleaseTask(pTask->pMeta, pTask);
return -1;
}
return 0;
}
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
if (pNotSendList == NULL) {
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
return terrno;
}
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
@ -1007,13 +962,87 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
void* px = taosArrayPush(pNotSendList, pInfo);
if (px == NULL) {
stError("s-task:%s failed to record not send info, code: out of memory", id);
taosArrayDestroy(pNotSendList);
return terrno;
}
}
}
*ppNotSendList = pNotSendList;
return 0;
}
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return;
}
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
streamMutexLock(&pTask->lock);
SStreamTaskState state = streamTaskGetStatus(pTask);
streamMutexUnlock(&pTask->lock);
if (state.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id,
vgId, state.name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexLock(&pActiveInfo->lock);
int32_t code = doChkptStatusCheck(pTask);
if (code) {
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// send msg to retrieve checkpoint trigger msg
SArray* pList = pTask->upstreamInfo.pList;
SArray* pNotSendList = NULL;
code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref);
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotSendList);
return;
}
// do send retrieve checkpoint trigger msg to upstream
int32_t size = taosArrayGetSize(pNotSendList);
int32_t code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
if (code) {
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
}
@ -1023,7 +1052,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// check every 100ms
if (size > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);

View File

@ -634,12 +634,8 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
int32_t vgId = pTask->pMeta->vgId;
if (pTask->msgInfo.pRetryTmr != NULL) {
streamTmrReset(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
"dispatch-monitor-tmr");
} else {
pTask->msgInfo.pRetryTmr = taosTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer);
}
streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
"dispatch-monitor");
}
int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId,
@ -888,77 +884,42 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
return TSDB_CODE_SUCCESS;
}
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
return;
}
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
pState.name, ref);
streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
return -1;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
return -1;
}
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref);
return -1;
}
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
return 0;
}
static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level,
const char* id) {
SArray* pTmp = taosArrayInit(4, sizeof(int32_t));
if (pTmp == NULL) {
return terrno;
}
for (int32_t i = 0; i < num; ++i) {
@ -971,63 +932,138 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
continue;
}
void* p = taosArrayPush(pNotRspList, &pInfo->upstreamTaskId);
void* p = taosArrayPush(pTmp, &pInfo->upstreamTaskId);
if (p == NULL) {
stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId);
return terrno;
} else {
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
pTask->info.taskLevel, pInfo->upstreamTaskId);
level, pInfo->upstreamTaskId);
}
}
int32_t checkpointId = pActiveInfo->activeId;
*ppNotRspList = pTmp;
return 0;
}
int32_t notRsp = taosArrayGetSize(pNotRspList);
if (notRsp > 0) { // send checkpoint-ready msg again
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
int32_t* pTaskId = taosArrayGet(pNotRspList, i);
if (pTaskId == NULL) {
static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t checkpointId, SArray* pReadyList) {
int32_t code = 0;
int32_t num = taosArrayGetSize(pReadyList);
const char* id = pTask->id.idStr;
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
int32_t* pTaskId = taosArrayGet(pNotRspList, i);
if (pTaskId == NULL) {
continue;
}
for (int32_t j = 0; j < num; ++j) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pReadyList, j);
if (pReadyInfo == NULL) {
continue;
}
for (int32_t j = 0; j < num; ++j) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j);
if (pReadyInfo == NULL) {
continue;
}
if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again
if (*pTaskId == pReadyInfo->upstreamTaskId) { // send msg again
SRpcMsg msg = {0};
int32_t code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId,
pReadyInfo->childId, checkpointId, &msg);
SRpcMsg msg = {0};
code = initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId,
pReadyInfo->childId, checkpointId, &msg);
if (code == TSDB_CODE_SUCCESS) {
code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg);
if (code == TSDB_CODE_SUCCESS) {
code = tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg);
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel,
pReadyInfo->upstreamTaskId);
} else {
stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id);
}
stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel,
pReadyInfo->upstreamTaskId);
} else {
stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id);
stError("s-task:%s failed to send checkpoint-ready msg, try nex time in 10s", id);
}
} else {
stError("s-task:%s failed to prepare the checkpoint-ready msg, try nex time in 10s", id);
}
}
}
}
}
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pNotRspList = NULL;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
return;
}
// reset tmr
pTmrInfo->activeCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
streamMutexLock(&pTask->lock);
SStreamTaskState state = streamTaskGetStatus(pTask);
streamMutexUnlock(&pTask->lock);
// 1. check status in the first place
if (state.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId,
state.name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
streamMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList);
int32_t code = doTaskChkptStatusCheck(pTask, num);
if (code) {
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
tstrerror(code), ref);
streamMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
}
int32_t checkpointId = pActiveInfo->activeId;
int32_t notRsp = taosArrayGetSize(pNotRspList);
doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList);
if (notRsp > 0) { // send checkpoint-ready msg again
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
streamMutexUnlock(&pActiveInfo->lock);
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
"and quit from timer, ref:%d",
"s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit "
"from timer, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pActiveInfo);
streamMutexUnlock(&pActiveInfo->lock);
// release should be the last execution, since pTask may be destroy after it immidiately.
streamMetaReleaseTask(pTask->pMeta, pTask);
}
@ -1085,12 +1121,8 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pTmrInfo->tmrHandle == NULL) {
pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer);
} else {
streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
}
streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
// mark the timer monitor checkpointId
pTmrInfo->launchChkptId = pActiveInfo->activeId;

View File

@ -279,7 +279,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
@ -301,7 +301,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
}
streamMetaRUnLock(pMeta);
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
@ -317,7 +317,7 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
return terrno;
}
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer, &pInfo->hbTmr, 0, "stream-hb");
pInfo->tickCounter = 0;
pInfo->msgSendTs = -1;
pInfo->hbCount = 0;

View File

@ -20,13 +20,14 @@ static void streamTaskResumeHelper(void* param, void* tmrId);
static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) {
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
int64_t delaySchema = pTask->info.delaySchedParam;
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
pTask->info.delaySchedParam);
pTask->schedInfo.pDelayTimer =
taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer);
streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer,
pTask->pMeta->vgId, "sched-tmr");
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
}
@ -76,13 +77,8 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
// add one ref count for task
streamMetaAcquireOneTask(pTask);
if (pTask->schedInfo.pIdleTimer == NULL) {
pTask->schedInfo.pIdleTimer = taosTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer);
} else {
streamTmrReset(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer,
&pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr");
}
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
pTask->pMeta->vgId, "resume-task-tmr");
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
@ -125,7 +121,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
stTrace("s-task:%s in scheduler, trigger status:%d, next:%dms", id, status, nextTrigger);
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
stDebug("s-task:%s jump out of schedTimer", id);
stDebug("s-task:%s should stop, jump out of schedTimer", id);
return;
}
@ -139,9 +135,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (code) {
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
nextTrigger);
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
terrno = code;
return;
goto _end;
}
pTrigger->type = STREAM_INPUT__GET_RES;
@ -149,10 +144,9 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (pTrigger->pBlock == NULL) {
taosFreeQitem(pTrigger);
stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory",
stError("s-task:%s failed to build retrieve data trigger, code:out of memory, try again in %dms", id,
nextTrigger);
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
return;
goto _end;
}
atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE);
@ -160,8 +154,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger);
if (code != TSDB_CODE_SUCCESS) {
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
return;
stError("s-task:%s failed to put retrieve block into trigger, code:%s", pTask->id.idStr, tstrerror(code));
goto _end;
}
code = streamTrySchedExec(pTask);
@ -171,5 +165,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
}
}
streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr");
_end:
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
"sched-run-tmr");
}

View File

@ -80,6 +80,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
}
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t vgId = pTask->pMeta->vgId;
int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
if (numOfTicks <= 0) {
numOfTicks = 1;
@ -100,14 +101,8 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
if (pTask->schedHistoryInfo.pTimer == NULL) {
pTask->schedHistoryInfo.pTimer =
taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
} else {
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
}
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, vgId, "history-task");
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
@ -337,7 +332,7 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes);
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
}
}
@ -391,7 +386,7 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
streamTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
streamMetaReleaseTask(pMeta, pTask);
return;
@ -519,7 +514,7 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
}
stDebug("s-task:%s set timer active flag, task timer not null", idStr);
streamTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
}
@ -621,8 +616,8 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
} else {
streamTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
}
}

View File

@ -491,8 +491,8 @@ static void keepPrevInfo(SStreamTaskSM* pSM) {
int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param) {
SStreamTask* pTask = pSM->pTask;
const char* id = pTask->id.idStr;
int32_t code = 0;
const char* id = pTask->id.idStr;
int32_t code = 0;
// do update the task status
streamMutexLock(&pTask->lock);

View File

@ -40,11 +40,23 @@ int32_t streamTimerGetInstance(tmr_h* pTmr) {
return TSDB_CODE_SUCCESS;
}
void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* handle, tmr_h* pTmrId, int32_t vgId,
void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* pHandle, tmr_h* pTmrId, int32_t vgId,
const char* pMsg) {
bool ret = taosTmrReset(fp, mseconds, param, handle, pTmrId);
if (ret) {
if (*pTmrId == NULL) {
*pTmrId = taosTmrStart(fp, mseconds, pParam, pHandle);
if (*pTmrId == NULL) {
stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno));
return;
}
} else {
bool ret = taosTmrReset(fp, mseconds, pParam, pHandle, pTmrId);
if (ret) {
stError("vgId:%d start %s tmr failed, code:%s", vgId, pMsg, tstrerror(terrno));
return;
}
}
stDebug("vgId:%d start %s tmr succ", vgId, pMsg);
}
void streamTmrStop(tmr_h tmrId) {