Merge pull request #28393 from taosdata/fix/liaohj
fix(stream): only keep the latest pause operation status.
This commit is contained in:
commit
f7bec4f1bf
|
@ -754,7 +754,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
|
|||
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
|
||||
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||
void streamMetaAcquireOneTask(SStreamTask* pTask);
|
||||
int32_t streamMetaAcquireOneTask(SStreamTask* pTask);
|
||||
void streamMetaClear(SStreamMeta* pMeta);
|
||||
void streamMetaInitBackend(SStreamMeta* pMeta);
|
||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||
|
|
|
@ -2610,7 +2610,10 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
|
|||
if (code < 0) {
|
||||
uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
|
||||
goto _exit;
|
||||
} else { // reset the length value
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
|
||||
if (len >= size - 1) goto _exit;
|
||||
} break;
|
||||
|
|
|
@ -1783,7 +1783,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
|||
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SStreamObj *pStream = NULL;
|
||||
int32_t code = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) {
|
||||
return code;
|
||||
|
@ -1811,6 +1811,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
|
||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
||||
sdbRelease(pMnode->pSdb, pStream);
|
||||
return -1;
|
||||
|
|
|
@ -61,7 +61,6 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
|
|||
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
|
||||
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
|
||||
if (pReq == NULL) {
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -93,7 +92,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
|
|||
if (pReq == NULL) {
|
||||
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
|
||||
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
|
||||
// terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
@ -106,19 +104,18 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
|
|||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
|
||||
terrno = code;
|
||||
taosMemoryFree(pReq);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(pReq);
|
||||
return terrno;
|
||||
return code;
|
||||
}
|
||||
|
||||
mDebug("set the resume action for trans:%d", pTrans->id);
|
||||
return 0;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {
|
||||
|
|
|
@ -692,7 +692,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
|||
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if ((ppTask != NULL) && ((*ppTask) != NULL)) {
|
||||
streamMetaAcquireOneTask(*ppTask);
|
||||
int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask);
|
||||
SStreamTask* pTask = *ppTask;
|
||||
|
||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||
|
@ -1119,10 +1119,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
|||
int32_t vgId = pMeta->vgId;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pTask == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
streamTaskResume(pTask);
|
||||
ETaskStatus status = streamTaskGetStatus(pTask).state;
|
||||
|
||||
|
@ -1150,7 +1146,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
|
|||
}
|
||||
}
|
||||
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1173,6 +1168,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
|||
|
||||
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
|
||||
if (code != 0) {
|
||||
streamMetaReleaseTask(pMeta, pTask);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1186,6 +1182,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
|||
streamMutexUnlock(&pHTask->lock);
|
||||
|
||||
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
|
||||
streamMetaReleaseTask(pMeta, pHTask);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -5909,6 +5909,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
|
|||
} else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
|
||||
} else {
|
||||
code = TSDB_CODE_INVALID_PARA;
|
||||
tsdbError("invalid mr.me.type:%d, code:%s", mr.me.type, tstrerror(code));
|
||||
metaReaderClear(&mr);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
|||
return;
|
||||
}
|
||||
|
||||
/*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here
|
||||
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here
|
||||
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
|
|
|
@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
|||
if (old == 0) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
||||
streamMetaAcquireOneTask(pTask);
|
||||
|
||||
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
|
||||
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"trigger-recv-monitor");
|
||||
pTmrInfo->launchChkptId = pActiveInfo->activeId;
|
||||
|
|
|
@ -1158,7 +1158,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
|||
if (old == 0) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
|
||||
streamMetaAcquireOneTask(pTask);
|
||||
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
|
||||
|
||||
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
|
||||
"chkpt-ready-monitor");
|
||||
|
|
|
@ -753,12 +753,17 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task
|
|||
return code;
|
||||
}
|
||||
|
||||
void streamMetaAcquireOneTask(SStreamTask* pTask) {
|
||||
int32_t streamMetaAcquireOneTask(SStreamTask* pTask) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
|
||||
return ref;
|
||||
}
|
||||
|
||||
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
|
||||
if (pTask == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t taskId = pTask->id.taskId;
|
||||
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
|
||||
|
||||
|
@ -862,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
|||
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask) {
|
||||
pTask = *ppTask;
|
||||
// it is an fill-history task, remove the related stream task's id that points to it
|
||||
// it is a fill-history task, remove the related stream task's id that points to it
|
||||
if (pTask->info.fillHistory == 0) {
|
||||
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
|
||||
}
|
||||
|
|
|
@ -22,7 +22,7 @@ static void streamTaskSchedHelper(void* param, void* tmrId);
|
|||
void streamSetupScheduleTrigger(SStreamTask* pTask) {
|
||||
int64_t delaySchema = pTask->info.delaySchedParam;
|
||||
if (delaySchema != 0 && pTask->info.fillHistory == 0) {
|
||||
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
|
||||
int32_t ref = streamMetaAcquireOneTask(pTask);
|
||||
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
|
||||
pTask->info.delaySchedParam);
|
||||
|
||||
|
@ -63,7 +63,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
|||
pRunReq->reqType = execType;
|
||||
|
||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||
return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||
int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
|
||||
if (code) {
|
||||
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
// Reset the task's recorded scheduling idle time (status.schedIdleTime) to zero.
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
|
||||
|
@ -76,7 +80,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
|
|||
pTask->status.schedIdleTime, ref);
|
||||
|
||||
// add one ref count for task
|
||||
streamMetaAcquireOneTask(pTask);
|
||||
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
|
||||
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
|
||||
pTask->pMeta->vgId, "resume-task-tmr");
|
||||
}
|
||||
|
|
|
@ -258,10 +258,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
|
||||
if (pTask->inputq.queue) {
|
||||
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
|
||||
pTask->inputq.queue = NULL;
|
||||
}
|
||||
|
||||
if (pTask->outputq.queue) {
|
||||
streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
|
||||
pTask->outputq.queue = NULL;
|
||||
}
|
||||
|
||||
if (pTask->exec.qmsg) {
|
||||
|
@ -275,6 +277,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
|||
|
||||
if (pTask->exec.pWalReader != NULL) {
|
||||
walCloseReader(pTask->exec.pWalReader);
|
||||
pTask->exec.pWalReader = NULL;
|
||||
}
|
||||
|
||||
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
|
||||
|
|
|
@ -501,9 +501,10 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
|||
if (pTrans == NULL) {
|
||||
ETaskStatus s = pSM->current.state;
|
||||
|
||||
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP &&
|
||||
s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) {
|
||||
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt));
|
||||
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT &&
|
||||
s != TASK_STATUS__READY) {
|
||||
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,
|
||||
GET_EVT_NAME(pSM->prev.evt));
|
||||
}
|
||||
|
||||
// the pSM->prev.evt may be 0, so print string is not appropriate.
|
||||
|
@ -521,11 +522,15 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
|
|||
return TSDB_CODE_STREAM_INVALID_STATETRANS;
|
||||
}
|
||||
|
||||
keepPrevInfo(pSM);
|
||||
// repeat pause will not overwrite the previous pause state
|
||||
if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) {
|
||||
keepPrevInfo(pSM);
|
||||
pSM->current = pTrans->next;
|
||||
} else {
|
||||
stDebug("s-task:%s repeat pause evt recv, not update prev status", id);
|
||||
}
|
||||
|
||||
pSM->current = pTrans->next;
|
||||
pSM->pActiveTrans = NULL;
|
||||
|
||||
// todo remove it
|
||||
// todo: handle the error code
|
||||
// on success callback, add into lock if necessary, or maybe we should add an option for this?
|
||||
|
|
|
@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void*
|
|||
}
|
||||
}
|
||||
|
||||
stDebug("vgId:%d start %s tmr succ", vgId, pMsg);
|
||||
stTrace("vgId:%d start %s tmr succ", vgId, pMsg);
|
||||
}
|
||||
|
||||
void streamTmrStop(tmr_h tmrId) {
|
||||
|
|
Loading…
Reference in New Issue