Merge pull request #28388 from taosdata/fix/syntax

fix(stream): only keep the latest pause operation status.
This commit is contained in:
Haojun Liao 2024-10-17 09:55:14 +08:00 committed by GitHub
commit 6d3370d611
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 62 additions and 47 deletions

View File

@ -754,7 +754,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta);
int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaAcquireOneTask(SStreamTask* pTask); int32_t streamMetaAcquireOneTask(SStreamTask* pTask);
void streamMetaClear(SStreamMeta* pMeta); void streamMetaClear(SStreamMeta* pMeta);
void streamMetaInitBackend(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);

View File

@ -2625,6 +2625,8 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code)); uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
lino = __LINE__; lino = __LINE__;
goto _exit; goto _exit;
} else { // reset the length value
code = TSDB_CODE_SUCCESS;
} }
len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf); len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf);
if (len >= size - 1) goto _exit; if (len >= size - 1) goto _exit;

View File

@ -1811,6 +1811,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
return 0; return 0;
} }
mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid);
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return -1;

View File

@ -61,7 +61,6 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa
static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) { if (pReq == NULL) {
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
@ -93,7 +92,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
if (pReq == NULL) { if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tstrerror(TSDB_CODE_OUT_OF_MEMORY));
// terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno; return terrno;
} }
@ -106,19 +104,18 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT
bool hasEpset = false; bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { if (code != TSDB_CODE_SUCCESS || (!hasEpset)) {
terrno = code;
taosMemoryFree(pReq); taosMemoryFree(pReq);
return terrno; return code;
} }
code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return terrno; return code;
} }
mDebug("set the resume action for trans:%d", pTrans->id); mDebug("set the resume action for trans:%d", pTrans->id);
return 0; return code;
} }
static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) { static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) {

View File

@ -692,7 +692,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if ((ppTask != NULL) && ((*ppTask) != NULL)) { if ((ppTask != NULL) && ((*ppTask) != NULL)) {
streamMetaAcquireOneTask(*ppTask); int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask);
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
@ -1119,10 +1119,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t code = 0; int32_t code = 0;
if (pTask == NULL) {
return -1;
}
streamTaskResume(pTask); streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask).state; ETaskStatus status = streamTaskGetStatus(pTask).state;
@ -1150,7 +1146,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} }
} }
streamMetaReleaseTask(pMeta, pTask);
return code; return code;
} }
@ -1173,6 +1168,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) { if (code != 0) {
streamMetaReleaseTask(pMeta, pTask);
return code; return code;
} }
@ -1186,6 +1182,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
streamMutexUnlock(&pHTask->lock); streamMutexUnlock(&pHTask->lock);
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
streamMetaReleaseTask(pMeta, pHTask);
} }
return code; return code;

View File

@ -855,6 +855,7 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S
STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList); STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList);
if (p == NULL) { if (p == NULL) {
clearBrinBlockIter(&iter); clearBrinBlockIter(&iter);
tsdbError("invalid param, empty in tablescanInfoList, %s", pReader->idStr);
return TSDB_CODE_INVALID_PARA; return TSDB_CODE_INVALID_PARA;
} }
@ -5256,7 +5257,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
// NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit // NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit
// the data should be ingested in round-robin and all the child tables should be createted before ingesting data // the data should be ingested in round-robin and all the child tables should be createted before ingesting data
// the version range of query will be used to identify the correctness of suspend/resume functions. // the version range of query will be used to identify the correctness of suspend/resume functions.
// this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files // this function will be blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files
#if SUSPEND_RESUME_TEST #if SUSPEND_RESUME_TEST
if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) { if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) {
tsem_wait(&pReader->resumeAfterSuspend); tsem_wait(&pReader->resumeAfterSuspend);
@ -5909,6 +5910,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_
} else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing
} else { } else {
code = TSDB_CODE_INVALID_PARA; code = TSDB_CODE_INVALID_PARA;
tsdbError("invalid mr.me.type:%d, code:%s", mr.me.type, tstrerror(code));
metaReaderClear(&mr); metaReaderClear(&mr);
return code; return code;
} }

View File

@ -633,47 +633,44 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
} }
break; break;
case TDMT_STREAM_TASK_DEPLOY: { case TDMT_STREAM_TASK_DEPLOY: {
int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_DROP: { case TDMT_STREAM_TASK_DROP: {
if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if ((code = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_UPDATE_CHKPT: { case TDMT_STREAM_TASK_UPDATE_CHKPT: {
if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { if ((code = tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_CONSEN_CHKPT: { case TDMT_STREAM_CONSEN_CHKPT: {
if (pVnode->restored) { if (pVnode->restored && (code = tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg)) < 0) {
if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) {
goto _err; goto _err;
} }
}
} break; } break;
case TDMT_STREAM_TASK_PAUSE: { case TDMT_STREAM_TASK_PAUSE: {
if (pVnode->restored && vnodeIsLeader(pVnode) && if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { (code = tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_STREAM_TASK_RESUME: { case TDMT_STREAM_TASK_RESUME: {
if (pVnode->restored && vnodeIsLeader(pVnode) && if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { (code = tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) {
goto _err; goto _err;
} }
} break; } break;
case TDMT_VND_STREAM_TASK_RESET: { case TDMT_VND_STREAM_TASK_RESET: {
if (pVnode->restored && vnodeIsLeader(pVnode)) { if (pVnode->restored && vnodeIsLeader(pVnode) &&
if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) { (code = tqProcessTaskResetReq(pVnode->pTq, pMsg)) < 0) {
goto _err; goto _err;
} }
}
} break; } break;
case TDMT_VND_ALTER_CONFIRM: case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange; needCommit = pVnode->config.hashChange;
@ -693,10 +690,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
case TDMT_VND_DROP_INDEX: case TDMT_VND_DROP_INDEX:
vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp);
break; break;
case TDMT_VND_STREAM_CHECK_POINT_SOURCE: case TDMT_VND_STREAM_CHECK_POINT_SOURCE: // always return true
tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp); tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp);
break; break;
case TDMT_VND_STREAM_TASK_UPDATE: case TDMT_VND_STREAM_TASK_UPDATE: // always return true
tqProcessTaskUpdateReq(pVnode->pTq, pMsg); tqProcessTaskUpdateReq(pVnode->pTq, pMsg);
break; break;
case TDMT_VND_COMPACT: case TDMT_VND_COMPACT:
@ -752,7 +749,7 @@ _exit:
_err: _err:
vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType),
tstrerror(terrno), ver); tstrerror(code), ver);
return code; return code;
} }

View File

@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
return; return;
} }
/*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here
streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs());
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);

View File

@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (old == 0) { if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"trigger-recv-monitor"); "trigger-recv-monitor");
pTmrInfo->launchChkptId = pActiveInfo->activeId; pTmrInfo->launchChkptId = pActiveInfo->activeId;

View File

@ -1162,7 +1162,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
if (old == 0) { if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask); int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId,
"chkpt-ready-monitor"); "chkpt-ready-monitor");

View File

@ -753,12 +753,17 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task
return code; return code;
} }
void streamMetaAcquireOneTask(SStreamTask* pTask) { int32_t streamMetaAcquireOneTask(SStreamTask* pTask) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1);
stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref); stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref);
return ref;
} }
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
if (pTask == NULL) {
return;
}
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);
@ -862,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) { if (ppTask) {
pTask = *ppTask; pTask = *ppTask;
// it is an fill-history task, remove the related stream task's id that points to it // it is a fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
} }

View File

@ -22,7 +22,7 @@ static void streamTaskSchedHelper(void* param, void* tmrId);
void streamSetupScheduleTrigger(SStreamTask* pTask) { void streamSetupScheduleTrigger(SStreamTask* pTask) {
int64_t delaySchema = pTask->info.delaySchedParam; int64_t delaySchema = pTask->info.delaySchedParam;
if (delaySchema != 0 && pTask->info.fillHistory == 0) { if (delaySchema != 0 && pTask->info.fillHistory == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); int32_t ref = streamMetaAcquireOneTask(pTask);
stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref,
pTask->info.delaySchedParam); pTask->info.delaySchedParam);
@ -63,7 +63,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
pRunReq->reqType = execType; pRunReq->reqType = execType;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
}
return code;
} }
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }
@ -76,7 +80,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) {
pTask->status.schedIdleTime, ref); pTask->status.schedIdleTime, ref);
// add one ref count for task // add one ref count for task
streamMetaAcquireOneTask(pTask); int32_t unusedRetRef = streamMetaAcquireOneTask(pTask);
streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer,
pTask->pMeta->vgId, "resume-task-tmr"); pTask->pMeta->vgId, "resume-task-tmr");
} }

View File

@ -258,10 +258,12 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->inputq.queue) { if (pTask->inputq.queue) {
streamQueueClose(pTask->inputq.queue, pTask->id.taskId); streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
pTask->inputq.queue = NULL;
} }
if (pTask->outputq.queue) { if (pTask->outputq.queue) {
streamQueueClose(pTask->outputq.queue, pTask->id.taskId); streamQueueClose(pTask->outputq.queue, pTask->id.taskId);
pTask->outputq.queue = NULL;
} }
if (pTask->exec.qmsg) { if (pTask->exec.qmsg) {
@ -275,6 +277,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
if (pTask->exec.pWalReader != NULL) { if (pTask->exec.pWalReader != NULL) {
walCloseReader(pTask->exec.pWalReader); walCloseReader(pTask->exec.pWalReader);
pTask->exec.pWalReader = NULL;
} }
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);

View File

@ -501,9 +501,10 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
if (pTrans == NULL) { if (pTrans == NULL) {
ETaskStatus s = pSM->current.state; ETaskStatus s = pSM->current.state;
if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT &&
s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) { s != TASK_STATUS__READY) {
stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name,
GET_EVT_NAME(pSM->prev.evt));
} }
// the pSM->prev.evt may be 0, so print string is not appropriate. // the pSM->prev.evt may be 0, so print string is not appropriate.
@ -521,11 +522,15 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even
return TSDB_CODE_STREAM_INVALID_STATETRANS; return TSDB_CODE_STREAM_INVALID_STATETRANS;
} }
// repeat pause will not overwrite the previous pause state
if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) {
keepPrevInfo(pSM); keepPrevInfo(pSM);
pSM->current = pTrans->next; pSM->current = pTrans->next;
pSM->pActiveTrans = NULL; } else {
stDebug("s-task:%s repeat pause evt recv, not update prev status", id);
}
pSM->pActiveTrans = NULL;
// todo remove it // todo remove it
// todo: handle the error code // todo: handle the error code
// on success callback, add into lock if necessary, or maybe we should add an option for this? // on success callback, add into lock if necessary, or maybe we should add an option for this?

View File

@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void*
} }
} }
stDebug("vgId:%d start %s tmr succ", vgId, pMsg); stTrace("vgId:%d start %s tmr succ", vgId, pMsg);
} }
void streamTmrStop(tmr_h tmrId) { void streamTmrStop(tmr_h tmrId) {

View File

@ -200,6 +200,7 @@ void* taosArrayPop(SArray* pArray) {
void* taosArrayGet(const SArray* pArray, size_t index) { void* taosArrayGet(const SArray* pArray, size_t index) {
if (NULL == pArray) { if (NULL == pArray) {
terrno = TSDB_CODE_INVALID_PARA; terrno = TSDB_CODE_INVALID_PARA;
uError("failed to return value from array of null ptr");
return NULL; return NULL;
} }