diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 0ecb50367b..49a58c1333 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -153,7 +153,6 @@ enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, STREAM_INPUT__MERGED_SUBMIT, - STREAM_INPUT__TQ_SCAN, STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index ed56b7e6b2..64cc7bca95 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -221,8 +221,8 @@ int32_t qStreamSourceScanParamForHistoryScanStep2(qTaskInfo_t tinfo, SVersionRan int32_t qStreamRecoverFinish(qTaskInfo_t tinfo); bool qStreamScanhistoryFinished(qTaskInfo_t tinfo); int32_t qStreamInfoResetTimewindowFilter(qTaskInfo_t tinfo); -void resetTaskInfo(qTaskInfo_t tinfo); - +void qResetTaskInfoCode(qTaskInfo_t tinfo); +int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval); int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo); int32_t qStreamOperatorReloadState(qTaskInfo_t tInfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e71a6c4dce..ee8f0450a4 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -309,14 +309,16 @@ typedef struct SDataRange { } SDataRange; typedef struct SSTaskBasicInfo { - int32_t nodeId; // vgroup id or snode id - SEpSet epSet; - SEpSet mnodeEpset; // mnode epset for send heartbeat - int32_t selfChildId; - int32_t totalLevel; - int8_t taskLevel; - int8_t fillHistory; // is fill history task or not - int64_t delaySchedParam; // in msec + int32_t nodeId; // vgroup id or snode id + SEpSet epSet; + SEpSet mnodeEpset; // mnode epset for send heartbeat + int32_t selfChildId; + int32_t trigger; + int8_t taskLevel; + int8_t fillHistory; // is fill history task or not + int64_t delaySchedParam; // in msec + int64_t watermark; // extracted from operators + SInterval interval; } SSTaskBasicInfo; typedef struct SStreamRetrieveReq SStreamRetrieveReq; @@ -541,8 +543,9 @@ typedef struct STaskUpdateEntry { typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param); -int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, - SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** pTask); +int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger, + int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, + SStreamTask** pTask); void tFreeStreamTask(SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 4f72b26a5e..490f07d401 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -242,7 +242,7 @@ static int32_t doAddSinkTask(SStreamObj* pStream, SMnode* pMnode, SVgObj* pVgrou SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); SStreamTask* pTask = NULL; - int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, *pTaskList, pStream->conf.fillHistory, + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, pEpset, isFillhistory, 0, 0, *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, &pTask); if (code != 0) { return code; @@ -356,8 +356,9 @@ static int32_t buildSourceTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillh uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, - *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pTask); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__SOURCE, pEpset, isFillhistory, pStream->conf.trigger, + useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory, + pStream->subTableWithoutMd5, pTask); return code; } @@ -395,17 +396,17 @@ static void setHTasksId(SStreamObj* pStream) { } static int32_t doAddSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t skey, - SArray* pVerList, SVgObj* pVgroup, bool isFillhistory, bool useTriggerParam) { + SArray* pVerList, SVgObj* pVgroup, bool isHistoryTask, bool useTriggerParam) { SStreamTask* pTask = NULL; - int32_t code = buildSourceTask(pStream, pEpset, isFillhistory, useTriggerParam, &pTask); + int32_t code = buildSourceTask(pStream, pEpset, isHistoryTask, useTriggerParam, &pTask); if (code != TSDB_CODE_SUCCESS) { return code; } - mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isFillhistory); + mDebug("doAddSourceTask taskId:%s, vgId:%d, isFillHistory:%d", pTask->id.idStr, pVgroup->vgId, isHistoryTask); if (pStream->conf.fillHistory) { - haltInitialTaskStatus(pTask, plan, isFillhistory); + haltInitialTaskStatus(pTask, plan, isHistoryTask); } streamTaskSetDataRange(pTask, skey, pVerList, pVgroup->vgId); @@ -451,10 +452,12 @@ static SSubplan* getAggSubPlan(const SQueryPlan* pPlan, int index) { static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream, SEpSet* pEpset, int64_t nextWindowSkey, SArray* pVerList, bool useTriggerParam) { + void* pIter = NULL; + int32_t code = 0; + SSdb* pSdb = pMnode->pSdb; + addNewTaskList(pStream); - void* pIter = NULL; - SSdb* pSdb = pMnode->pSdb; while (1) { SVgObj* pVgroup; pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void**)&pVgroup); @@ -467,10 +470,9 @@ static int32_t addSourceTask(SMnode* pMnode, SSubplan* plan, SStreamObj* pStream continue; } - int code = - doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam); + code = doAddSourceTask(pMnode, plan, pStream, pEpset, nextWindowSkey, pVerList, pVgroup, false, useTriggerParam); if (code != 0) { - mError("create stream task, code:%s", tstrerror(code)); + mError("failed to create stream task, code:%s", tstrerror(code)); // todo drop the added source tasks. sdbRelease(pSdb, pVgroup); @@ -502,9 +504,9 @@ static int32_t buildAggTask(SStreamObj* pStream, SEpSet* pEpset, bool isFillhist uint64_t uid = (isFillhistory) ? pStream->hTaskUid : pStream->uid; SArray** pTaskList = (isFillhistory) ? taosArrayGetLast(pStream->pHTasksList) : taosArrayGetLast(pStream->tasks); - int32_t code = - tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, useTriggerParam ? pStream->conf.triggerParam : 0, - *pTaskList, pStream->conf.fillHistory, pStream->subTableWithoutMd5, pAggTask); + int32_t code = tNewStreamTask(uid, TASK_LEVEL__AGG, pEpset, isFillhistory, pStream->conf.trigger, + useTriggerParam ? pStream->conf.triggerParam : 0, *pTaskList, pStream->conf.fillHistory, + pStream->subTableWithoutMd5, pAggTask); return code; } @@ -678,7 +680,7 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (numOfPlanLevel > 1 || externalTargetDB || multiTarget || pStream->fixedSinkVgId) { // add extra sink hasExtraSink = true; - int32_t code = addSinkTask(pMnode, pStream, pEpset); + code = addSinkTask(pMnode, pStream, pEpset); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -692,7 +694,8 @@ static int32_t doScheduleStream(SStreamObj* pStream, SMnode* pMnode, SQueryPlan* if (terrno != 0) code = terrno; TAOS_RETURN(code); } - code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, numOfPlanLevel == 1); + + code = addSourceTask(pMnode, plan, pStream, pEpset, skey, pVerList, (numOfPlanLevel == 1)); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index cbe631912c..672d7aca3e 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -461,17 +461,16 @@ static int32_t mndBuildStreamObjFromCreateReq(SMnode *pMnode, SStreamObj *pObj, pObj->outputSchema.pSchema = pFullSchema; } - bool hasKey = hasDestPrimaryKey(&pObj->outputSchema); SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = false, .streamQuery = true, - .triggerType = pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY ? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger, + .triggerType = (pObj->conf.trigger == STREAM_TRIGGER_MAX_DELAY)? STREAM_TRIGGER_WINDOW_CLOSE : pObj->conf.trigger, .watermark = pObj->conf.watermark, .igExpired = pObj->conf.igExpired, .deleteMark = pObj->deleteMark, .igCheckUpdate = pObj->igCheckUpdate, - .destHasPrimaryKey = hasKey, + .destHasPrimaryKey = hasDestPrimaryKey(&pObj->outputSchema), }; // using ast and param to build physical plan diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 7fc079677b..cf18b9cd82 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -38,7 +38,6 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce streamTaskOpenAllUpstreamInput(pTask); streamTaskResetUpstreamStageInfo(pTask); - (void)streamSetupScheduleTrigger(pTask); SCheckpointInfo *pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c865e3ec6e..8448d305a2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -708,9 +708,10 @@ end: static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessVer) { - STQ* pTq = (STQ*)pTqObj; + STQ* pTq = (STQ*)pTqObj; + int32_t vgId = TD_VID(pTq->pVnode); + SCheckpointInfo* pChkInfo = NULL; - int32_t vgId = TD_VID(pTq->pVnode); tqDebug("s-task:0x%x start to build task", pTask->id.taskId); int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer); @@ -752,9 +753,8 @@ int32_t tqBuildStreamTask(void* pTqObj, SStreamTask* pTask, int64_t nextProcessV } streamTaskResetUpstreamStageInfo(pTask); - streamSetupScheduleTrigger(pTask); - SCheckpointInfo* pChkInfo = &pTask->chkInfo; + pChkInfo = &pTask->chkInfo; tqSetRestoreVersionInfo(pTask); char* p = streamTaskGetStatus(pTask).name; @@ -874,13 +874,14 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { SStreamScanHistoryReq* pReq = (SStreamScanHistoryReq*)pMsg->pCont; SStreamMeta* pMeta = pTq->pStreamMeta; int32_t code = TSDB_CODE_SUCCESS; + SStreamTask* pTask = NULL; + SStreamTask* pStreamTask = NULL; - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask); if (pTask == NULL) { tqError("vgId:%d failed to acquire stream task:0x%x during scan history data, task may have been destroyed", pMeta->vgId, pReq->taskId); - return -1; + return code; } // do recovery step1 @@ -945,11 +946,11 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ETaskStatus s = p.state; if (s == TASK_STATUS__PAUSE) { - tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", pTask->id.idStr, - el, pTask->execInfo.step1El, status); + tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs total:%.2fs, sched-status:%d", id, el, + pTask->execInfo.step1El, status); } else if (s == TASK_STATUS__STOP || s == TASK_STATUS__DROPPING) { - tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", pTask->id.idStr, - p.name, pTask->execInfo.step1El); + tqDebug("s-task:%s status:%p not continue scan-history data, total elapsed time:%.2fs quit", id, p.name, + pTask->execInfo.step1El); } } @@ -966,7 +967,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { } // 1. get the related stream task - SStreamTask* pStreamTask = NULL; code = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId, &pStreamTask); if (pStreamTask == NULL) { tqError("failed to find s-task:0x%" PRIx64 ", it may have been destroyed, drop related fill-history task:%s", @@ -977,15 +977,15 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_32(&pTask->status.inScanHistorySentinel, 0); streamMetaReleaseTask(pMeta, pTask); - return code; // todo: handle failure + return code; } if (pStreamTask->info.taskLevel != TASK_LEVEL__SOURCE) { tqError("s-task:%s fill-history task related stream task level:%d, unexpected", id, pStreamTask->info.taskLevel); return TSDB_CODE_STREAM_INTERNAL_ERROR; } - code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq); + code = streamTaskHandleEventAsync(pStreamTask->status.pSM, TASK_EVENT_HALT, handleStep2Async, pTq); streamMetaReleaseTask(pMeta, pStreamTask); atomic_store_32(&pTask->status.inScanHistorySentinel, 0); diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 6463a59dfb..a6f3a60563 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -259,7 +259,12 @@ bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver) { bool taskReadyForDataFromWal(SStreamTask* pTask) { // non-source or fill-history tasks don't need to response the WAL scan action. - if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { + SSTaskBasicInfo* pInfo = &pTask->info; + if ((pInfo->taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { + return false; + } + + if (pInfo->taskLevel == TASK_LEVEL__SOURCE && pInfo->trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { return false; } @@ -271,7 +276,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { } // fill-history task has entered into the last phase, no need to anything - if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) { + if ((pInfo->fillHistory == 1) && pTask->status.appendTranstateBlock) { // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, pTask->dataRange.range.maxVer); @@ -367,7 +372,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { numOfTasks = taosArrayGetSize(pTaskList); for (int32_t i = 0; i < numOfTasks; ++i) { - STaskId* pTaskId = taosArrayGet(pTaskList, i); + STaskId* pTaskId = taosArrayGet(pTaskList, i); if (pTaskId == NULL) { continue; } @@ -397,9 +402,9 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state != TASK_STATUS__READY) { - tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pState.name); + SStreamTaskState state = streamTaskGetStatus(pTask); + if (state.state != TASK_STATUS__READY) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, state.name); streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pStreamMeta, pTask); continue; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 83cbb4d3b9..a11cf770fc 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -83,6 +83,8 @@ int32_t tqExpandStreamTask(SStreamTask* pTask) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } + streamSetupScheduleTrigger(pTask); + double el = (taosGetTimestampMs() - st) / 1000.0; tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el); @@ -590,6 +592,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve int32_t taskId = -1; int64_t streamId = -1; bool added = false; + int32_t size = sizeof(SStreamTask); if (tsDisableStream) { tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId); @@ -599,7 +602,6 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve tqDebug("vgId:%d receive new stream task deploy msg, start to build stream task", vgId); // 1.deserialize msg and build task - int32_t size = sizeof(SStreamTask); SStreamTask* pTask = taosMemoryCalloc(1, size); if (pTask == NULL) { tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, size); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2604e2262f..55c26acba6 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -625,9 +625,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_STREAM_TASK_DEPLOY: { - int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; + if ((code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len)) != 0) { goto _err; } } break; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9c38db1cb1..162a16b78a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -131,7 +131,7 @@ static void clearStreamBlock(SOperatorInfo* pOperator) { } } -void resetTaskInfo(qTaskInfo_t tinfo) { +void qResetTaskInfoCode(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; pTaskInfo->code = 0; clearStreamBlock(pTaskInfo->pRoot); @@ -1085,6 +1085,10 @@ _end: return code; } +int32_t qGetStreamIntervalExecInfo(qTaskInfo_t tinfo, int64_t* pWaterMark, SInterval* pInterval) { + return 0; +} + int32_t qSetStreamOperatorOptionForScanHistory(qTaskInfo_t tinfo) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 350bd35490..3799c5a62b 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -239,7 +239,7 @@ int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32 int64_t checkpointId, SRpcMsg* pMsg); void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock); - +int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger); #ifdef __cplusplus } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index d85435d21c..be8d798a3a 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,6 +15,42 @@ #include "streamInt.h" +static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { + *pSubmit = NULL; + + int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit); + if (code) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData)); + if ((*pSubmit)->submits == NULL) { + taosFreeQitem(*pSubmit); + *pSubmit = NULL; + return TSDB_CODE_OUT_OF_MEMORY; + } + + (*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT; + return TSDB_CODE_SUCCESS; +} + +static int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { + void* p = taosArrayPush(pMerged->submits, &pSubmit->submit); + if (p == NULL) { + return terrno; + } + + if (pSubmit->ver > pMerged->ver) { + pMerged->ver = pSubmit->ver; + } + return 0; +} + +static void freeItems(void* param) { + SSDataBlock* pBlock = param; + taosArrayDestroy(pBlock->pDataBlock); +} + int32_t createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg, SStreamDataBlock** pRes) { SStreamDataBlock* pData = NULL; int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen, (void**)&pData); @@ -179,37 +215,6 @@ void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { } } -int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { - *pSubmit = NULL; - - int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit); - if (code) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - (*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData)); - if ((*pSubmit)->submits == NULL) { - taosFreeQitem(*pSubmit); - *pSubmit = NULL; - return TSDB_CODE_OUT_OF_MEMORY; - } - - (*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT; - return TSDB_CODE_SUCCESS; -} - -int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { - void* p = taosArrayPush(pMerged->submits, &pSubmit->submit); - if (p == NULL) { - return terrno; - } - - if (pSubmit->ver > pMerged->ver) { - pMerged->ver = pSubmit->ver; - } - return 0; -} - // todo handle memory error int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) { *pRes = NULL; @@ -267,11 +272,6 @@ int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem } } -static void freeItems(void* param) { - SSDataBlock* pBlock = param; - taosArrayDestroy(pBlock->pDataBlock); -} - void streamFreeQitem(SStreamQueueItem* data) { int8_t type = data->type; if (type == STREAM_INPUT__GET_RES) { @@ -306,3 +306,36 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pBlock); } } + +int32_t streamCreateSinkResTrigger(SStreamTrigger** pTrigger, int32_t triggerType, int32_t trigger) { + QRY_PARAM_CHECK(pTrigger); + SStreamTrigger* p = NULL; + + int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&p); + if (code) { + return code; + } + + p->type = STREAM_INPUT__GET_RES; + p->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (p->pBlock == NULL) { + taosFreeQitem(p); + return terrno; + } + + p->pBlock->info.type = STREAM_GET_ALL; + + // let's calculate the previous time window + // todo get the time precision for ts + if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + SInterval interval = {.interval = trigger, .sliding = trigger, .intervalUnit = 'a', .slidingUnit = 'a'}; + int64_t now = taosGetTimestampMs(); + + STimeWindow window = getAlignQueryTimeWindow(&interval, now - trigger); + p->pBlock->info.window = window; + } + + *pTrigger = p; + + return code; +} \ No newline at end of file diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f3279a0f01..5f462f62f9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -119,7 +119,7 @@ void streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* to uint64_t ts = 0; if ((code = qExecTask(pExecutor, &output, &ts)) < 0) { if (code == TSDB_CODE_QRY_IN_EXEC) { - resetTaskInfo(pExecutor); + qResetTaskInfoCode(pExecutor); } stError("unexpected stream execution, s-task:%s since %s", pTask->id.idStr, tstrerror(code)); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0417fb2182..a5cd9bf404 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -750,7 +750,7 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask, POINTER_BYTES); - if (code) { + if (code) { // todo remove it from task list stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); return code; } diff --git a/source/libs/stream/src/streamMsg.c b/source/libs/stream/src/streamMsg.c index 1c512888e7..193daa0cc4 100644 --- a/source/libs/stream/src/streamMsg.c +++ b/source/libs/stream/src/streamMsg.c @@ -613,7 +613,7 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->ver)); TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pTask->id.streamId)); TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->id.taskId)); - TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.totalLevel)); + TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pTask->info.trigger)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->info.taskLevel)); TAOS_CHECK_EXIT(tEncodeI8(pEncoder, pTask->outputInfo.type)); TAOS_CHECK_EXIT(tEncodeI16(pEncoder, pTask->msgInfo.msgType)); @@ -692,7 +692,7 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pTask->id.streamId)); TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->id.taskId)); - TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.totalLevel)); + TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pTask->info.trigger)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->info.taskLevel)); TAOS_CHECK_EXIT(tDecodeI8(pDecoder, &pTask->outputInfo.type)); TAOS_CHECK_EXIT(tDecodeI16(pDecoder, &pTask->msgInfo.msgType)); diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 2d54547aa2..f5b5524026 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -20,15 +20,40 @@ static void streamTaskResumeHelper(void* param, void* tmrId); static void streamTaskSchedHelper(void* param, void* tmrId); void streamSetupScheduleTrigger(SStreamTask* pTask) { - if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) { - int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); - stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", pTask->id.idStr, ref, - pTask->info.delaySchedParam); - - pTask->schedInfo.pDelayTimer = - taosTmrStart(streamTaskSchedHelper, (int32_t)pTask->info.delaySchedParam, pTask, streamTimer); - pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; + int64_t delay = 0; + int32_t code = 0; + const char* id = pTask->id.idStr; + if (pTask->info.fillHistory == 1) { + return; } + + // dynamic set the trigger & triggerParam for STREAM_TRIGGER_FORCE_WINDOW_CLOSE + if ((pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) && (pTask->info.taskLevel == TASK_LEVEL__SOURCE)) { + int64_t waterMark = 0; + SInterval interval = {0}; + code = qGetStreamIntervalExecInfo(pTask->exec.pExecutor, &waterMark, &interval); + if (code == 0) { + pTask->info.delaySchedParam = interval.sliding; + pTask->info.watermark = waterMark; + pTask->info.interval = interval; + } + + // todo: calculate the correct start delay time for force_window_close + delay = pTask->info.delaySchedParam; + stInfo("s-task:%s extract interval info from executor, wm:%" PRId64 " interval:%" PRId64 " unit:%c sliding:%" PRId64 + " unit:%c ", + id, waterMark, interval.interval, interval.intervalUnit, interval.sliding, interval.slidingUnit); + } + + if (delay == 0) { + return; + } + + int32_t ref = atomic_add_fetch_32(&pTask->refCnt, 1); + stDebug("s-task:%s setup scheduler trigger, ref:%d delay:%" PRId64 " ms", id, ref, pTask->info.delaySchedParam); + + pTask->schedInfo.pDelayTimer = taosTmrStart(streamTaskSchedHelper, (int32_t)delay, pTask, streamTimer); + pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } int32_t streamTrySchedExec(SStreamTask* pTask) { @@ -135,32 +160,22 @@ void streamTaskSchedHelper(void* param, void* tmrId) { if (status == TASK_TRIGGER_STATUS__ACTIVE) { SStreamTrigger* pTrigger; - int32_t code = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM, 0, (void**)&pTrigger); + int32_t code = streamCreateSinkResTrigger(&pTrigger, pTask->info.trigger, pTask->info.delaySchedParam); if (code) { - stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", + stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, tstrerror(code), nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); + streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + "sched-run-tmr"); terrno = code; return; } - pTrigger->type = STREAM_INPUT__GET_RES; - pTrigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); - if (pTrigger->pBlock == NULL) { - taosFreeQitem(pTrigger); - - stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", - nextTrigger); - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); - return; - } - atomic_store_8(&pTask->schedInfo.status, TASK_TRIGGER_STATUS__INACTIVE); - pTrigger->pBlock->info.type = STREAM_GET_ALL; code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, "sched-run-tmr"); + streamTmrReset(streamTaskSchedHelper, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pDelayTimer, vgId, + "sched-run-tmr"); return; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index fb2456e1cd..244c4aaf88 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -103,8 +103,9 @@ static SStreamUpstreamEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) { return pEpInfo; } -int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam, - SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, SStreamTask** p) { +int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int32_t trigger, + int64_t triggerParam, SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5, + SStreamTask** p) { *p = NULL; SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask)); @@ -120,6 +121,7 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool pTask->info.taskLevel = taskLevel; pTask->info.fillHistory = fillHistory; + pTask->info.trigger = trigger; pTask->info.delaySchedParam = triggerParam; pTask->subtableWithoutMd5 = subtableWithoutMd5;