diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e6d750468e..58c1707e1f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -754,7 +754,7 @@ int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -void streamMetaAcquireOneTask(SStreamTask* pTask); +int32_t streamMetaAcquireOneTask(SStreamTask* pTask); void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5372533455..60d1f34cf7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2625,6 +2625,8 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code)); lino = __LINE__; goto _exit; + } else { // reset the length value + code = TSDB_CODE_SUCCESS; } len += tsnprintf(dumpBuf + len, size - len, " %15s |", pBuf); if (len >= size - 1) goto _exit; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 69d3de25fc..a4327b777f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1783,7 +1783,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SStreamObj *pStream = NULL; - int32_t code = 0; + int32_t code = 0; if ((code = grantCheckExpire(TSDB_GRANT_STREAMS)) < 0) { return code; @@ -1811,6 +1811,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { return 0; } + mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid); if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); return -1; diff --git a/source/dnode/mnode/impl/src/mndStreamTransAct.c b/source/dnode/mnode/impl/src/mndStreamTransAct.c index 4e0bf97587..139ea4f147 100644 --- a/source/dnode/mnode/impl/src/mndStreamTransAct.c +++ b/source/dnode/mnode/impl/src/mndStreamTransAct.c @@ -61,7 +61,6 @@ static int32_t doSetPauseAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTa static int32_t doSetDropAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq)); if (pReq == NULL) { - // terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -93,7 +92,6 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT if (pReq == NULL) { mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq), tstrerror(TSDB_CODE_OUT_OF_MEMORY)); - // terrno = TSDB_CODE_OUT_OF_MEMORY; return terrno; } @@ -106,19 +104,18 @@ static int32_t doSetResumeAction(STrans *pTrans, SMnode *pMnode, SStreamTask *pT bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS || (!hasEpset)) { - terrno = code; taosMemoryFree(pReq); - return terrno; + return code; } code = setTransAction(pTrans, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != 0) { taosMemoryFree(pReq); - return terrno; + return code; } mDebug("set the resume action for trans:%d", pTrans->id); - return 0; + return code; } static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask* pTask) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 6b7e857120..a00e92997c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -692,7 +692,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if ((ppTask != NULL) && ((*ppTask) != NULL)) { - streamMetaAcquireOneTask(*ppTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(*ppTask); SStreamTask* pTask = *ppTask; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -1119,10 +1119,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t int32_t vgId = pMeta->vgId; int32_t code = 0; - if (pTask == NULL) { - return -1; - } - streamTaskResume(pTask); ETaskStatus status = streamTaskGetStatus(pTask).state; @@ -1150,7 +1146,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t } } - streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1173,6 +1168,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); if (code != 0) { + streamMetaReleaseTask(pMeta, pTask); return code; } @@ -1186,6 +1182,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m streamMutexUnlock(&pHTask->lock); code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); + streamMetaReleaseTask(pMeta, pHTask); } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index d4b906fe2a..c4971e27cf 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -855,6 +855,7 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList); if (p == NULL) { clearBrinBlockIter(&iter); + tsdbError("invalid param, empty in tablescanInfoList, %s", pReader->idStr); return TSDB_CODE_INVALID_PARA; } @@ -5256,7 +5257,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { // NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit // the data should be ingested in round-robin and all the child tables should be createted before ingesting data // the version range of query will be used to identify the correctness of suspend/resume functions. - // this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files + // this function will be blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files #if SUSPEND_RESUME_TEST if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) { tsem_wait(&pReader->resumeAfterSuspend); @@ -5909,6 +5910,7 @@ int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_ } else if (mr.me.type == TSDB_NORMAL_TABLE) { // do nothing } else { code = TSDB_CODE_INVALID_PARA; + tsdbError("invalid mr.me.type:%d, code:%s", mr.me.type, tstrerror(code)); metaReaderClear(&mr); return code; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 371eaa0774..dd13c975cf 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -633,47 +633,44 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_STREAM_TASK_DEPLOY: { - int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; + if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != TSDB_CODE_SUCCESS) { goto _err; } } break; case TDMT_STREAM_TASK_DROP: { - if (tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if ((code = tqProcessTaskDropReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_UPDATE_CHKPT: { - if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) { + if ((code = tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_STREAM_CONSEN_CHKPT: { - if (pVnode->restored) { - if (tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg) < 0) { - goto _err; - } + if (pVnode->restored && (code = tqProcessTaskConsenChkptIdReq(pVnode->pTq, pMsg)) < 0) { + goto _err; } + } break; case TDMT_STREAM_TASK_PAUSE: { if (pVnode->restored && vnodeIsLeader(pVnode) && - tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + (code = tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_RESUME: { if (pVnode->restored && vnodeIsLeader(pVnode) && - tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) { + (code = tqProcessTaskResumeReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen)) < 0) { goto _err; } } break; case TDMT_VND_STREAM_TASK_RESET: { - if (pVnode->restored && vnodeIsLeader(pVnode)) { - if (tqProcessTaskResetReq(pVnode->pTq, pMsg) < 0) { + if (pVnode->restored && vnodeIsLeader(pVnode) && + (code = tqProcessTaskResetReq(pVnode->pTq, pMsg)) < 0) { goto _err; } - } + } break; case TDMT_VND_ALTER_CONFIRM: needCommit = pVnode->config.hashChange; @@ -693,10 +690,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg case TDMT_VND_DROP_INDEX: vnodeProcessDropIndexReq(pVnode, ver, pReq, len, pRsp); break; - case TDMT_VND_STREAM_CHECK_POINT_SOURCE: + case TDMT_VND_STREAM_CHECK_POINT_SOURCE: // always return true tqProcessTaskCheckPointSourceReq(pVnode->pTq, pMsg, pRsp); break; - case TDMT_VND_STREAM_TASK_UPDATE: + case TDMT_VND_STREAM_TASK_UPDATE: // always return true tqProcessTaskUpdateReq(pVnode->pTq, pMsg); break; case TDMT_VND_COMPACT: @@ -752,7 +749,7 @@ _exit: _err: vError("vgId:%d, process %s request failed since %s, ver:%" PRId64, TD_VID(pVnode), TMSG_INFO(pMsg->msgType), - tstrerror(terrno), ver); + tstrerror(code), ver); return code; } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 75bcc326b3..c1c54b3c0b 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -299,7 +299,7 @@ void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { return; } - /*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); // add task ref here streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index e44bca123b..be914d9746 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -347,7 +347,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock if (old == 0) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); pTmrInfo->launchChkptId = pActiveInfo->activeId; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 133663ac28..62d60ff664 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1162,7 +1162,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { if (old == 0) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); - streamMetaAcquireOneTask(pTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 44c9e76906..7e9b60b61a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -753,12 +753,17 @@ int32_t streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t task return code; } -void streamMetaAcquireOneTask(SStreamTask* pTask) { +int32_t streamMetaAcquireOneTask(SStreamTask* pTask) { int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); stTrace("s-task:%s acquire task, ref:%d", pTask->id.idStr, ref); + return ref; } void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { + if (pTask == NULL) { + return; + } + int32_t taskId = pTask->id.taskId; int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); @@ -862,7 +867,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); if (ppTask) { pTask = *ppTask; - // it is an fill-history task, remove the related stream task's id that points to it + // it is a fill-history task, remove the related stream task's id that points to it if (pTask->info.fillHistory == 0) { int32_t ret = atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 98920e6f70..cdaa603e38 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -22,7 +22,7 @@ static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { int64_t delaySchema = pTask->info.delaySchedParam; if (delaySchema != 0 && pTask->info.fillHistory == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + int32_t ref = streamMetaAcquireOneTask(pTask); stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, pTask->info.delaySchedParam); @@ -63,7 +63,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3 pRunReq->reqType = execType; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; - return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); + if (code) { + stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId); + } + return code; } void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } @@ -76,7 +80,7 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { pTask->status.schedIdleTime, ref); // add one ref count for task - streamMetaAcquireOneTask(pTask); + int32_t unusedRetRef = streamMetaAcquireOneTask(pTask); streamTmrStart(streamTaskResumeHelper, pTask->status.schedIdleTime, pTask, streamTimer, &pTask->schedInfo.pIdleTimer, pTask->pMeta->vgId, "resume-task-tmr"); } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 71a2ed3e4a..727701e03e 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -258,10 +258,12 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->inputq.queue) { streamQueueClose(pTask->inputq.queue, pTask->id.taskId); + pTask->inputq.queue = NULL; } if (pTask->outputq.queue) { streamQueueClose(pTask->outputq.queue, pTask->id.taskId); + pTask->outputq.queue = NULL; } if (pTask->exec.qmsg) { @@ -275,6 +277,7 @@ void tFreeStreamTask(SStreamTask* pTask) { if (pTask->exec.pWalReader != NULL) { walCloseReader(pTask->exec.pWalReader); + pTask->exec.pWalReader = NULL; } streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo); diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index 3501d30be4..c3a2742aa2 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -501,9 +501,10 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even if (pTrans == NULL) { ETaskStatus s = pSM->current.state; - if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && - s != TASK_STATUS__UNINIT && s != TASK_STATUS__READY) { - stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, GET_EVT_NAME(pSM->prev.evt)); + if (s != TASK_STATUS__DROPPING && s != TASK_STATUS__PAUSE && s != TASK_STATUS__STOP && s != TASK_STATUS__UNINIT && + s != TASK_STATUS__READY) { + stError("s-task:%s invalid task status:%s on handling event:%s success", id, pSM->current.name, + GET_EVT_NAME(pSM->prev.evt)); } // the pSM->prev.evt may be 0, so print string is not appropriate. @@ -521,11 +522,15 @@ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent even return TSDB_CODE_STREAM_INVALID_STATETRANS; } - keepPrevInfo(pSM); + // repeat pause will not overwrite the previous pause state + if (pSM->current.state != TASK_STATUS__PAUSE || pTrans->next.state != TASK_STATUS__PAUSE) { + keepPrevInfo(pSM); + pSM->current = pTrans->next; + } else { + stDebug("s-task:%s repeat pause evt recv, not update prev status", id); + } - pSM->current = pTrans->next; pSM->pActiveTrans = NULL; - // todo remove it // todo: handle the error code // on success callback, add into lock if necessary, or maybe we should add an option for this? diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 8b77fe7cb1..0da9acfd1d 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* } } - stDebug("vgId:%d start %s tmr succ", vgId, pMsg); + stTrace("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) { diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 7989a2468b..b94bb512e2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -200,6 +200,7 @@ void* taosArrayPop(SArray* pArray) { void* taosArrayGet(const SArray* pArray, size_t index) { if (NULL == pArray) { terrno = TSDB_CODE_INVALID_PARA; + uError("failed to return value from array of null ptr"); return NULL; }