fix(stream): use the refId in the stream meta list, in order to avoid accessing already freed stream tasks.

This commit is contained in:
Haojun Liao 2024-10-27 15:49:40 +08:00
parent 380f433499
commit 4802f59dfe
16 changed files with 769 additions and 644 deletions

View File

@ -45,17 +45,17 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
char *p = streamTaskGetStatus(pTask).name;
if (pTask->info.fillHistory) {
sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
SNODE_HANDLE, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam);
} else {
sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
sndInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
SNODE_HANDLE, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam);
}

View File

@ -238,13 +238,18 @@ int32_t tdFetchTbUidList(SSma *pSma, STbUidStore **ppStore, tb_uid_t suid, tb_ui
}
static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTaskId *pId) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask *pTask = NULL;
streamMetaRLock(pMeta);
SStreamTask **ppTask = (SStreamTask **)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask && *ppTask) {
pItem->submitReqVer = (*ppTask)->chkInfo.checkpointVer;
pItem->fetchResultVer = (*ppTask)->info.delaySchedParam;
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) {
pItem->submitReqVer = pTask->chkInfo.checkpointVer;
pItem->fetchResultVer = pTask->info.delaySchedParam;
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaRUnLock(pMeta);
}

View File

@ -774,19 +774,19 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
if (pTask->info.fillHistory) {
tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x "
"delaySched:%" PRId64 " ms, inputVer:%" PRId64,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam, nextProcessVer);
} else {
tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
tqInfo("vgId:%d build stream task, s-task:%s, %p checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x "
"delaySched:%" PRId64 " ms, inputVer:%" PRId64,
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
vgId, pTask->id.idStr, pTask, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory,
(int32_t)pTask->hTaskInfo.id.taskId, pTask->info.delaySchedParam, nextProcessVer);

View File

@ -1115,13 +1115,18 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
if (code != 0) {
tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);
continue;
int64_t refId = *(int64_t*)pIter;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, refId);
if (pTask != NULL) {
int32_t taskId = pTask->id.taskId;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
if (code != 0) {
tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
}
}
taosReleaseRef(streamTaskRefPool, refId);
}
}

View File

@ -79,7 +79,7 @@ static void doStartScanWal(void* param, void* tmrId) {
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, pParam->metaId);
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId);
if (pMeta == NULL) {
tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId);
taosMemoryFree(pParam);
@ -97,7 +97,7 @@ static void doStartScanWal(void* param, void* tmrId) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
code = taosReleaseRef(streamMetaId, pParam->metaId);
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code) {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));

View File

@ -683,19 +683,21 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
continue;
}
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
tqError("vgId:%d failed to acquire task:0x%x in retrieving progress", pMeta->vgId, pId->taskId);
continue;
}
if ((*ppTask)->info.taskLevel != TASK_LEVEL__SOURCE) {
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// here we get the required stream source task
SStreamTask* pTask = *ppTask;
*fhFinished = !HAS_RELATED_FILLHISTORY_TASK(pTask);
int64_t ver = walReaderGetCurrentVer(pTask->exec.pWalReader);
@ -711,6 +713,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
SWalReader* pReader = walOpenReader(pTask->exec.pWalReader->pWal, NULL, 0);
if (pReader == NULL) {
tqError("failed to open wal reader to extract exec progress, vgId:%d", pMeta->vgId);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
@ -736,6 +739,7 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
}
walCloseReader(pReader);
streamMetaReleaseTask(pMeta, pTask);
}
streamMetaRUnLock(pMeta);

View File

@ -138,13 +138,15 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
// this is to process request from transaction, always return true.
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t st = taosGetTimestampMs();
bool updated = false;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t st = taosGetTimestampMs();
bool updated = false;
int32_t code = 0;
SStreamTask* pTask = NULL;
SStreamTask* pHTask = NULL;
SStreamTaskNodeUpdateMsg req = {0};
@ -170,9 +172,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaWLock(pMeta);
// the task epset may be updated again and again, when replaying the WAL, the task may be in stop status.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) {
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId,
req.taskId);
rsp.code = TSDB_CODE_SUCCESS;
@ -181,12 +183,13 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
return rsp.code;
}
SStreamTask* pTask = *ppTask;
const char* idstr = pTask->id.idStr;
const char* idstr = pTask->id.idStr;
if (req.transId <= 0) {
// report the rejected transId itself (the value tested above) together with the task id; previously the
// task id was printed under the "transId" label
tqError("vgId:%d invalid update nodeEp task:0x%x, transId:%d, discard", vgId, req.taskId, req.transId);
rsp.code = TSDB_CODE_SUCCESS;
streamMetaReleaseTask(pMeta, pTask);
streamMetaWUnLock(pMeta);
taosArrayDestroy(req.pNodeList);
@ -227,24 +230,23 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
SStreamTask** ppHTask = NULL;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
if (ppHTask == NULL || *ppHTask == NULL) {
code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
if (code != 0) {
tqError(
"vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel "
"stream task:0x%x",
vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
} else {
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
bool updateEpSet = streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", pHTask->id.idStr);
bool updateEpSet = streamTaskUpdateEpsetInfo(pHTask, req.pNodeList);
if (updateEpSet) {
updated = updateEpSet;
}
streamTaskResetStatus(*ppHTask);
streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
streamTaskResetStatus(pHTask);
streamTaskStopMonitorCheckRsp(&pHTask->taskCheckInfo, pHTask->id.idStr);
}
}
@ -256,8 +258,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqError("s-task:%s vgId:%d failed to save task, code:%s", idstr, vgId, tstrerror(code));
}
if (ppHTask != NULL) {
code = streamMetaSaveTask(pMeta, *ppHTask);
if (pHTask != NULL) {
code = streamMetaSaveTask(pMeta, pHTask);
if (code) {
tqError("s-task:%s vgId:%d failed to save related history task, code:%s", idstr, vgId, tstrerror(code));
}
@ -271,15 +273,17 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqError("s-task:%s vgId:%d failed to stop task, code:%s", idstr, vgId, tstrerror(code));
}
if (ppHTask != NULL) {
code = streamTaskStop(*ppHTask);
if (pHTask != NULL) {
code = streamTaskStop(pHTask);
if (code) {
tqError("s-task:%s vgId:%d failed to stop related history task, code:%s", idstr, vgId, tstrerror(code));
}
}
// keep info
streamMetaAddIntoUpdateTaskList(pMeta, pTask, (ppHTask != NULL) ? (*ppHTask) : NULL, req.transId, st);
streamMetaAddIntoUpdateTaskList(pMeta, pTask, (pHTask != NULL) ? (pHTask) : NULL, req.transId, st);
streamMetaReleaseTask(pMeta, pTask);
streamMetaReleaseTask(pMeta, pHTask);
rsp.code = TSDB_CODE_SUCCESS;
@ -643,7 +647,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
if (code < 0) {
// argument order must follow the format string: task id first ("s-task:0x%x"), then vgId ("vgId:%d")
tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", taskId, vgId, numOfTasks,
tstrerror(code));
tFreeStreamTask(pTask);
return code;
}
@ -673,7 +676,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
}
} else {
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
tFreeStreamTask(pTask);
}
return code;
@ -681,25 +683,25 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
STaskId hTaskId = {0};
SStreamTask* pTask = NULL;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
STaskId hTaskId = {0};
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
streamMetaWLock(pMeta);
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask);
SStreamTask* pTask = *ppTask;
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) {
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
hTaskId.streamId = pTask->hTaskInfo.id.streamId;
hTaskId.taskId = pTask->hTaskInfo.id.taskId;
}
// clear the relationship, and then release the stream task, to avoid invalid access to an already freed
// related stream (history) task
streamTaskSetRemoveBackendFiles(pTask);
code = streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt);
streamMetaReleaseTask(pMeta, pTask);
@ -742,18 +744,19 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg) {
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
SStreamTask* pTask = NULL;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);
streamMetaWLock(pMeta);
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL && (*ppTask) != NULL) {
code = streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) {
code = streamTaskUpdateTaskCheckpointInfo(pTask, restored, pReq);
streamMetaReleaseTask(pMeta, pTask);
} else { // failed to get the task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqError(
@ -763,7 +766,6 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
}
streamMetaWUnLock(pMeta);
// always return success when handling the requirement issued by mnode during transaction.
return TSDB_CODE_SUCCESS;
}
@ -789,11 +791,6 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);
while (streamMetaTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaWLock(pMeta);
streamMetaClear(pMeta);

View File

@ -299,13 +299,14 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
return;
}
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start check-rsp monitor, ref:%d ", pTask->id.idStr, ref);
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
int64_t* pTaskRefId = NULL;
code = streamTaskAllocRefId(pTask, &pTaskRefId);
if (code == 0) {
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTaskRefId, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
}
streamMutexUnlock(&pInfo->checkInfoLock);
}
@ -721,21 +722,45 @@ int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId, int64
return streamTaskSchedTask(pMsgCb, vgId, streamId, taskId, STREAM_EXEC_T_ADD_FAILED_TASK);
}
// Common exit-path cleanup for the check-rsp monitor timer callback (rspMonitorFn).
//   pTask         - task acquired by the callback; it is released through streamMetaReleaseTask()
//   pNotReadyList - scratch list built by the callback; NULL on exits taken before the lists are created
//   pTimeoutList  - scratch list built by the callback; NULL on exits taken before the lists are created
//   param         - timer parameter handed to streamTaskFreeRefId(); the callback passes NULL here when the
//                   timer has been re-armed with the same param
// NOTE(review): taosArrayDestroy() and streamTaskFreeRefId() are presumably NULL-tolerant, since the callback
// relies on that -- confirm against their definitions.
static void doCleanup(SStreamTask* pTask, SArray* pNotReadyList, SArray* pTimeoutList, void* param) {
// give back the task acquired by the timer callback
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
// hand the timer parameter to streamTaskFreeRefId()
streamTaskFreeRefId(param);
}
// this function is executed in timer thread
void rspMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
SStreamMeta* pMeta = pTask->pMeta;
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
int64_t timeoutDuration = now - pInfo->timeoutStartTs;
const char* id = pTask->id.idStr;
int32_t numOfReady = 0;
int32_t numOfFault = 0;
int32_t numOfNotRsp = 0;
int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0;
int32_t total = taosArrayGetSize(pInfo->pList);
int64_t taskRefId = *(int64_t*)param;
int64_t now = taosGetTimestampMs();
SArray* pNotReadyList = NULL;
SArray* pTimeoutList = NULL;
SStreamMeta* pMeta = NULL;
STaskCheckInfo* pInfo = NULL;
int32_t vgId = -1;
int64_t timeoutDuration = 0;
const char* id = NULL;
int32_t total = 0;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
streamTaskFreeRefId(param);
return;
}
pMeta = pTask->pMeta;
pInfo = &pTask->taskCheckInfo;
vgId = pTask->pMeta->vgId;
timeoutDuration = now - pInfo->timeoutStartTs;
id = pTask->id.idStr;
total = (int32_t) taosArrayGetSize(pInfo->pList);
stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
@ -744,12 +769,10 @@ void rspMonitorFn(void* param, void* tmrId) {
streamMutexUnlock(&pTask->lock);
if (state.state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
streamTaskCompleteCheckRsp(pInfo, true, id);
// not record the failed of the current task if try to close current vnode
// not record the failure of the current task if try to close current vnode
// otherwise, the put of message operation may incur invalid read of message queue.
if (!pMeta->closeFlag) {
int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
@ -758,33 +781,30 @@ void rspMonitorFn(void* param, void* tmrId) {
}
}
streamMetaReleaseTask(pMeta, pTask);
doCleanup(pTask, pNotReadyList, pTimeoutList, param);
return;
}
if (state.state == TASK_STATUS__DROPPING || state.state == TASK_STATUS__READY) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, state.name, vgId, ref);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr", id, state.name, vgId);
streamTaskCompleteCheckRsp(pInfo, true, id);
streamMetaReleaseTask(pMeta, pTask);
doCleanup(pTask, pNotReadyList, pTimeoutList, param);
return;
}
streamMutexLock(&pInfo->checkInfoLock);
if (pInfo->notReadyTasks == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, state.name, vgId,
ref);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr", id, state.name, vgId);
streamTaskCompleteCheckRsp(pInfo, false, id);
streamMutexUnlock(&pInfo->checkInfoLock);
streamMetaReleaseTask(pMeta, pTask);
doCleanup(pTask, pNotReadyList, pTimeoutList, param);
return;
}
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
pNotReadyList = taosArrayInit(4, sizeof(int64_t));
pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (state.state == TASK_STATUS__UNINIT) {
getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
@ -795,31 +815,25 @@ void rspMonitorFn(void* param, void* tmrId) {
// fault tasks detected, not try anymore
bool jumpOut = false;
if ((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) != total) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError(
"s-task:%s vgId:%d internal error in handling the check downstream procedure, rsp number is inconsistent, "
"stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
"stop rspMonitor tmr, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
jumpOut = true;
}
if (numOfFault > 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
"detected, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
jumpOut = true;
}
if (jumpOut) {
streamTaskCompleteCheckRsp(pInfo, false, id);
streamMutexUnlock(&pInfo->checkInfoLock);
streamMetaReleaseTask(pMeta, pTask);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
doCleanup(pTask, pNotReadyList, pTimeoutList, param);
return;
}
} else { // unexpected status
@ -828,11 +842,10 @@ void rspMonitorFn(void* param, void* tmrId) {
// checking of downstream tasks has been stopped by other threads
if (pInfo->stopCheckProcess == 1) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, total:%d, notRsp:%d, "
"notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
"notReady:%d, fault:%d, timeout:%d, ready:%d",
id, state.name, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
streamTaskCompleteCheckRsp(pInfo, false, id);
streamMutexUnlock(&pInfo->checkInfoLock);
@ -842,10 +855,7 @@ void rspMonitorFn(void* param, void* tmrId) {
stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
}
streamMetaReleaseTask(pMeta, pTask);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
doCleanup(pTask, pNotReadyList, pTimeoutList, param);
return;
}
@ -857,7 +867,7 @@ void rspMonitorFn(void* param, void* tmrId) {
handleTimeoutDownstreamTasks(pTask, pTimeoutList);
}
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr, vgId,
streamTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, param, streamTimer, &pInfo->checkRspTmr, vgId,
"check-status-monitor");
streamMutexUnlock(&pInfo->checkInfoLock);
@ -865,7 +875,5 @@ void rspMonitorFn(void* param, void* tmrId) {
"s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
"ready:%d",
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
doCleanup(pTask, pNotReadyList, pTimeoutList, NULL);
}

View File

@ -345,13 +345,15 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId;
int64_t* pTaskRefId = NULL;
code = streamTaskAllocRefId(pTask, &pTaskRefId);
if (code == 0) {
streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId;
}
} else { // already launched, do nothing
stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
}
@ -890,7 +892,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
return code;
}
static int32_t doChkptStatusCheck(SStreamTask* pTask) {
static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
@ -898,25 +900,24 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) {
// checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", id, vgId);
return -1;
}
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
", quit",
id, vgId, pTmrInfo->launchChkptId);
return -1;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
id, vgId, ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", id,
vgId);
return -1;
}
@ -964,22 +965,22 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray**
return 0;
}
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, SArray* pNotSendList) {
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) {
const char* id = pTask->id.idStr;
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
int32_t vgId = pTask->pMeta->vgId;
int32_t code = doChkptStatusCheck(pTask);
int32_t code = doChkptStatusCheck(pTask, param);
if (code) {
return code;
}
code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr, ref:%d", id, tstrerror(code), ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code));
return code;
}
@ -993,36 +994,48 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, SArray* pNotSen
return code;
}
// Exit-path cleanup for the checkpoint-trigger monitor timer callback: releases the task through
// streamMetaReleaseTask() and destroys the given list.
//   pTask - task acquired by the callback
//   pList - list to destroy; the callback passes its not-send list, which is still NULL on the early exits
// The timer parameter is not touched here.
// NOTE(review): taosArrayDestroy() is presumably NULL-tolerant -- confirm.
static void doCleanup(SStreamTask* pTask, SArray* pList) {
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pList);
}
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
SArray* pNotSendList = NULL;
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
int32_t code = 0;
int32_t numOfNotSend = 0;
SArray* pNotSendList = NULL;
int64_t taskRefId = *(int64_t*)param;
int64_t now = taosGetTimestampMs();
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
streamTaskFreeRefId(param);
return;
}
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, ref:%d quit", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id);
doCleanup(pTask, pNotSendList);
return;
}
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId);
doCleanup(pTask, pNotSendList);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
// each invocation acquires the task with taosAcquireRef() at the top of this callback, so it has to be
// released on this early-return path as well, otherwise one reference leaks per timer tick; param is kept
// alive because the re-armed timer still uses it
doCleanup(pTask, pNotSendList);
return;
}
@ -1035,20 +1048,19 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
streamMutexUnlock(&pTask->lock);
if (state.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id,
vgId, state.name, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id,
vgId, state.name);
doCleanup(pTask, pNotSendList);
return;
}
streamMutexLock(&pActiveInfo->lock);
code = chkptTriggerRecvMonitorHelper(pTask, pNotSendList);
code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList);
streamMutexUnlock(&pActiveInfo->lock);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotSendList);
doCleanup(pTask, pNotSendList);
return;
}
@ -1056,15 +1068,14 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
numOfNotSend = taosArrayGetSize(pNotSendList);
if (numOfNotSend > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor");
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id);
}
taosArrayDestroy(pNotSendList);
doCleanup(pTask, pNotSendList);
}
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {

View File

@ -518,45 +518,66 @@ static void doSendFailedDispatch(SStreamTask* pTask, SDispatchEntry* pEntry, int
}
}
/*
 * Common exit path for the dispatch-monitor timer callback.
 *
 * taskId    - task id, used only in the failure log
 * taskRefId - refId in streamTaskRefPool whose reference is released here
 * param     - the timer parameter; it is handed to streamTaskFreeRefId(), so the
 *             caller must not touch it afterwards (presumably it is the heap
 *             holder the refId was read from -- confirm against
 *             streamTaskAllocRefId/streamTaskFreeRefId).
 *
 * A failed release is logged and otherwise ignored; param is freed either way.
 */
static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) {
  if (taosReleaseRef(streamTaskRefPool, taskRefId) != 0) {
    stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, taskRefId);
  }

  streamTaskFreeRefId(param);
}
static void doMonitorDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
int32_t code = 0;
int64_t now = taosGetTimestampMs();
bool inDispatch = true;
SStreamTask* pTask = NULL;
int64_t taskRefId = *(int64_t*)param;
const char* id = NULL;
int32_t vgId = -1;
SDispatchMsgInfo* pMsgInfo = NULL;
int32_t msgId = -1;
stDebug("s-task:%s start monitor dispatch data", id);
pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
streamTaskFreeRefId(param);
return;
}
id = pTask->id.idStr;
vgId = pTask->pMeta->vgId;
pMsgInfo = &pTask->msgInfo;
msgId = pMsgInfo->msgId;
stDebug("s-task:%s start to monitor dispatch data", id);
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
setNotInDispatchMonitor(pMsgInfo);
cleanupInMonitor(pTask->id.taskId, taskRefId, param);
return;
}
// slave task not handle the dispatch, downstream not ready will break the monitor timer
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr, ref:%d", id, vgId, ref);
stError("s-task:%s vgId:%d follower or downstream not ready, jump out of monitor tmr", id, vgId);
setNotInDispatchMonitor(pMsgInfo);
cleanupInMonitor(pTask->id.taskId, taskRefId, param);
return;
}
streamMutexLock(&pMsgInfo->lock);
if (pTask->outputq.status == TASK_OUTPUT_STATUS__NORMAL) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
stDebug("s-task:%s not in dispatch procedure, abort from timer", pTask->id.idStr);
pMsgInfo->inMonitor = 0;
inDispatch = false;
}
streamMutexUnlock(&pMsgInfo->lock);
if (!inDispatch) {
cleanupInMonitor(pTask->id.taskId, taskRefId, param);
return;
}
@ -564,6 +585,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
if (numOfFailed == 0) {
stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS);
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
cleanupInMonitor(pTask->id.taskId, taskRefId, param);
return;
}
@ -628,18 +650,23 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
}
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s should stop, abort from timer, ref:%d", pTask->id.idStr, ref);
stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
setNotInDispatchMonitor(pMsgInfo);
} else {
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
}
cleanupInMonitor(pTask->id.taskId, taskRefId, param);
}
// Arm the "dispatch-monitor" timer so that doMonitorDispatchData runs for pTask after waitDuration;
// the timer handle is stored in pTask->msgInfo.pRetryTmr.
// NOTE(review): this span carries two variants of the body back to back (vgId is declared twice). The first
// passes pTask itself as the timer param; the second passes a refId holder obtained from streamTaskAllocRefId.
// Presumably these are the pre- and post-change sides of the commit -- confirm against the real file.
void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) {
int32_t vgId = pTask->pMeta->vgId;
streamTmrStart(doMonitorDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
"dispatch-monitor");
int32_t vgId = pTask->pMeta->vgId;
int64_t* pTaskRefId = NULL;
int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId);
// only start the timer when the refId holder was allocated; on failure nothing is scheduled here
if (code == 0) {
streamTmrStart(doMonitorDispatchData, waitDuration, pTaskRefId, streamTimer, &pTask->msgInfo.pRetryTmr, vgId,
"dispatch-monitor");
}
}
static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock,
@ -854,9 +881,9 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
} else {
streamMutexLock(&pTask->msgInfo.lock);
if (pTask->msgInfo.inMonitor == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start dispatch monitor tmr in %dms, ref:%d, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
ref, tstrerror(code));
// int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
tstrerror(code));
streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
pTask->msgInfo.inMonitor = 1;
} else {
@ -911,31 +938,31 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32
return TSDB_CODE_SUCCESS;
}
static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) {
static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, void* param, int32_t num) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
", quit",
id, vgId, pTmrInfo->launchChkptId);
return -1;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr", id, vgId);
return -1;
}
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr, ref:%d", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num, ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stWarn("s-task:%s vgId:%d upstream number:%d not equals sent readyMsg:%d, quit from readyMsg send tmr", id,
vgId, (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), num);
return -1;
}
@ -1011,7 +1038,7 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t
}
}
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) {
static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, void* param, SArray* pNotRspList) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pList = pActiveInfo->pReadyMsgList;
@ -1021,16 +1048,15 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList)
const char* id = pTask->id.idStr;
int32_t notRsp = 0;
int32_t code = doTaskChkptStatusCheck(pTask, num);
int32_t code = doTaskChkptStatusCheck(pTask, param, num);
if (code) {
return code;
}
code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id);
if (code) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id,
tstrerror(code), ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr", id, tstrerror(code));
return code;
}
@ -1045,26 +1071,41 @@ static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList)
}
static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
SArray* pNotRspList = NULL;
int32_t code = 0;
int32_t notRsp = 0;
int64_t taskRefId = *(int64_t*)param;
int32_t vgId = -1;
const char* id = NULL;
SActiveCheckpointInfo* pActiveInfo = NULL;
SStreamTmrInfo* pTmrInfo = NULL;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
streamTaskFreeRefId(param);
return;
}
vgId = pTask->pMeta->vgId;
id = pTask->id.idStr;
pActiveInfo = pTask->chkInfo.pActiveInfo;
pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger", id, vgId);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
}
if (++pTmrInfo->activeCounter < 50) {
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
}
@ -1078,15 +1119,16 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
// 1. check status in the first place
if (state.state != TASK_STATUS__CK) {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready, ref:%d", id, vgId,
state.name, ref);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready", id, vgId,
state.name);
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
return;
}
streamMutexLock(&pActiveInfo->lock);
code = chkptReadyMsgSendHelper(pTask, pNotRspList);
code = chkptReadyMsgSendHelper(pTask, param, pNotRspList);
streamMutexUnlock(&pActiveInfo->lock);
if (code != TSDB_CODE_SUCCESS) {
@ -1098,18 +1140,18 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) {
notRsp = taosArrayGetSize(pNotRspList);
if (notRsp > 0) { // send checkpoint-ready msg again
stDebug("s-task:%s start to monitor checkpoint-ready msg recv status in 10s", id);
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
} else {
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
streamCleanBeforeQuitTmr(pTmrInfo, param);
stDebug(
"s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit "
"from timer, ref:%d",
id, vgId, ref);
// release should be the last execution, since pTask may be destroy after it immidiately.
streamMetaReleaseTask(pTask->pMeta, pTask);
"from timer",
id, vgId);
}
// release should be the last execution, since pTask may be destroyed after it immediately.
streamMetaReleaseTask(pTask->pMeta, pTask);
taosArrayDestroy(pNotRspList);
}
@ -1160,15 +1202,17 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
stDebug("s-task:%s start checkpoint-ready monitor in 10s", pTask->id.idStr);
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
int64_t* pTaskRefId = NULL;
int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId);
if (code == 0) {
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor");
// mark the timer monitor checkpointId
pTmrInfo->launchChkptId = pActiveInfo->activeId;
// mark the timer monitor checkpointId
pTmrInfo->launchChkptId = pActiveInfo->activeId;
}
} else {
stError("s-task:%s previous checkpoint-ready monitor tmr is set, not start new one", pTask->id.idStr);
}

View File

@ -21,7 +21,7 @@
#include "ttimer.h"
#include "wal.h"
int32_t streamMetaId = 0;
int32_t streamMetaRefPool = 0;
struct SMetaHbInfo {
tmr_h hbTmr;
@ -123,17 +123,21 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
for(int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
continue;
}
if ((*pTask)->info.fillHistory == 1) {
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
epsetAssign(&epset, &pTask->info.mnodeEpset);
streamMetaReleaseTask(pMeta, pTask);
break;
}
@ -159,28 +163,30 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code != 0) {
continue;
}
// not report the status of fill-history task
if ((*pTask)->info.fillHistory == 1) {
if (pTask->info.fillHistory == 1) {
streamMetaReleaseTask(pMeta, pTask);
continue;
}
streamMutexLock(&(*pTask)->lock);
STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask);
streamMutexUnlock(&(*pTask)->lock);
streamMutexLock(&pTask->lock);
STaskStatusEntry entry = streamTaskGetStatusEntry(pTask);
streamMutexUnlock(&pTask->lock);
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = pTask->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB(pTask->execInfo.sink.dataSize);
}
SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo;
SActiveCheckpointInfo* p = pTask->chkInfo.pActiveInfo;
if (p->activeId != 0) {
entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = p->activeId;
@ -188,40 +194,42 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo",
(*pTask)->id.idStr, p->transId);
pTask->id.idStr, p->transId);
streamMutexLock(&(*pTask)->lock);
streamTaskClearCheckInfo((*pTask), true);
streamMutexUnlock(&(*pTask)->lock);
streamMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, true);
streamMutexUnlock(&pTask->lock);
}
}
streamMutexLock(&(*pTask)->lock);
entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(*pTask, pMsg->ts);
streamMutexLock(&pTask->lock);
entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(pTask, pMsg->ts);
if (entry.checkpointInfo.consensusChkptId) {
entry.checkpointInfo.consensusTs = pMsg->ts;
}
streamMutexUnlock(&(*pTask)->lock);
streamMutexUnlock(&pTask->lock);
if ((*pTask)->exec.pWalReader != NULL) {
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
if (pTask->exec.pWalReader != NULL) {
entry.processedVer = walReaderGetCurrentVer(pTask->exec.pWalReader) - 1;
if (entry.processedVer < 0) {
entry.processedVer = (*pTask)->chkInfo.processedVer;
entry.processedVer = pTask->chkInfo.processedVer;
}
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
walReaderValidVersionRange(pTask->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
}
addUpdateNodeIntoHbMsg(*pTask, pMsg);
addUpdateNodeIntoHbMsg(pTask, pMsg);
p = taosArrayPush(pMsg->pTaskStatus, &entry);
if (p == NULL) {
stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", (*pTask)->id.taskId, pMeta->vgId);
stError("failed to add taskInfo:0x%x in hbMsg, vgId:%d", pTask->id.taskId, pMeta->vgId);
}
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
epsetAssign(&epset, &pTask->info.mnodeEpset);
hasMnodeEpset = true;
}
streamMetaReleaseTask(pMeta, pTask);
}
pMsg->numOfTasks = taosArrayGetSize(pMsg->pTaskStatus);
@ -244,9 +252,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
int32_t vgId = 0;
int32_t role = 0;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, rid);
if (pMeta == NULL) {
stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid);
stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", rid);
// taosMemoryFree(param);
return;
}
@ -256,24 +265,26 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
// need to stop, stop now
if (pMeta->closeFlag) {
pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaRefPool, rid);
if (code == TSDB_CODE_SUCCESS) {
stDebug("vgId:%d jump out of meta timer", vgId);
} else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}
// taosMemoryFree(param);
return;
}
// not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) {
pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaRefPool, rid);
if (code == TSDB_CODE_SUCCESS) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
} else {
stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid);
}
// taosMemoryFree(param);
return;
}
@ -281,7 +292,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaRefPool, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}
@ -298,12 +309,13 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
if (code) {
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code));
}
streamMetaRUnLock(pMeta);
streamTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaRefPool, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}

View File

@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamBackendRocksdb.h"
#include "streamInt.h"
#include "tmisce.h"
@ -28,6 +27,7 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t taskDbWrapperId = 0;
int32_t streamTaskRefPool = 0;
static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void streamMetaCloseImpl(void* arg);
@ -41,14 +41,14 @@ SMetaRefMgt gMetaRefMgt;
int32_t metaRefMgtInit();
void metaRefMgtCleanup();
int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid);
static void streamMetaEnvInit() {
streamBackendId = taosOpenRef(64, streamBackendCleanup);
streamBackendCfWrapperId = taosOpenRef(64, streamBackendHandleCleanup);
taskDbWrapperId = taosOpenRef(64, taskDbDestroy2);
streamMetaId = taosOpenRef(64, streamMetaCloseImpl);
streamMetaRefPool = taosOpenRef(64, streamMetaCloseImpl);
streamTaskRefPool = taosOpenRef(64, tFreeStreamTask);
int32_t code = metaRefMgtInit();
if (code) {
@ -72,7 +72,8 @@ void streamMetaInit() {
void streamMetaCleanup() {
taosCloseRef(streamBackendId);
taosCloseRef(streamBackendCfWrapperId);
taosCloseRef(streamMetaId);
taosCloseRef(streamMetaRefPool);
taosCloseRef(streamTaskRefPool);
metaRefMgtCleanup();
streamTimerCleanUp();
@ -98,16 +99,21 @@ int32_t metaRefMgtInit() {
// Tear down the global meta-ref registry: walk every entry of gMetaRefMgt.pTable, free what it holds,
// then destroy the table and the registry mutex.
// NOTE(review): this span carries two variants of the loop body back to back. In the first, each entry is an
// SArray of rid pointers that are freed one by one; in the second, each entry is a single int64_t* that is
// logged and freed. taosHashCleanup also appears twice. Presumably these are the pre- and post-change sides
// of the commit -- confirm against the real file.
void metaRefMgtCleanup() {
void* pIter = taosHashIterate(gMetaRefMgt.pTable, NULL);
while (pIter) {
SArray* list = *(SArray**)pIter;
for (int i = 0; i < taosArrayGetSize(list); i++) {
void* rid = taosArrayGetP(list, i);
taosMemoryFree(rid);
}
taosArrayDestroy(list);
// the entry value is the pointer to the refId holder itself; log it, then free it
int64_t* p = *(int64_t**) pIter;
stInfo("---------------free refId:%"PRId64", %p", *p, p);
taosMemoryFree(p);
// SArray* list = *(SArray**)pIter;
// for (int i = 0; i < taosArrayGetSize(list); i++) {
// void* rid = taosArrayGetP(list, i);
// taosMemoryFree(rid);
// }
// taosArrayDestroy(list);
pIter = taosHashIterate(gMetaRefMgt.pTable, pIter);
}
taosHashCleanup(gMetaRefMgt.pTable);
taosHashCleanup(gMetaRefMgt.pTable);
streamMutexDestroy(&gMetaRefMgt.mutex);
}
@ -117,35 +123,31 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
streamMutexLock(&gMetaRefMgt.mutex);
p = taosHashGet(gMetaRefMgt.pTable, &vgId, sizeof(vgId));
p = taosHashGet(gMetaRefMgt.pTable, &rid, sizeof(rid));
if (p == NULL) {
SArray* pList = taosArrayInit(8, POINTER_BYTES);
if (pList == NULL) {
return terrno;
}
p = taosArrayPush(pList, &rid);
if (p == NULL) {
return terrno;
}
code = taosHashPut(gMetaRefMgt.pTable, &vgId, sizeof(vgId), &pList, sizeof(void*));
code = taosHashPut(gMetaRefMgt.pTable, &rid, sizeof(rid), &rid, sizeof(void*));
if (code) {
stError("vgId:%d failed to put into metaRef table, rid:%" PRId64, (int32_t)vgId, *rid);
return code;
} else {
stInfo("add refId:%"PRId64" vgId:%d, %p", *rid, (int32_t)vgId, rid);
}
} else {
SArray* list = *(SArray**)p;
void* px = taosArrayPush(list, &rid);
if (px == NULL) {
code = terrno;
}
// todo
}
streamMutexUnlock(&gMetaRefMgt.mutex);
return code;
}
/*
 * Remove a refId holder from the global meta-ref registry.
 *
 * pRefId - pointer to the refId holder. The registry is keyed by the pointer
 *          value itself (the key is &pRefId, sizeof(pRefId)), matching how
 *          metaRefMgtAdd inserts it. A NULL pointer is ignored.
 *
 * The holder is only unregistered here, not freed: *pRefId is still read for
 * the log line after the removal. The removal and the log both happen under
 * gMetaRefMgt.mutex.
 */
void metaRefMgtRemove(int64_t* pRefId) {
  // the log below dereferences pRefId, so a NULL holder must not get that far
  if (pRefId == NULL) {
    return;
  }

  streamMutexLock(&gMetaRefMgt.mutex);

  taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
  // %p requires a void* argument
  stInfo("remove refId from mgt, refId:%" PRId64 ", %p", *pRefId, (void*)pRefId);

  streamMutexUnlock(&gMetaRefMgt.mutex);
}
int32_t streamMetaOpenTdb(SStreamMeta* pMeta) {
if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db, 0, 0, NULL) < 0) {
stError("vgId:%d open file:%s failed, stream meta open failed", pMeta->vgId, pMeta->path);
@ -434,7 +436,7 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
pMeta->closeFlag = false;
stInfo("vgId:%d open stream meta succ, latest checkpoint:%" PRId64 ", stage:%" PRId64, vgId, pMeta->chkpId, stage);
pMeta->rid = taosAddRef(streamMetaId, pMeta);
pMeta->rid = taosAddRef(streamMetaRefPool, pMeta);
// set the attribute when running on Linux OS
TdThreadRwlockAttr attr;
@ -527,17 +529,20 @@ void streamMetaClear(SStreamMeta* pMeta) {
// remove all existed tasks in this vnode
void* pIter = NULL;
while ((pIter = taosHashIterate(pMeta->pTasksMap, pIter)) != NULL) {
SStreamTask* p = *(SStreamTask**)pIter;
int64_t refId = *(int64_t*)pIter;
SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId);
if (p == NULL) {
continue;
}
// release the ref by timer
if (p->info.delaySchedParam != 0 && p->info.fillHistory == 0) { // one more ref in timer
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", p->id.idStr, p->refCnt);
streamTmrStop(p->schedInfo.pDelayTimer);
p->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, p);
}
streamMetaReleaseTask(pMeta, p);
taosRemoveRef(streamTaskRefPool, refId);
}
if (pMeta->streamBackendRid != 0) {
@ -567,9 +572,9 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pMeta == NULL) {
return;
}
int32_t code = taosRemoveRef(streamMetaId, pMeta->rid);
int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
if (code) {
stError("vgId:%d failed to remove ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
}
}
@ -656,9 +661,16 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
if (code != TSDB_CODE_SUCCESS) {
code = terrno;
stError("s-task:%s vgId:%d task meta save to disk failed, code:%s", pTask->id.idStr, vgId, tstrerror(terrno));
stError("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk failed, remove ref, code:%s", pTask->id.idStr,
vgId, pTask->id.refId, tstrerror(code));
int64_t refId = pTask->id.refId;
int32_t ret = taosRemoveRef(streamTaskRefPool, pTask->id.refId);
if (ret != 0) {
stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id[1], refId);
}
} else {
stDebug("s-task:%s vgId:%d task meta save to disk", pTask->id.idStr, vgId);
stDebug("s-task:%s vgId:%d refId:%" PRId64 " task meta save to disk", pTask->id.idStr, vgId, pTask->id.refId);
}
taosMemoryFree(buf);
@ -683,34 +695,47 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
*pAdded = false;
int32_t code = 0;
int64_t refId = 0;
STaskId id = streamTaskGetTaskId(pTask);
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p != NULL) {
stDebug("s-task:%" PRIx64 " already exist in meta, no need to register", id.taskId);
tFreeStreamTask(pTask);
return code;
}
if ((code = pMeta->buildTaskFn(pMeta->ahandle, pTask, ver)) != 0) {
tFreeStreamTask(pTask);
return code;
}
p = taosArrayPush(pMeta->pTaskList, &pTask->id);
if (p == NULL) {
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
tFreeStreamTask(pTask);
return terrno;
}
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES);
pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
if (code) {
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret != 0) {
stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId);
}
return code;
}
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
taosRemoveRef(streamTaskRefPool, refId);
return code;
}
if ((code = streamMetaCommit(pMeta)) != 0) {
taosRemoveRef(streamTaskRefPool, refId);
return code;
}
@ -733,16 +758,72 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta) {
}
// Look up a task by (streamId, taskId) in pMeta->pTasksMap and hand it back through *pTask with a reference
// held. Returns TSDB_CODE_SUCCESS on success and TSDB_CODE_STREAM_TASK_NOT_EXIST when the task is missing,
// cannot be acquired, or should stop.
// The name suggests the caller is expected to hold the meta lock already -- confirm against callers.
// NOTE(review): this span carries two variants of the body interleaved (id is declared twice). One reads an
// SStreamTask** from the map and bumps refCnt; the other reads an int64_t refId from the map and resolves it
// through streamTaskRefPool. Presumably these are the pre- and post-change sides of the commit -- confirm
// against the real file.
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask) {
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || streamTaskShouldStop(*ppTask)) {
*pTask = NULL;
QRY_PARAM_CHECK(pTask);
STaskId id = {.streamId = streamId, .taskId = taskId};
int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTaskRefId == NULL) {
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
int32_t ref = atomic_add_fetch_32(&(*ppTask)->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", (*ppTask)->id.idStr, ref);
*pTask = *ppTask;
// the map entry is a refId; resolve it to the task object through the ref pool
SStreamTask* p = taosAcquireRef(streamTaskRefPool, *pTaskRefId);
if (p == NULL) {
stDebug("s-task:%x failed to acquire task refId:%"PRId64", may have been destoried", taskId, *pTaskRefId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
// sanity check: the refId recorded in the task must be the one stored in the map
if (p->id.refId != *pTaskRefId) {
stFatal("s-task:%x inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, taskId, *pTaskRefId,
p->id.refId);
int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
if (ret) {
stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId);
}
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
// a task that should stop is not handed out; drop the reference taken above
if (streamTaskShouldStop(p)) {
stDebug("s-task:%s is stopped, failed to acquire it now", p->id.idStr);
int32_t ret = taosReleaseRef(streamTaskRefPool, *pTaskRefId);
if (ret) {
stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, *pTaskRefId);
}
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
*pTask = p;
return TSDB_CODE_SUCCESS;
}
/*
 * Acquire a stream task by id through the task ref pool.
 *
 * pMeta - stream meta whose pTasksMap maps STaskId -> int64_t refId
 * pId   - id of the task to look up
 * pTask - out: the acquired task on success. QRY_PARAM_CHECK (defined
 *         elsewhere) runs first; presumably it rejects a NULL pTask and
 *         resets *pTask -- confirm.
 *
 * Returns TSDB_CODE_SUCCESS with a reference held on *pTask, which the caller
 * gives back with streamMetaReleaseTask. Returns
 * TSDB_CODE_STREAM_TASK_NOT_EXIST when the id is not in the map, the refId can
 * no longer be acquired, or the task's own refId disagrees with the map entry.
 *
 * This function takes no lock and does not call streamTaskShouldStop(), so a
 * stopping task can still be acquired here. The name suggests the caller is
 * responsible for synchronising access to pTasksMap -- confirm against callers.
 */
int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask) {
  QRY_PARAM_CHECK(pTask);

  int64_t* pTaskRefId = (int64_t*)taosHashGet(pMeta->pTasksMap, pId, sizeof(*pId));
  if (pTaskRefId == NULL) {
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // work on a copy of the refId so the map entry is read exactly once
  int64_t refId = *pTaskRefId;

  SStreamTask* p = taosAcquireRef(streamTaskRefPool, refId);
  if (p == NULL) {
    stDebug("s-task:0x%" PRIx64 " failed to acquire task refId:%" PRId64 ", may have been destoried", pId->taskId,
            refId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // the refId recorded inside the task must be the one the map handed out
  if (p->id.refId != refId) {
    // "task refId" is the value stored in the task, "try acquire" is the value looked up in the map
    stFatal("s-task:0x%" PRIx64 " inconsistent refId, task refId:%" PRId64 " try acquire:%" PRId64, pId->taskId,
            p->id.refId, refId);

    // give back the reference taken above before reporting the task as missing
    if (taosReleaseRef(streamTaskRefPool, refId) != 0) {
      stError("s-task:0x%" PRIx64 " failed to release task refId:%" PRId64, pId->taskId, refId);
    }
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  stDebug("s-task:%s acquire task, refId:%" PRId64, p->id.idStr, p->id.refId);
  *pTask = p;
  return TSDB_CODE_SUCCESS;
}
@ -753,28 +834,17 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task
return code;
}
// Take one more reference on an already-held task: atomically increments pTask->refCnt, logs the new
// count at trace level, and returns it. pTask is dereferenced unconditionally, so it must not be NULL.
int32_t streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
return ref;
}
// Give back one reference on pTask. A NULL pTask is ignored; pMeta is not used (UNUSED_PARAM).
// NOTE(review): this span carries two variants of the body interleaved. One decrements pTask->refCnt and calls
// tFreeStreamTask when it reaches zero; the other releases pTask->id.refId through streamTaskRefPool. The
// braces of the if-chain and of the `if (ret)` block do not pair up as shown. Presumably these are the pre-
// and post-change sides of the commit -- confirm against the real file.
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
if (pTask == NULL) {
return;
}
int32_t taskId = pTask->id.taskId;
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
// not safe to use the pTask->id.idStr, since pTask may be released by other threads when print logs.
if (ref > 0) {
stTrace("s-task:0x%x release task, ref:%d", taskId, ref);
} else if (ref == 0) {
stTrace("s-task:0x%x all refs are gone, free it", taskId);
tFreeStreamTask(pTask);
} else if (ref < 0) {
stError("task ref is invalid, ref:%d, 0x%x", ref, taskId);
// taskId and refId are copied to locals so the failure log below does not read pTask after the release
int64_t refId = pTask->id.refId;
stDebug("s-task:0x%x release task, refId:%" PRId64, taskId, pTask->id.refId);
int32_t ret = taosReleaseRef(streamTaskRefPool, pTask->id.refId);
if (ret) {
stError("s-task:0x%x failed to release task refId:%" PRId64, taskId, refId);
}
}
@ -812,13 +882,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
int32_t code = 0;
STaskId id = {.streamId = streamId, .taskId = taskId};
// pre-delete operation
streamMetaWLock(pMeta);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) {
// desc the paused task counter
if (streamTaskShouldPause(pTask)) {
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
@ -830,43 +897,9 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
if (code) {
stError("s-task:0x%" PRIx64 " failed to handle dropping event async, code:%s", id.taskId, tstrerror(code));
}
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
streamMetaWUnLock(pMeta);
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);
while (1) {
int32_t timerActive = 0;
streamMetaRLock(pMeta);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
// to make sure check status will not start the check downstream status when we start to check timerActive count.
streamMutexLock(&pTask->taskCheckInfo.checkInfoLock);
timerActive = (*ppTask)->status.timerActive;
streamMutexUnlock(&pTask->taskCheckInfo.checkInfoLock);
}
streamMetaRUnLock(pMeta);
if (timerActive > 0) {
taosMsleep(100);
stDebug("s-task:0x%" PRIx64 " wait for quit from timer", id.taskId);
} else {
break;
}
}
// let's do delete of stream task
streamMetaWLock(pMeta);
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
// it is a fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) {
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
@ -884,21 +917,22 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
if (sizeInList != size) {
stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
}
streamMetaWUnLock(pMeta);
int32_t numOfTmr = pTask->status.timerActive;
if (numOfTmr != 0) {
stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, vgId, numOfTmr);
}
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
stDebug("s-task:%s stop schedTimer, and (before) desc ref:%d", pTask->id.idStr, pTask->refCnt);
streamTmrStop(pTask->schedInfo.pDelayTimer);
pTask->info.delaySchedParam = 0;
streamMetaReleaseTask(pMeta, pTask);
}
int64_t refId = pTask->id.refId;
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret != 0) {
stError("s-task:0x%x failed to remove ref, refId:%"PRId64, (int32_t) id.taskId, refId);
}
streamMetaReleaseTask(pMeta, pTask);
streamMetaWUnLock(pMeta);
} else {
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
streamMetaWUnLock(pMeta);
@ -1008,13 +1042,13 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
return;
}
vgId = pMeta->vgId;
pRecycleList = taosArrayInit(4, sizeof(STaskId));
if (pRecycleList == NULL) {
stError("vgId:%d failed prepare load all tasks, code:out of memory", vgId);
return;
}
vgId = pMeta->vgId;
stInfo("vgId:%d load stream tasks from meta files", vgId);
code = tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL);
@ -1058,9 +1092,9 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
if (pTask->status.taskStatus == TASK_STATUS__DROPPING) {
int32_t taskId = pTask->id.taskId;
tFreeStreamTask(pTask);
STaskId id = streamTaskGetTaskId(pTask);
tFreeStreamTask(pTask);
void* px = taosArrayPush(pRecycleList, &id);
if (px == NULL) {
stError("s-task:0x%x failed record the task into recycle list due to out of memory", taskId);
@ -1096,13 +1130,22 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES) != 0) {
stError("s-task:0x%x failed to put into hashTable, code:%s, continue", pTask->id.taskId, tstrerror(terrno));
void* px = taosArrayPop(pMeta->pTaskList);
tFreeStreamTask(pTask);
pTask->id.refId = taosAddRef(streamTaskRefPool, pTask);
if (taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)) != 0) {
int64_t refId = pTask->id.refId;
stError("s-task:0x%x failed to put into hashTable, code:%s, remove task ref, refId:%" PRId64 " continue",
pTask->id.taskId, tstrerror(terrno), refId);
void* px = taosArrayPop(pMeta->pTaskList);
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret != 0) {
stError("s-task:0x%x failed to remove ref, refId:%" PRId64, (int32_t)id.taskId, refId);
}
continue;
}
stInfo("s-task:0x%x vgId:%d set refId:%"PRId64, (int32_t) id.taskId, vgId, pTask->id.refId);
if (pTask->info.fillHistory == 0) {
int32_t val = atomic_add_fetch_32(&pMeta->numOfStreamTasks, 1);
}
@ -1138,72 +1181,22 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
}
}
// Walk every entry of pMeta->pTasksMap under the meta read lock. Each task whose
// status.timerActive is >= 1 is asked to stop again via streamTaskStop(); a failed
// stop is only logged. Returns true when at least one such task was seen.
//
// NOTE(review): the iterator value is read as a SStreamTask* here, while other code
// in this listing puts an int64 refId into pTasksMap -- confirm the map's value type.
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
  bool anyInTimer = false;

  streamMetaRLock(pMeta);

  for (void* pCursor = taosHashIterate(pMeta->pTasksMap, NULL); pCursor != NULL;
       pCursor = taosHashIterate(pMeta->pTasksMap, pCursor)) {
    SStreamTask* pTask = *(SStreamTask**)pCursor;

    // tasks with no active timer need no further handling
    if (pTask->status.timerActive < 1) {
      continue;
    }

    stDebug("s-task:%s in timer, blocking tasks in vgId:%d restart, set closing again", pTask->id.idStr, pMeta->vgId);

    int32_t stopCode = streamTaskStop(pTask);
    if (stopCode != 0) {
      stError("s-task:%s failed to stop task, code:%s", pTask->id.idStr, tstrerror(stopCode));
    }

    anyInTimer = true;
  }

  streamMetaRUnLock(pMeta);
  return anyInTimer;
}
void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
int64_t startTs = 0;
int32_t sendCount = 0;
streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
// wait for the stream meta hb function stopping
streamMetaWaitForHbTmrQuit(pMeta);
streamMetaWLock(pMeta);
pMeta->closeFlag = true;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasksMap, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
int32_t code = streamTaskStop(pTask);
if (code) {
stError("vgId:%d failed to stop task:0x%x, code:%s", vgId, pTask->id.taskId, tstrerror(code));
}
}
streamMetaWUnLock(pMeta);
stDebug("vgId:%d start to check all tasks for closing", vgId);
int64_t st = taosGetTimestampMs();
while (streamMetaTaskInTimer(pMeta)) {
stDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
streamMetaRLock(pMeta);
SArray* pTaskList = NULL;
@ -1211,14 +1204,34 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
if (code != TSDB_CODE_SUCCESS) {
}
streamMetaRUnLock(pMeta);
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
if (pTaskList != NULL) {
taosArrayDestroy(pTaskList);
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
continue;
}
int64_t refId = pTask->id.refId;
int32_t ret = streamTaskStop(pTask);
if (ret) {
stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
}
streamMetaReleaseTask(pMeta, pTask);
ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret) {
stError("vgId:%d failed to remove task:0x%x, refId:%"PRId64, pMeta->vgId, pTaskId->taskId, refId);
}
}
int64_t el = taosGetTimestampMs() - st;
stDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
taosArrayDestroy(pTaskList);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, numOfTasks, el);
streamMetaRUnLock(pMeta);
}
void streamMetaStartHb(SStreamMeta* pMeta) {
@ -1228,12 +1241,12 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
return;
}
*pRid = pMeta->rid;
int32_t code = metaRefMgtAdd(pMeta->vgId, pRid);
if (code) {
return;
}
*pRid = pMeta->rid;
streamMetaHbToMnode(pRid, NULL);
}
@ -1308,13 +1321,15 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
for (int32_t i = 0; i < num; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL) {
continue;
}
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskUnsafe((SStreamMeta*)pMeta, &id, &pTask);
if ((*ppTask)->status.downstreamReady == 0) {
return false;
if (code == 0) {
if (pTask->status.downstreamReady == 0) {
streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
return false;
}
streamMetaReleaseTask((SStreamMeta*)pMeta, pTask);
}
}
@ -1331,10 +1346,13 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) {
streamTaskResetStatus(pTask);
streamMetaReleaseTask(pMeta, pTask);
}
}
return 0;
@ -1343,7 +1361,7 @@ int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
int64_t startTs) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
// keep the already updated info

View File

@ -22,13 +22,13 @@ static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) {
int64_t delaySchema = pTask->info.delaySchedParam;
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
int32_t ref = streamMetaAcquireOneTask(pTask);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
pTask->info.delaySchedParam);
streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTask, streamTimer, &pTask->schedInfo.pDelayTimer,
pTask->pMeta->vgId, "sched-tmr");
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
int64_t* pTaskRefId = NULL;
int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId);
if (code == 0) {
streamTmrStart(streamTaskSchedHelper, (int32_t)delaySchema, pTaskRefId, streamTimer,
&pTask->schedInfo.pDelayTimer, pTask->pMeta->vgId, "sched-tmr");
pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE;
}
}
}
@ -75,49 +75,65 @@ void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleT
void streamTaskSetIdleInfo(SStreamTask* pTask, int32_t idleTime) { pTask->status.schedIdleTime = idleTime; }
void streamTaskResumeInFuture(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s task should idle, add into timer to retry in %dms, ref:%d", pTask->id.idStr,
pTask->status.schedIdleTime, ref);
stDebug("s-task:%s task should idle, add into timer to retry in %dms", pTask->id.idStr,
pTask->status.schedIdleTime);
// add one ref count for task
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
pTask->pMeta->vgId, "resume-task-tmr");
int64_t* pTaskRefId = NULL;
int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId);
if (code == 0) {
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTaskRefId, streamTimer,
&pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr");
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void streamTaskResumeHelper(void* param, void* tmrId) {
SStreamTask* pTask = (SStreamTask*)param;
int32_t code = 0;
int64_t taskRefId = *(int64_t*)param;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
streamTaskFreeRefId(param);
return;
}
SStreamTaskId* pId = &pTask->id;
SStreamTaskState p = streamTaskGetStatus(pTask);
int32_t code = 0;
if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
int8_t status = streamTaskSetSchedStatusInactive(pTask);
TAOS_UNUSED(status);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref);
stDebug("s-task:%s status:%s not resume task", pId->idStr, p.name);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamTaskFreeRefId(param);
return;
}
code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
if (code) {
stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref);
stError("s-task:%s sched task failed, code:%s", pId->idStr, tstrerror(code));
} else {
stDebug("trigger to resume s-task:%s after idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref);
stDebug("trigger to resume s-task:%s after idled for %dms", pId->idStr, pTask->status.schedIdleTime);
// release the task ref count
streamTaskClearSchedIdleInfo(pTask);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
streamMetaReleaseTask(pTask->pMeta, pTask);
streamTaskFreeRefId(param);
}
void streamTaskSchedHelper(void* param, void* tmrId) {
SStreamTask* pTask = (void*)param;
int64_t taskRefId = *(int64_t*)param;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
streamTaskFreeRefId(param);
return;
}
const char* id = pTask->id.idStr;
int32_t nextTrigger = (int32_t)pTask->info.delaySchedParam;
int32_t vgId = pTask->pMeta->vgId;
@ -127,6 +143,8 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
if (streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) {
stDebug("s-task:%s should stop, jump out of schedTimer", id);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamTaskFreeRefId(param);
return;
}
@ -171,6 +189,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) {
}
_end:
streamTmrStart(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
streamTmrStart(streamTaskSchedHelper, nextTrigger, param, streamTimer, &pTask->schedInfo.pDelayTimer, vgId,
"sched-run-tmr");
streamMetaReleaseTask(pTask->pMeta, pTask);
}

View File

@ -15,6 +15,7 @@
#include "streamInt.h"
#include "streamsm.h"
#include "tref.h"
#include "trpc.h"
#include "ttimer.h"
#include "wal.h"
@ -24,7 +25,7 @@
#define SCANHISTORY_IDLE_TICK ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)
typedef struct SLaunchHTaskInfo {
SStreamMeta* pMeta;
int64_t metaRid;
STaskId id;
STaskId hTaskId;
} SLaunchHTaskInfo;
@ -89,7 +90,7 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
// add ref for task
SStreamTask* p = NULL;
int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p);
int32_t code = streamMetaAcquireTask(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, &p);
if (p == NULL || code != 0) {
stError("s-task:0x%x failed to acquire task, status:%s, not exec scan-history data", pTask->id.taskId,
streamTaskGetStatus(pTask).name);
@ -98,10 +99,13 @@ void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
pTask->schedHistoryInfo.numOfTicks = numOfTicks;
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, vgId, "history-task");
stDebug("s-task:%s scan-history resumed in %.2fs", pTask->id.idStr, numOfTicks * 0.1);
int64_t* pTaskRefId = NULL;
int32_t ret = streamTaskAllocRefId(pTask, &pTaskRefId);
if (ret == 0) {
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTaskRefId, streamTimer,
&pTask->schedHistoryInfo.pTimer, vgId, "history-task");
}
}
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
@ -220,42 +224,32 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
// Set the execution conditions, including the query time window and the version range
streamMetaRLock(pMeta);
SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id));
SStreamTask* pHisTask = NULL;
code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHisTask);
streamMetaRUnLock(pMeta);
if (pHTask != NULL) { // it is already added into stream meta store.
SStreamTask* pHisTask = NULL;
code = streamMetaAcquireTask(pMeta, hStreamId, hTaskId, &pHisTask);
if (pHisTask == NULL) {
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
if (code == 0) { // it is already added into stream meta store.
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
if (code) {
stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code));
}
} else {
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
if (code) {
stError("s-task:%s failed to record start task status, code:%s", idStr, tstrerror(code));
}
} else { // exist, but not ready, continue check downstream task status
if (pHisTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pHisTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHisTask, now);
stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code));
}
}
if (code == TSDB_CODE_SUCCESS) {
checkFillhistoryTaskStatus(pTask, pHisTask);
} else { // exist, but not ready, continue check downstream task status
if (pHisTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pHisTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHisTask, now);
stError("s-task:%s failed to expand fill-history task, code:%s", pHisTask->id.idStr, tstrerror(code));
}
}
streamMetaReleaseTask(pMeta, pHisTask);
if (code == TSDB_CODE_SUCCESS) {
checkFillhistoryTaskStatus(pTask, pHisTask);
}
}
streamMetaReleaseTask(pMeta, pHisTask);
return code;
} else {
return launchNotBuiltFillHistoryTask(pTask);
@ -296,14 +290,14 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo,
SStreamMeta* pMeta = pTask->pMeta;
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
if (code) {
stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
} else {
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x, ref:%d",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId, ref);
stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x",
pTask->id.idStr, MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHTaskInfo->id.taskId);
}
pHTaskInfo->id.taskId = 0;
@ -315,9 +309,9 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
if (streamTaskShouldStop(pTask)) { // record the failure
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64 ", ref:%d", pInfo->id.taskId,
pInfo->hTaskId.taskId, ref);
// int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId,
pInfo->hTaskId.taskId);
int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
if (code) {
@ -336,30 +330,60 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
}
}
// Common exit path for the launch-history-task timer callback:
//   1. release the task via streamMetaReleaseTask(),
//   2. release the stream-meta reference identified by metaRid (failure is only logged),
//   3. free pInfo when the caller passes a non-NULL pointer; pass NULL to keep it
//      alive, e.g. when the same pInfo has been handed to a restarted timer.
static void doCleanup(SStreamTask* pTask, int64_t metaRid, SLaunchHTaskInfo* pInfo) {
  SStreamMeta* pStreamMeta = pTask->pMeta;
  // read vgId before any release below; it is needed only for the error log
  const int32_t vnodeId = pStreamMeta->vgId;

  streamMetaReleaseTask(pStreamMeta, pTask);

  if (taosReleaseRef(streamMetaRefPool, metaRid) != 0) {
    stError("vgId:%d failed to release meta refId:%"PRId64, vnodeId, metaRid);
  }

  if (pInfo == NULL) {
    return;
  }
  taosMemoryFree(pInfo);
}
void tryLaunchHistoryTask(void* param, void* tmrId) {
SLaunchHTaskInfo* pInfo = param;
SStreamMeta* pMeta = pInfo->pMeta;
int64_t metaRid = pInfo->metaRid;
int64_t now = taosGetTimestampMs();
int32_t code = 0;
SStreamTask* pTask = NULL;
int32_t vgId = 0;
SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, metaRid);
if (pMeta == NULL) {
stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", metaRid);
taosMemoryFree(pInfo);
return;
}
vgId = pMeta->vgId;
streamMetaWLock(pMeta);
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pInfo->id, sizeof(pInfo->id));
if (ppTask == NULL || *ppTask == NULL) {
code = streamMetaAcquireTaskUnsafe(pMeta, &pInfo->id, &pTask);
if (code != 0) {
stError("s-task:0x%x and rel fill-history task:0x%" PRIx64 " all have been destroyed, not launch",
(int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId);
streamMetaWUnLock(pMeta);
int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid);
if (ret) {
stError("vgId:%d failed to release meta refId:%"PRId64, vgId, metaRid);
}
// already dropped, no need to set the failure info into the stream task meta.
taosMemoryFree(pInfo);
return;
}
if (streamTaskShouldStop(*ppTask)) {
char* p = streamTaskGetStatus(*ppTask).name;
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d, ref:%d",
(*ppTask)->id.idStr, p, (*ppTask)->hTaskInfo.retryTimes, ref);
if (streamTaskShouldStop(pTask)) {
char* p = streamTaskGetStatus(pTask).name;
stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d", pTask->id.idStr, p,
pTask->hTaskInfo.retryTimes);
streamMetaWUnLock(pMeta);
@ -369,77 +393,54 @@ void tryLaunchHistoryTask(void* param, void* tmrId) {
stError("s-task:0x%" PRId64 " failed to record the start task status, code:%s", pInfo->hTaskId.taskId,
tstrerror(code));
}
taosMemoryFree(pInfo);
doCleanup(pTask, metaRid, pInfo);
return;
}
SStreamTask* pTask = NULL;
code = streamMetaAcquireTaskNoLock(pMeta, pInfo->id.streamId, pInfo->id.taskId, &pTask);
if (code != TSDB_CODE_SUCCESS) {
// todo
}
streamMetaWUnLock(pMeta);
if (pTask != NULL) {
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
streamMetaReleaseTask(pMeta, pTask);
return;
}
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
notRetryLaunchFillHistoryTask(pTask, pInfo, now);
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
if (pTask->status.timerActive < 1) {
stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive);
return;
}
// abort the timer if intend to stop task
SStreamTask* pHTask = NULL;
code = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, &pHTask);
if (pHTask == NULL) {
doRetryLaunchFillHistoryTask(pTask, pInfo, now);
streamMetaReleaseTask(pMeta, pTask);
return;
} else {
if (pHTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pHTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHTask, now);
stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code));
}
}
if (code == TSDB_CODE_SUCCESS) {
checkFillhistoryTaskStatus(pTask, pHTask);
// not in timer anymore
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d, ref:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes, ref);
}
streamMetaReleaseTask(pMeta, pHTask);
}
}
streamMetaReleaseTask(pMeta, pTask);
} else {
code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
if (code) {
stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
}
int32_t ref = atomic_sub_fetch_32(&(*ppTask)->status.timerActive, 1);
stError("s-task:0x%x rel fill-history task:0x%" PRIx64 " may have been destroyed, not launch, ref:%d",
(int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId, ref);
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
pHTaskInfo->tickCount -= 1;
if (pHTaskInfo->tickCount > 0) {
streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
doCleanup(pTask, metaRid, NULL);
return;
}
taosMemoryFree(pInfo);
if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
notRetryLaunchFillHistoryTask(pTask, pInfo, now);
} else { // not reach the limitation yet, let's continue retrying launch related fill-history task.
streamTaskSetRetryInfoForLaunch(pHTaskInfo);
// abort the timer if intend to stop task
SStreamTask* pHTask = NULL;
code = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, &pHTask);
if (pHTask == NULL) {
doRetryLaunchFillHistoryTask(pTask, pInfo, now);
doCleanup(pTask, metaRid, NULL);
return;
} else {
if (pHTask->pBackend == NULL) {
code = pMeta->expandTaskFn(pHTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pHTask, now);
stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code));
}
}
if (code == TSDB_CODE_SUCCESS) {
checkFillhistoryTaskStatus(pTask, pHTask);
// not in timer anymore
stDebug("s-task:0x%x fill-history task launch completed, retry times:%d", (int32_t)pInfo->id.taskId,
pHTaskInfo->retryTimes);
}
streamMetaReleaseTask(pMeta, pHTask);
}
}
doCleanup(pTask, metaRid, pInfo);
}
int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId,
@ -455,7 +456,7 @@ int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStr
(*pInfo)->hTaskId.streamId = hStreamId;
(*pInfo)->hTaskId.taskId = hTaskId;
(*pInfo)->pMeta = pMeta;
(*pInfo)->metaRid = pMeta->rid;
return TSDB_CODE_SUCCESS;
}
@ -485,12 +486,10 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
// check for the timer
if (pTask->hTaskInfo.pTimer == NULL) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer);
if (pTask->hTaskInfo.pTimer == NULL) {
ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref);
stError("s-task:%s failed to start timer, related fill-history task not launched", idStr);
taosMemoryFree(pInfo);
code = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
@ -500,18 +499,8 @@ int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
return terrno;
}
if (ref < 1) {
stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
stDebug("s-task:%s set timer active flag, ref:%d", idStr, ref);
stDebug("s-task:%s set timer active flag", idStr);
} else { // timer exists
if (pTask->status.timerActive < 1) {
stError("s-task:%s invalid timerActive recorder:%d, abort timer", pTask->id.idStr, pTask->status.timerActive);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
stDebug("s-task:%s set timer active flag, task timer not null", idStr);
streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
pTask->pMeta->vgId, " start-history-task-tmr");
@ -590,15 +579,22 @@ int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
}
void doExecScanhistoryInFuture(void* param, void* tmrId) {
SStreamTask* pTask = param;
int64_t taskRefId = *(int64_t*) param;
SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
if (pTask == NULL) {
stError("invalid task rid:%" PRId64 " failed to acquired stream-task", taskRefId);
streamTaskFreeRefId(param);
return;
}
pTask->schedHistoryInfo.numOfTicks -= 1;
SStreamTaskState p = streamTaskGetStatus(pTask);
if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p.name, ref);
stDebug("s-task:%s status:%s not start scan-history again", pTask->id.idStr, p.name);
streamMetaReleaseTask(pTask->pMeta, pTask);
streamTaskFreeRefId(param);
return;
}
@ -608,16 +604,19 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) {
stError("s-task:%s async start history task failed", pTask->id.idStr);
}
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr, ref:%d", pTask->id.idStr,
pTask->info.fillHistory, ref);
// release the task.
streamMetaReleaseTask(pTask->pMeta, pTask);
stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr", pTask->id.idStr,
pTask->info.fillHistory);
} else {
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
int64_t* pTaskRefId = NULL;
int32_t code = streamTaskAllocRefId(pTask, &pTaskRefId);
if (code == 0) {
streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTaskRefId, streamTimer,
&pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
}
}
streamMetaReleaseTask(pTask->pMeta, pTask);
streamTaskFreeRefId(param);
}
int32_t doStartScanHistoryTask(SStreamTask* pTask) {

View File

@ -196,19 +196,17 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
STaskId id = {.streamId = streamId, .taskId = taskId};
int32_t vgId = pMeta->vgId;
bool allRsp = true;
SStreamTask* p = NULL;
streamMetaWLock(pMeta);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { // task does not exists in current vnode, not record the complete info
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &p);
if (code != 0) { // task does not exist in current vnode, not record the complete info
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
// clear the send consensus-checkpointId flag
// streamMutexLock(&(*p)->lock);
// (*p)->status.sendConsensusChkptId = false;
// streamMutexUnlock(&(*p)->lock);
streamMetaReleaseTask(pMeta, p);
if (pStartInfo->startAllTasks != 1) {
int64_t el = endTs - startTs;
@ -222,7 +220,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) {
if (code == TSDB_CODE_DUP_KEY) {
stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
@ -296,13 +294,14 @@ void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
STaskInitTs* pInfo = pIter;
void* key = taosHashGetKey(pIter, &keyLen);
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
if (pTask1 == NULL) {
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTaskUnsafe(pMeta, key, &pTask);
if (code == 0) {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", pTask->id.idStr,
pTask->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
streamMetaReleaseTask(pMeta, pTask);
} else {
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
}
}
}
@ -417,8 +416,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
streamMetaRLock(pMeta);
SArray* pTaskList = NULL;
int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) {
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
streamMetaRUnLock(pMeta);
@ -428,14 +429,12 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
int64_t st = taosGetTimestampMs();
// send hb msg to mnode before closing all tasks.
SArray* pTaskList = NULL;
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
int32_t numOfTasks = taosArrayGetSize(pTaskList);
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = NULL;
@ -445,11 +444,17 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
continue;
}
int64_t refId = pTask->id.refId;
int32_t ret = streamTaskStop(pTask);
if (ret) {
stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
}
streamMetaReleaseTask(pMeta, pTask);
ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret) {
stError("vgId:%d failed to remove task:0x%x, refId:%"PRId64, pMeta->vgId, pTaskId->taskId, refId);
}
}
taosArrayDestroy(pTaskList);
@ -466,6 +471,7 @@ int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
int32_t vgId = pTask->pMeta->vgId;
if (pConChkptInfo->status == TASK_CONSEN_CHKPT_REQ) {
// mark that the consensus-checkpoint request is being sent.
pConChkptInfo->status = TASK_CONSEN_CHKPT_SEND;
pConChkptInfo->statusTs = ts;
stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr,
@ -473,6 +479,8 @@ int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
return 1;
} else {
int32_t el = (ts - pConChkptInfo->statusTs) / 1000;
// no consensus-checkpoint rsp received for more than 60 sec; send the request again in the hb msg to the mnode
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) {
pConChkptInfo->statusTs = ts;
@ -492,7 +500,7 @@ void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t
pInfo->status = TASK_CONSEN_CHKPT_RECV;
pInfo->statusTs = ts;
stDebug("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
stInfo("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
}
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
@ -507,23 +515,24 @@ void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
}
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
int32_t code = TSDB_CODE_SUCCESS;
int64_t now = taosGetTimestampMs();
int64_t startTs = 0;
bool hasFillhistoryTask = false;
STaskId hId = {0};
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask* pTask = NULL;
stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);
streamMetaRLock(pMeta);
STaskId id = {.streamId = streamId, .taskId = taskId};
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL) {
startTs = (*ppTask)->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(*ppTask);
hId = (*ppTask)->hTaskInfo.id;
code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
if (code == 0) {
startTs = pTask->taskCheckInfo.startTs;
hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask);
hId = pTask->hTaskInfo.id;
streamMetaReleaseTask(pMeta, pTask);
streamMetaRUnLock(pMeta);

View File

@ -66,15 +66,9 @@ void streamTmrStop(tmr_h tmrId) {
}
}
// Clears the timer bookkeeping held in pInfo before a timer callback quits: zeroes
// activeCounter and launchChkptId and atomically stores 0 into isActive.
//
// NOTE(review): this span is a flattened diff hunk, so two revisions of the function are
// interleaved without +/- markers. Judging by the hunk header (15 old lines -> 9 new lines)
// and the commit subject, the int32_t/pTask signature together with the timerActive
// decrement, the stFatal check and `return ref` look like the removed revision, while the
// void/param signature together with streamTaskFreeRefId(param) look like the added one.
// Confirm against the repository before reusing this text as code.
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {
void streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, void* param) {
pInfo->activeCounter = 0;
pInfo->launchChkptId = 0;
atomic_store_8(&pInfo->isActive, 0);
// Presumably the old revision: drop one timer reference on the task and report a negative
// counter as fatal, then return the remaining count to the caller.
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
if (ref < 0) {
stFatal("invalid task timer ref value:%d, %s", ref, pTask->id.idStr);
}
return ref;
// Presumably the new revision: pass the timer parameter to streamTaskFreeRefId instead;
// what that helper releases is not visible in this chunk.
streamTaskFreeRefId(param);
}